Skip to content

Commit 253b6cc

Browse files
authored
fix: remove Realtime.Repo requirements (#1627)
we were using Realtime.Repo to build the required SQL to call tenants databases. as we move towards some regions only using Replicas we can remove fully this logic and rely on the Replica modules to build said queries
1 parent d412b66 commit 253b6cc

File tree

13 files changed

+271
-259
lines changed

13 files changed

+271
-259
lines changed

lib/realtime/api.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ defmodule Realtime.Api do
190190

191191
defp list_extensions(type) do
192192
query = from(e in Extensions, where: e.type == ^type, select: e)
193-
194-
Repo.all(query)
193+
replica = Replica.replica()
194+
replica.all(query)
195195
end
196196

197197
def rename_settings_field(from, to) do

lib/realtime/messages.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ defmodule Realtime.Messages do
5959
limit: ^limit,
6060
order_by: [desc: m.inserted_at]
6161

62-
{latency, value} = :timer.tc(Realtime.Repo, :all, [conn, query, Message, [timeout: @default_timeout]], :millisecond)
62+
{latency, value} =
63+
:timer.tc(Realtime.Tenants.Repo, :all, [conn, query, Message, [timeout: @default_timeout]], :millisecond)
64+
6365
:telemetry.execute([:realtime, :tenants, :replay], %{latency: latency}, %{tenant: tenant_id})
6466
value
6567
end

lib/realtime/repo.ex

Lines changed: 0 additions & 244 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
defmodule Realtime.Repo do
2-
use Realtime.Logs
3-
42
use Ecto.Repo,
53
otp_app: :realtime,
64
adapter: Ecto.Adapters.Postgres
75

8-
import Ecto.Query
9-
106
def with_dynamic_repo(config, callback) do
117
default_dynamic_repo = get_dynamic_repo()
128
{:ok, repo} = [name: nil, pool_size: 2] |> Keyword.merge(config) |> Realtime.Repo.start_link()
@@ -19,244 +15,4 @@ defmodule Realtime.Repo do
1915
Supervisor.stop(repo)
2016
end
2117
end
22-
23-
@doc """
24-
Lists all records for a given query and converts them into a given struct
25-
"""
26-
@spec all(DBConnection.conn(), Ecto.Queryable.t(), module(), [Postgrex.execute_option()]) ::
27-
{:ok, list(struct())} | {:error, any()}
28-
def all(conn, query, result_struct, opts \\ []) do
29-
conn
30-
|> run_all_query(query, opts)
31-
|> result_to_structs(result_struct)
32-
end
33-
34-
@doc """
35-
Fetches one record for a given query and converts it into a given struct
36-
"""
37-
@spec one(
38-
DBConnection.conn(),
39-
Ecto.Query.t(),
40-
module(),
41-
Postgrex.option() | Keyword.t()
42-
) ::
43-
{:error, any()} | {:ok, struct()} | Ecto.Changeset.t()
44-
def one(conn, query, result_struct, opts \\ []) do
45-
conn
46-
|> run_all_query(query, opts)
47-
|> result_to_single_struct(result_struct, nil)
48-
end
49-
50-
@doc """
51-
Inserts a given changeset into the database and converts the result into a given struct
52-
"""
53-
@spec insert(
54-
DBConnection.conn(),
55-
Ecto.Changeset.t(),
56-
module(),
57-
Postgrex.option() | Keyword.t()
58-
) ::
59-
{:ok, struct()} | {:error, any()} | Ecto.Changeset.t()
60-
def insert(conn, changeset, result_struct, opts \\ []) do
61-
with {:ok, {query, args}} <- insert_query_from_changeset(changeset) do
62-
conn
63-
|> run_query_with_trap(query, args, opts)
64-
|> result_to_single_struct(result_struct, changeset)
65-
end
66-
end
67-
68-
@doc """
69-
Inserts all changesets into the database and converts the result into a given list of structs
70-
"""
71-
@spec insert_all_entries(
72-
DBConnection.conn(),
73-
[Ecto.Changeset.t()],
74-
module(),
75-
Postgrex.option() | Keyword.t()
76-
) ::
77-
{:ok, [struct()]} | {:error, any()} | Ecto.Changeset.t()
78-
def insert_all_entries(conn, changesets, result_struct, opts \\ []) do
79-
with {:ok, {query, args}} <- insert_all_query_from_changeset(changesets) do
80-
conn
81-
|> run_query_with_trap(query, args, opts)
82-
|> result_to_structs(result_struct)
83-
end
84-
end
85-
86-
@doc """
87-
Deletes records for a given query and returns the number of deleted records
88-
"""
89-
@spec del(DBConnection.conn(), Ecto.Queryable.t()) ::
90-
{:ok, non_neg_integer()} | {:error, any()}
91-
def del(conn, query) do
92-
with {:ok, %Postgrex.Result{num_rows: num_rows}} <- run_delete_query(conn, query) do
93-
{:ok, num_rows}
94-
end
95-
end
96-
97-
@doc """
98-
Updates an entry based on the changeset and returns the updated entry
99-
"""
100-
@spec update(DBConnection.conn(), Ecto.Changeset.t(), module()) ::
101-
{:ok, struct()} | {:error, any()} | Ecto.Changeset.t()
102-
def update(conn, changeset, result_struct, opts \\ []) do
103-
with {:ok, {query, args}} <- update_query_from_changeset(changeset) do
104-
conn
105-
|> run_query_with_trap(query, args, opts)
106-
|> result_to_single_struct(result_struct, changeset)
107-
end
108-
end
109-
110-
defp result_to_single_struct(
111-
{:error, %Postgrex.Error{postgres: %{code: :unique_violation, constraint: "channels_name_index"}}},
112-
_struct,
113-
changeset
114-
) do
115-
Ecto.Changeset.add_error(changeset, :name, "has already been taken")
116-
end
117-
118-
defp result_to_single_struct({:error, _} = error, _, _), do: error
119-
120-
defp result_to_single_struct({:ok, %Postgrex.Result{rows: []}}, _, _) do
121-
{:error, :not_found}
122-
end
123-
124-
defp result_to_single_struct({:ok, %Postgrex.Result{rows: [row], columns: columns}}, struct, _) do
125-
{:ok, load(struct, Enum.zip(columns, row))}
126-
end
127-
128-
defp result_to_single_struct({:ok, %Postgrex.Result{num_rows: num_rows}}, _, _) do
129-
raise("expected at most one result but got #{num_rows} in result")
130-
end
131-
132-
defp result_to_structs({:error, _} = error, _), do: error
133-
134-
defp result_to_structs({:ok, %Postgrex.Result{rows: rows, columns: columns}}, struct) do
135-
{:ok, Enum.map(rows, &load(struct, Enum.zip(columns, &1)))}
136-
end
137-
138-
defp insert_query_from_changeset(%{valid?: false} = changeset), do: {:error, changeset}
139-
140-
defp insert_query_from_changeset(changeset) do
141-
schema = changeset.data.__struct__
142-
source = schema.__schema__(:source)
143-
prefix = schema.__schema__(:prefix)
144-
acc = %{header: [], rows: []}
145-
146-
%{header: header, rows: rows} =
147-
Enum.reduce(changeset.changes, acc, fn {field, row}, %{header: header, rows: rows} ->
148-
row =
149-
case row do
150-
row when is_boolean(row) -> row
151-
row when is_atom(row) -> Atom.to_string(row)
152-
_ -> row
153-
end
154-
155-
%{
156-
header: [Atom.to_string(field) | header],
157-
rows: [row | rows]
158-
}
159-
end)
160-
161-
table = "\"#{prefix}\".\"#{source}\""
162-
header = "(#{Enum.map_join(header, ",", &"\"#{&1}\"")})"
163-
164-
arg_index =
165-
rows
166-
|> Enum.with_index(1)
167-
|> Enum.map_join(",", fn {_, index} -> "$#{index}" end)
168-
169-
{:ok, {"INSERT INTO #{table} #{header} VALUES (#{arg_index}) RETURNING *", rows}}
170-
end
171-
172-
defp insert_all_query_from_changeset(changesets) do
173-
invalid = Enum.filter(changesets, &(!&1.valid?))
174-
175-
if invalid != [] do
176-
{:error, changesets}
177-
else
178-
[schema] = changesets |> Enum.map(& &1.data.__struct__) |> Enum.uniq()
179-
180-
source = schema.__schema__(:source)
181-
prefix = schema.__schema__(:prefix)
182-
changes = Enum.map(changesets, & &1.changes)
183-
184-
%{header: header, rows: rows} =
185-
Enum.reduce(changes, %{header: [], rows: []}, fn v, changes_acc ->
186-
Enum.reduce(v, changes_acc, fn {field, row}, %{header: header, rows: rows} ->
187-
row =
188-
case row do
189-
row when is_boolean(row) -> row
190-
row when is_atom(row) -> Atom.to_string(row)
191-
_ -> row
192-
end
193-
194-
%{
195-
header: Enum.uniq([Atom.to_string(field) | header]),
196-
rows: [row | rows]
197-
}
198-
end)
199-
end)
200-
201-
args_index =
202-
rows
203-
|> Enum.chunk_every(length(header))
204-
|> Enum.reduce({"", 1}, fn row, {acc, count} ->
205-
arg_index =
206-
row
207-
|> Enum.with_index(count)
208-
|> Enum.map_join("", fn {_, index} -> "$#{index}," end)
209-
|> String.trim_trailing(",")
210-
|> then(&"(#{&1})")
211-
212-
{"#{acc},#{arg_index}", count + length(row)}
213-
end)
214-
|> elem(0)
215-
|> String.trim_leading(",")
216-
217-
table = "\"#{prefix}\".\"#{source}\""
218-
header = "(#{Enum.map_join(header, ",", &"\"#{&1}\"")})"
219-
{:ok, {"INSERT INTO #{table} #{header} VALUES #{args_index} RETURNING *", rows}}
220-
end
221-
end
222-
223-
defp update_query_from_changeset(%{valid?: false} = changeset), do: {:error, changeset}
224-
225-
defp update_query_from_changeset(changeset) do
226-
%Ecto.Changeset{data: %{id: id, __struct__: struct}, changes: changes} = changeset
227-
changes = Keyword.new(changes)
228-
query = from(c in struct, where: c.id == ^id, select: c, update: [set: ^changes])
229-
{:ok, to_sql(:update_all, query)}
230-
end
231-
232-
defp run_all_query(conn, query, opts) do
233-
{query, args} = to_sql(:all, query)
234-
run_query_with_trap(conn, query, args, opts)
235-
end
236-
237-
defp run_delete_query(conn, query) do
238-
{query, args} = to_sql(:delete_all, query)
239-
run_query_with_trap(conn, query, args)
240-
end
241-
242-
defp run_query_with_trap(conn, query, args, opts \\ []) do
243-
Postgrex.query(conn, query, args, opts)
244-
rescue
245-
e ->
246-
log_error("ErrorRunningQuery", e)
247-
{:error, :postgrex_exception}
248-
catch
249-
:exit, {:noproc, {DBConnection.Holder, :checkout, _}} ->
250-
log_error(
251-
"UnableCheckoutConnection",
252-
"Unable to checkout connection, please check your connection pool configuration"
253-
)
254-
255-
{:error, :postgrex_exception}
256-
257-
:exit, reason ->
258-
log_error("UnknownError", reason)
259-
260-
{:error, :postgrex_exception}
261-
end
26218
end

lib/realtime/tenants/authorization.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ defmodule Realtime.Tenants.Authorization do
1717
alias Realtime.Database
1818
alias Realtime.GenCounter
1919
alias Realtime.GenRpc
20-
alias Realtime.Repo
20+
alias Realtime.Tenants.Repo
2121
alias Realtime.Tenants.Authorization.Policies
2222

2323
defstruct [:tenant_id, :topic, :headers, :jwt, :claims, :role, :sub]

0 commit comments

Comments
 (0)