Skip to content

Commit 469f56d

Browse files
authored
feat: limit broadcast payload size (#1588)
Limits broadcasting messages with payloads bigger than what the tenant as set as the limit
1 parent 7f28e0f commit 469f56d

File tree

13 files changed

+736
-58
lines changed

13 files changed

+736
-58
lines changed

.tool-versions

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
elixir 1.18.4
1+
elixir 1.18.4-otp-27
22
nodejs 18.13.0
33
erlang 27

lib/realtime/tenants.ex

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,4 +501,20 @@ defmodule Realtime.Tenants do
501501
@spec region(Tenant.t()) :: String.t() | nil
502502
def region(%Tenant{extensions: [%{settings: settings}]}), do: Map.get(settings, "region")
503503
def region(_), do: nil
504+
505+
@doc """
506+
"""
507+
@spec validate_payload_size(Tenant.t() | binary(), map()) :: :ok | {:error, :payload_size_exceeded}
508+
def validate_payload_size(tenant_id, payload) when is_binary(tenant_id) do
509+
tenant_id
510+
|> Cache.get_tenant_by_external_id()
511+
|> validate_payload_size(payload)
512+
end
513+
514+
@payload_size_padding 500
515+
def validate_payload_size(%Tenant{max_payload_size_in_kb: max_payload_size_in_kb}, payload) do
516+
max_payload_size = max_payload_size_in_kb * 1000 + @payload_size_padding
517+
payload_size = :erlang.external_size(payload)
518+
if payload_size > max_payload_size, do: {:error, :payload_size_exceeded}, else: :ok
519+
end
504520
end

lib/realtime/tenants/batch_broadcast.ex

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
3333
messages: list(%{id: String.t(), topic: String.t(), payload: map(), event: String.t(), private: boolean()})
3434
},
3535
super_user :: boolean()
36-
) :: :ok | {:error, atom()}
36+
) :: :ok | {:error, atom() | Ecto.Changeset.t()}
3737
def broadcast(auth_params, tenant, messages, super_user \\ false)
3838

3939
def broadcast(%Plug.Conn{} = conn, %Tenant{} = tenant, messages, super_user) do
@@ -49,7 +49,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
4949
end
5050

5151
def broadcast(auth_params, %Tenant{} = tenant, messages, super_user) do
52-
with %Ecto.Changeset{valid?: true} = changeset <- changeset(%__MODULE__{}, messages),
52+
with %Ecto.Changeset{valid?: true} = changeset <- changeset(%__MODULE__{}, messages, tenant),
5353
%Ecto.Changeset{changes: %{messages: messages}} = changeset,
5454
events_per_second_rate = Tenants.events_per_second_rate(tenant),
5555
:ok <- check_rate_limit(events_per_second_rate, tenant, length(messages)) do
@@ -71,15 +71,11 @@ defmodule Realtime.Tenants.BatchBroadcast do
7171
|> Enum.group_by(fn event -> Map.get(event, :topic) end)
7272
|> Enum.each(fn {topic, events} ->
7373
if super_user do
74-
Enum.each(events, fn message ->
75-
send_message_and_count(tenant, events_per_second_rate, message, false)
76-
end)
74+
Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end)
7775
else
7876
case permissions_for_message(tenant, auth_params, topic) do
7977
%Policies{broadcast: %BroadcastPolicies{write: true}} ->
80-
Enum.each(events, fn message ->
81-
send_message_and_count(tenant, events_per_second_rate, message, false)
82-
end)
78+
Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end)
8379

8480
_ ->
8581
nil
@@ -88,22 +84,26 @@ defmodule Realtime.Tenants.BatchBroadcast do
8884
end)
8985

9086
:ok
87+
else
88+
%Ecto.Changeset{valid?: false} = changeset -> {:error, changeset}
89+
error -> error
9190
end
9291
end
9392

9493
def broadcast(_, nil, _, _), do: {:error, :tenant_not_found}
9594

96-
defp changeset(payload, attrs) do
95+
defp changeset(payload, attrs, tenant) do
9796
payload
9897
|> cast(attrs, [])
99-
|> cast_embed(:messages, required: true, with: &message_changeset/2)
98+
|> cast_embed(:messages, required: true, with: fn message, attrs -> message_changeset(message, tenant, attrs) end)
10099
end
101100

102-
defp message_changeset(message, attrs) do
101+
defp message_changeset(message, tenant, attrs) do
103102
message
104103
|> cast(attrs, [:id, :topic, :payload, :event, :private])
105104
|> maybe_put_private_change()
106105
|> validate_required([:topic, :payload, :event])
106+
|> validate_payload_size(tenant)
107107
end
108108

109109
defp maybe_put_private_change(changeset) do
@@ -113,18 +113,25 @@ defmodule Realtime.Tenants.BatchBroadcast do
113113
end
114114
end
115115

116+
defp validate_payload_size(changeset, tenant) do
117+
payload = get_change(changeset, :payload)
118+
119+
case Tenants.validate_payload_size(tenant, payload) do
120+
:ok -> changeset
121+
_ -> add_error(changeset, :payload, "Payload size exceeds tenant limit")
122+
end
123+
end
124+
116125
@event_type "broadcast"
117126
defp send_message_and_count(tenant, events_per_second_rate, message, public?) do
118127
tenant_topic = Tenants.tenant_topic(tenant, message.topic, public?)
119128

120129
payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast"}
121130

122131
payload =
123-
if message[:id] do
124-
Map.put(payload, "meta", %{"id" => message.id})
125-
else
126-
payload
127-
end
132+
if message[:id],
133+
do: Map.put(payload, "meta", %{"id" => message.id}),
134+
else: payload
128135

129136
broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}
130137

lib/realtime/tenants/replication_connection.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,11 @@ defmodule Realtime.Tenants.ReplicationConnection do
329329

330330
{:noreply, state}
331331
else
332+
{:error, %Ecto.Changeset{valid?: false} = changeset} ->
333+
error = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0))
334+
log_error("UnableToBroadcastChanges", error)
335+
{:noreply, state}
336+
332337
{:error, error} ->
333338
log_error("UnableToBroadcastChanges", error)
334339
{:noreply, state}

lib/realtime_web/channels/realtime_channel/broadcast_handler.ex

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
66

77
import Phoenix.Socket, only: [assign: 3]
88

9+
alias Realtime.Tenants
910
alias RealtimeWeb.RealtimeChannel
1011
alias RealtimeWeb.TenantBroadcaster
1112
alias Phoenix.Socket
@@ -38,8 +39,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
3839
|> increment_rate_counter()
3940

4041
%{ack_broadcast: ack_broadcast} = socket.assigns
41-
send_message(tenant_id, self_broadcast, tenant_topic, payload)
42-
if ack_broadcast, do: {:reply, :ok, socket}, else: {:noreply, socket}
42+
43+
res =
44+
case Tenants.validate_payload_size(tenant_id, payload) do
45+
:ok -> send_message(tenant_id, self_broadcast, tenant_topic, payload)
46+
{:error, error} -> {:error, error}
47+
end
48+
49+
cond do
50+
ack_broadcast && match?({:error, :payload_size_exceeded}, res) ->
51+
{:reply, {:error, :payload_size_exceeded}, socket}
52+
53+
ack_broadcast ->
54+
{:reply, :ok, socket}
55+
56+
true ->
57+
{:noreply, socket}
58+
end
4359

4460
{:ok, policies} ->
4561
{:noreply, assign(socket, :policies, policies)}
@@ -65,11 +81,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
6581
} = socket
6682

6783
socket = increment_rate_counter(socket)
68-
send_message(tenant_id, self_broadcast, tenant_topic, payload)
6984

70-
if ack_broadcast,
71-
do: {:reply, :ok, socket},
72-
else: {:noreply, socket}
85+
res =
86+
case Tenants.validate_payload_size(tenant_id, payload) do
87+
:ok -> send_message(tenant_id, self_broadcast, tenant_topic, payload)
88+
{:error, error} -> {:error, error}
89+
end
90+
91+
cond do
92+
ack_broadcast && match?({:error, :payload_size_exceeded}, res) ->
93+
{:reply, {:error, :payload_size_exceeded}, socket}
94+
95+
ack_broadcast ->
96+
{:reply, :ok, socket}
97+
98+
true ->
99+
{:noreply, socket}
100+
end
73101
end
74102

75103
defp send_message(tenant_id, self_broadcast, tenant_topic, payload) do

lib/realtime_web/channels/realtime_channel/presence_handler.ex

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,5 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
161161
end
162162
end
163163

164-
# Added due to the fact that JSON decoding adds some overhead and erlang term will be slighly larger
165-
@payload_size_padding 500
166-
defp validate_payload_size(tenant, payload) do
167-
if :erlang.external_size(payload) > tenant.max_payload_size_in_kb * 1000 + @payload_size_padding do
168-
{:error, :payload_size_exceeded}
169-
else
170-
:ok
171-
end
172-
end
164+
defp validate_payload_size(tenant, payload), do: Tenants.validate_payload_size(tenant, payload)
173165
end

lib/realtime_web/controllers/fallback_controller.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ defmodule RealtimeWeb.FallbackController do
4545
|> render("error.json", message: message)
4646
end
4747

48-
def call(conn, %Ecto.Changeset{valid?: true} = changeset) do
48+
def call(conn, {:error, %Ecto.Changeset{valid?: false} = changeset}) do
4949
log_error(
5050
"UnprocessableEntity",
5151
Ecto.Changeset.traverse_errors(changeset, &translate_error/1)

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.56.5",
7+
version: "2.57.0",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/e2e/tests.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
import { load } from "https://deno.land/[email protected]/dotenv/mod.ts";
2-
import {
3-
createClient,
4-
SupabaseClient,
5-
} from "npm:@supabase/[email protected]";
2+
import { createClient, SupabaseClient } from "npm:@supabase/supabase-js@latest";
63
import { assertEquals } from "https://deno.land/[email protected]/assert/mod.ts";
74
import {
85
describe,

0 commit comments

Comments
 (0)