Skip to content

Commit 6830a19

Browse files
authored
fix: improve crud remote calls (#1633)
This approach prevents sending more information than needed reducing the amount of data send via gen_rpc. We also remove some unnecessary calls to primary in the delete controller operation
1 parent 1cd0319 commit 6830a19

File tree

17 files changed

+135
-140
lines changed

17 files changed

+135
-140
lines changed

lib/realtime/api.ex

Lines changed: 64 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -115,26 +115,32 @@ defmodule Realtime.Api do
115115
Logger.debug("create_tenant #{inspect(attrs, pretty: true)}")
116116
tenant_id = Map.get(attrs, :external_id) || Map.get(attrs, "external_id")
117117

118-
%Tenant{}
119-
|> Tenant.changeset(attrs)
120-
|> region_aware_write(:insert, tenant_id)
118+
if master_region?() do
119+
%Tenant{}
120+
|> Tenant.changeset(attrs)
121+
|> Repo.insert()
122+
else
123+
call(:create_tenant, [attrs], tenant_id)
124+
end
121125
end
122126

123127
@doc """
124128
Updates a tenant.
125-
126-
## Examples
127-
128-
iex> update_tenant(tenant, %{field: new_value})
129-
{:ok, %Tenant{}}
130-
131-
iex> update_tenant(tenant, %{field: bad_value})
132-
{:error, %Ecto.Changeset{}}
133-
134129
"""
135-
def update_tenant(%Tenant{} = tenant, attrs) do
130+
@spec update_tenant_by_external_id(binary(), map()) :: {:ok, Tenant.t()} | {:error, term()}
131+
def update_tenant_by_external_id(tenant_id, attrs) when is_binary(tenant_id) do
132+
if master_region?() do
133+
tenant_id
134+
|> get_tenant_by_external_id(use_replica?: false)
135+
|> update_tenant(attrs)
136+
else
137+
call(:update_tenant_by_external_id, [tenant_id, attrs], tenant_id)
138+
end
139+
end
140+
141+
defp update_tenant(%Tenant{} = tenant, attrs) do
136142
changeset = Tenant.changeset(tenant, attrs)
137-
updated = region_aware_write(changeset, :update, tenant.external_id)
143+
updated = Repo.update(changeset)
138144

139145
case updated do
140146
{:ok, tenant} ->
@@ -150,42 +156,31 @@ defmodule Realtime.Api do
150156
updated
151157
end
152158

153-
@doc """
154-
Deletes a tenant.
155-
156-
## Examples
157-
158-
iex> delete_tenant(tenant)
159-
{:ok, %Tenant{}}
160-
161-
iex> delete_tenant(tenant)
162-
{:error, %Ecto.Changeset{}}
163-
164-
"""
165-
def delete_tenant(%Tenant{} = tenant), do: Repo.delete(tenant)
166-
167159
@spec delete_tenant_by_external_id(String.t()) :: boolean()
168160
def delete_tenant_by_external_id(id) do
169-
from(t in Tenant, where: t.external_id == ^id)
170-
|> region_aware_write(:delete_all, id)
171-
|> case do
172-
{num, _} when num > 0 -> true
173-
_ -> false
161+
if master_region?() do
162+
query = from(t in Tenant, where: t.external_id == ^id)
163+
{num, _} = Repo.delete_all(query)
164+
num > 0
165+
else
166+
call(:delete_tenant_by_external_id, [id], id)
174167
end
175168
end
176169

177-
@spec get_tenant_by_external_id(String.t(), atom()) :: Tenant.t() | nil
178-
def get_tenant_by_external_id(external_id, repo \\ :replica)
179-
when repo in [:primary, :replica] do
180-
repo =
181-
case repo do
182-
:primary -> Repo
183-
:replica -> Replica.replica()
184-
end
170+
@spec get_tenant_by_external_id(String.t(), Keyword.t()) :: Tenant.t() | nil
171+
def get_tenant_by_external_id(external_id, opts \\ []) do
172+
use_replica? = Keyword.get(opts, :use_replica?, true)
185173

186-
Tenant
187-
|> repo.get_by(external_id: external_id)
188-
|> repo.preload(:extensions)
174+
cond do
175+
use_replica? ->
176+
Replica.replica().get_by(Tenant, external_id: external_id) |> Replica.replica().preload(:extensions)
177+
178+
!use_replica? and master_region?() ->
179+
Repo.get_by(Tenant, external_id: external_id) |> Repo.preload(:extensions)
180+
181+
true ->
182+
call(:get_tenant_by_external_id, [external_id, opts], external_id)
183+
end
189184
end
190185

191186
defp list_extensions(type) do
@@ -195,26 +190,36 @@ defmodule Realtime.Api do
195190
end
196191

197192
def rename_settings_field(from, to) do
198-
for extension <- list_extensions("postgres_cdc_rls") do
199-
{value, settings} = Map.pop(extension.settings, from)
200-
new_settings = Map.put(settings, to, value)
201-
202-
extension
203-
|> Changeset.cast(%{settings: new_settings}, [:settings])
204-
|> region_aware_write(:update!, extension.external_id)
193+
if master_region?() do
194+
for extension <- list_extensions("postgres_cdc_rls") do
195+
{value, settings} = Map.pop(extension.settings, from)
196+
new_settings = Map.put(settings, to, value)
197+
198+
extension
199+
|> Changeset.cast(%{settings: new_settings}, [:settings])
200+
|> Repo.update()
201+
end
202+
else
203+
call(:rename_settings_field, [from, to], from)
205204
end
206205
end
207206

207+
@spec preload_counters(nil | Realtime.Api.Tenant.t(), any()) :: nil | Realtime.Api.Tenant.t()
208208
@doc """
209209
Updates the migrations_ran field for a tenant.
210210
"""
211211
@spec update_migrations_ran(binary(), integer()) :: {:ok, Tenant.t()} | {:error, term()}
212212
def update_migrations_ran(external_id, count) do
213-
external_id
214-
|> Cache.get_tenant_by_external_id()
215-
|> Tenant.changeset(%{migrations_ran: count})
216-
|> region_aware_write(:update!, external_id)
217-
|> tap(fn _ -> Cache.distributed_invalidate_tenant_cache(external_id) end)
213+
if master_region?() do
214+
tenant = get_tenant_by_external_id(external_id, use_replica?: false)
215+
216+
tenant
217+
|> Tenant.changeset(%{migrations_ran: count})
218+
|> Repo.update()
219+
|> tap(fn _ -> Cache.distributed_invalidate_tenant_cache(external_id) end)
220+
else
221+
call(:update_migrations_ran, [external_id, count], external_id)
222+
end
218223
end
219224

220225
def preload_counters(nil), do: nil
@@ -257,21 +262,13 @@ defmodule Realtime.Api do
257262

258263
defp maybe_restart_db_connection(_changeset), do: nil
259264

260-
defp local_call? do
265+
defp master_region? do
261266
region = Application.get_env(:realtime, :region)
262267
master_region = Application.get_env(:realtime, :master_region) || region
263268
region == master_region
264269
end
265270

266-
defp region_aware_write(%struct{} = argument, operation, tenant_id) when struct in [Changeset, Ecto.Query] do
267-
if local_call?(),
268-
do: local_call(operation, [argument], tenant_id),
269-
else: remote_call(operation, [argument], tenant_id)
270-
end
271-
272-
defp local_call(operation, args, _tenant_id), do: apply(Realtime.Repo, operation, args)
273-
274-
defp remote_call(operation, args, tenant_id) do
271+
defp call(operation, args, tenant_id) do
275272
master_region = Application.get_env(:realtime, :master_region)
276273

277274
with {:ok, master_node} <- Nodes.node_from_region(master_region, self()),
@@ -281,7 +278,7 @@ defmodule Realtime.Api do
281278
end
282279

283280
defp wrapped_call(master_node, operation, args, tenant_id) do
284-
case GenRpc.call(master_node, Realtime.Repo, operation, args, tenant_id: tenant_id) do
281+
case GenRpc.call(master_node, __MODULE__, operation, args, tenant_id: tenant_id) do
285282
{:error, :rpc_error, reason} -> {:error, reason}
286283
{:error, reason} -> {:error, reason}
287284
result -> {:ok, result}

lib/realtime/repo_replica.ex

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,9 @@ defmodule Realtime.Repo.Replica do
5151
# Do not create module if replica isn't set or configuration is not present
5252
cond do
5353
is_nil(replica) ->
54-
Logger.info("Replica region not found, defaulting to Realtime.Repo")
5554
Realtime.Repo
5655

5756
is_nil(replica_conf) ->
58-
Logger.info("Replica config not found for #{region} region")
5957
Realtime.Repo
6058

6159
true ->

lib/realtime/tenants.ex

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,7 @@ defmodule Realtime.Tenants do
458458
@spec suspend_tenant_by_external_id(String.t()) :: {:ok, Tenant.t()} | {:error, term()}
459459
def suspend_tenant_by_external_id(external_id) do
460460
external_id
461-
|> Cache.get_tenant_by_external_id()
462-
|> Api.update_tenant(%{suspend: true})
461+
|> Api.update_tenant_by_external_id(%{suspend: true})
463462
|> tap(fn _ -> broadcast_operation_event(:suspend_tenant, external_id) end)
464463
end
465464

@@ -469,8 +468,7 @@ defmodule Realtime.Tenants do
469468
@spec unsuspend_tenant_by_external_id(String.t()) :: {:ok, Tenant.t()} | {:error, term()}
470469
def unsuspend_tenant_by_external_id(external_id) do
471470
external_id
472-
|> Cache.get_tenant_by_external_id()
473-
|> Api.update_tenant(%{suspend: false})
471+
|> Api.update_tenant_by_external_id(%{suspend: false})
474472
|> tap(fn _ -> broadcast_operation_event(:unsuspend_tenant, external_id) end)
475473
end
476474

lib/realtime_web/controllers/tenant_controller.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ defmodule RealtimeWeb.TenantController do
137137
)
138138

139139
def update(conn, %{"tenant_id" => external_id, "tenant" => tenant_params}) do
140-
tenant = Api.get_tenant_by_external_id(external_id)
140+
tenant = Api.get_tenant_by_external_id(external_id, use_replica?: false)
141141

142142
case tenant do
143143
nil ->
@@ -160,7 +160,7 @@ defmodule RealtimeWeb.TenantController do
160160
end
161161

162162
tenant ->
163-
with {:ok, %Tenant{} = tenant} <- Api.update_tenant(tenant, tenant_params) do
163+
with {:ok, %Tenant{} = tenant} <- Api.update_tenant_by_external_id(tenant.external_id, tenant_params) do
164164
conn
165165
|> put_status(:ok)
166166
|> put_resp_header("location", Routes.tenant_path(conn, :show, tenant))
@@ -192,7 +192,7 @@ defmodule RealtimeWeb.TenantController do
192192
def delete(conn, %{"tenant_id" => tenant_id}) do
193193
stop_all_timeout = Enum.count(PostgresCdc.available_drivers()) * 1_000
194194

195-
with %Tenant{} = tenant <- Api.get_tenant_by_external_id(tenant_id, :primary),
195+
with %Tenant{} = tenant <- Api.get_tenant_by_external_id(tenant_id, use_replica: false),
196196
_ <- Tenants.suspend_tenant_by_external_id(tenant_id),
197197
true <- Api.delete_tenant_by_external_id(tenant_id),
198198
true <- Cache.distributed_invalidate_tenant_cache(tenant_id),
@@ -231,7 +231,7 @@ defmodule RealtimeWeb.TenantController do
231231
)
232232

233233
def reload(conn, %{"tenant_id" => tenant_id}) do
234-
case Tenants.get_tenant_by_external_id(tenant_id) do
234+
case Api.get_tenant_by_external_id(tenant_id, use_replica?: false) do
235235
nil ->
236236
log_error("TenantNotFound", "Tenant not found")
237237

lib/realtime_web/plugs/assign_tenant.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ defmodule RealtimeWeb.Plugs.AssignTenant do
2020

2121
def call(%Plug.Conn{host: host} = conn, _opts) do
2222
with {:ok, external_id} <- Database.get_external_id(host),
23-
%Tenant{} = tenant <- Api.get_tenant_by_external_id(external_id) do
23+
%Tenant{} = tenant <- Api.get_tenant_by_external_id(external_id, use_replica?: true) do
2424
Logger.metadata(external_id: external_id, project: external_id)
2525
OpenTelemetry.Tracer.set_attributes(external_id: external_id)
2626

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.66.1",
7+
version: "2.66.2",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/region_aware_routing_test.exs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
4949

5050
Mimic.expect(Realtime.GenRpc, :call, fn node, mod, func, args, opts ->
5151
assert node == master_node
52-
assert mod == Realtime.Repo
53-
assert func == :insert
52+
assert mod == Realtime.Api
53+
assert func == :create_tenant
5454
assert opts[:tenant_id] == external_id
5555

5656
call_original(GenRpc, :call, [node, mod, func, args, opts])
@@ -80,16 +80,16 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
8080
Realtime.GenRpc
8181
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
8282
assert node == master_node
83-
assert mod == Realtime.Repo
84-
assert func == :insert
83+
assert mod == Realtime.Api
84+
assert func == :create_tenant
8585
assert opts[:tenant_id] == tenant_attrs["external_id"]
8686

8787
call_original(GenRpc, :call, [node, mod, func, args, opts])
8888
end)
8989
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
9090
assert node == master_node
91-
assert mod == Realtime.Repo
92-
assert func == :update
91+
assert mod == Realtime.Api
92+
assert func == :update_tenant_by_external_id
9393
assert opts[:tenant_id] == tenant_attrs["external_id"]
9494

9595
call_original(GenRpc, :call, [node, mod, func, args, opts])
@@ -98,7 +98,7 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
9898
tenant = tenant_fixture(tenant_attrs)
9999

100100
new_name = "updated_via_routing"
101-
result = Api.update_tenant(tenant, %{name: new_name})
101+
result = Api.update_tenant_by_external_id(tenant.external_id, %{name: new_name})
102102

103103
assert {:ok, %Tenant{} = updated} = result
104104
assert updated.name == new_name
@@ -123,16 +123,16 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
123123
Realtime.GenRpc
124124
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
125125
assert node == master_node
126-
assert mod == Realtime.Repo
127-
assert func == :insert
126+
assert mod == Realtime.Api
127+
assert func == :create_tenant
128128
assert opts[:tenant_id] == tenant_attrs["external_id"]
129129

130130
call_original(GenRpc, :call, [node, mod, func, args, opts])
131131
end)
132132
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
133133
assert node == master_node
134-
assert mod == Realtime.Repo
135-
assert func == :delete_all
134+
assert mod == Realtime.Api
135+
assert func == :delete_tenant_by_external_id
136136
assert opts[:tenant_id] == tenant_attrs["external_id"]
137137

138138
call_original(GenRpc, :call, [node, mod, func, args, opts])
@@ -164,16 +164,16 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
164164
Realtime.GenRpc
165165
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
166166
assert node == master_node
167-
assert mod == Realtime.Repo
168-
assert func == :insert
167+
assert mod == Realtime.Api
168+
assert func == :create_tenant
169169
assert opts[:tenant_id] == tenant_attrs["external_id"]
170170

171171
call_original(GenRpc, :call, [node, mod, func, args, opts])
172172
end)
173173
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
174174
assert node == master_node
175-
assert mod == Realtime.Repo
176-
assert func == :update!
175+
assert mod == Realtime.Api
176+
assert func == :update_migrations_ran
177177
assert opts[:tenant_id] == tenant_attrs["external_id"]
178178

179179
call_original(GenRpc, :call, [node, mod, func, args, opts])
@@ -184,7 +184,7 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
184184
new_migrations_ran = 5
185185
result = Api.update_migrations_ran(tenant.external_id, new_migrations_ran)
186186

187-
assert %Tenant{} = updated = result
187+
assert {:ok, updated} = result
188188
assert updated.migrations_ran == new_migrations_ran
189189

190190
reloaded = Realtime.Repo.get(Tenant, tenant.id)

0 commit comments

Comments
 (0)