Skip to content

Commit c41c4fe

Browse files
authored
fix: replicate pg_output issue (#1625)
If the string size matches the uuid length we were parsing it to be uuid which created issues. To prevent it we now handle payload based on the relations provided and we further simplified the logic to not use send in our message handling
1 parent 253b6cc commit c41c4fe

File tree

5 files changed

+178
-224
lines changed

5 files changed

+178
-224
lines changed

lib/realtime/adapters/postgres/decoder.ex

Lines changed: 45 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -136,44 +136,36 @@ defmodule Realtime.Adapters.Postgres.Decoder do
136136

137137
@pg_epoch DateTime.from_iso8601("2000-01-01T00:00:00Z")
138138

139-
alias Messages.{
140-
Begin,
141-
Commit,
142-
Origin,
143-
Relation,
144-
Relation.Column,
145-
Insert,
146-
Update,
147-
Delete,
148-
Truncate,
149-
Type,
150-
Unsupported
151-
}
139+
alias Messages.Begin
140+
alias Messages.Commit
141+
alias Messages.Origin
142+
alias Messages.Relation
143+
alias Messages.Relation.Column
144+
alias Messages.Insert
145+
alias Messages.Type
146+
alias Messages.Unsupported
152147

153148
alias Realtime.Adapters.Postgres.OidDatabase
154149

155150
@doc """
156151
Parses logical replication messages from Postgres
157-
158-
## Examples
159-
160-
iex> decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>>)
161-
%Realtime.Adapters.Postgres.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}}
162-
163152
"""
164-
def decode_message(message) when is_binary(message) do
165-
decode_message_impl(message)
153+
def decode_message(message, relations) when is_binary(message) do
154+
decode_message_impl(message, relations)
166155
end
167156

168-
defp decode_message_impl(<<"B", lsn::binary-8, timestamp::integer-64, xid::integer-32>>) do
157+
defp decode_message_impl(<<"B", lsn::binary-8, timestamp::integer-64, xid::integer-32>>, _relations) do
169158
%Begin{
170159
final_lsn: decode_lsn(lsn),
171160
commit_timestamp: pgtimestamp_to_timestamp(timestamp),
172161
xid: xid
173162
}
174163
end
175164

176-
defp decode_message_impl(<<"C", _flags::binary-1, lsn::binary-8, end_lsn::binary-8, timestamp::integer-64>>) do
165+
defp decode_message_impl(
166+
<<"C", _flags::binary-1, lsn::binary-8, end_lsn::binary-8, timestamp::integer-64>>,
167+
_relations
168+
) do
177169
%Commit{
178170
flags: [],
179171
lsn: decode_lsn(lsn),
@@ -183,14 +175,14 @@ defmodule Realtime.Adapters.Postgres.Decoder do
183175
end
184176

185177
# TODO: Verify this is correct with real data from Postgres
186-
defp decode_message_impl(<<"O", lsn::binary-8, name::binary>>) do
178+
defp decode_message_impl(<<"O", lsn::binary-8, name::binary>>, _relations) do
187179
%Origin{
188180
origin_commit_lsn: decode_lsn(lsn),
189181
name: name
190182
}
191183
end
192184

193-
defp decode_message_impl(<<"R", id::integer-32, rest::binary>>) do
185+
defp decode_message_impl(<<"R", id::integer-32, rest::binary>>, _relations) do
194186
[
195187
namespace
196188
| [name | [<<replica_identity::binary-1, _number_of_columns::integer-16, columns::binary>>]]
@@ -214,70 +206,17 @@ defmodule Realtime.Adapters.Postgres.Decoder do
214206
}
215207
end
216208

217-
defp decode_message_impl(<<"I", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do
218-
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)
219-
220-
%Insert{relation_id: relation_id, tuple_data: decoded_tuple_data}
221-
end
222-
223-
defp decode_message_impl(<<"U", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do
224-
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)
225-
226-
%Update{relation_id: relation_id, tuple_data: decoded_tuple_data}
227-
end
228-
229-
defp decode_message_impl(
230-
<<"U", relation_id::integer-32, key_or_old::binary-1, number_of_columns::integer-16, tuple_data::binary>>
231-
)
232-
when key_or_old == "O" or key_or_old == "K" do
233-
{<<"N", new_number_of_columns::integer-16, new_tuple_binary::binary>>, old_decoded_tuple_data} =
234-
decode_tuple_data(tuple_data, number_of_columns)
235-
236-
{<<>>, decoded_tuple_data} = decode_tuple_data(new_tuple_binary, new_number_of_columns)
237-
238-
base_update_msg = %Update{relation_id: relation_id, tuple_data: decoded_tuple_data}
239-
240-
case key_or_old do
241-
"K" -> Map.put(base_update_msg, :changed_key_tuple_data, old_decoded_tuple_data)
242-
"O" -> Map.put(base_update_msg, :old_tuple_data, old_decoded_tuple_data)
243-
end
244-
end
245-
246209
defp decode_message_impl(
247-
<<"D", relation_id::integer-32, key_or_old::binary-1, number_of_columns::integer-16, tuple_data::binary>>
248-
)
249-
when key_or_old == "K" or key_or_old == "O" do
250-
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)
251-
252-
base_delete_msg = %Delete{relation_id: relation_id}
253-
254-
case key_or_old do
255-
"K" -> Map.put(base_delete_msg, :changed_key_tuple_data, decoded_tuple_data)
256-
"O" -> Map.put(base_delete_msg, :old_tuple_data, decoded_tuple_data)
257-
end
258-
end
259-
260-
defp decode_message_impl(<<"T", number_of_relations::integer-32, options::integer-8, column_ids::binary>>) do
261-
truncated_relations =
262-
for relation_id_bin <- column_ids |> :binary.bin_to_list() |> Enum.chunk_every(4),
263-
do: relation_id_bin |> :binary.list_to_bin() |> :binary.decode_unsigned()
264-
265-
decoded_options =
266-
case options do
267-
0 -> []
268-
1 -> [:cascade]
269-
2 -> [:restart_identity]
270-
3 -> [:cascade, :restart_identity]
271-
end
210+
<<"I", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>,
211+
relations
212+
) do
213+
relation = relations |> Map.get(relation_id) |> Map.get(:columns)
214+
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns, relation)
272215

273-
%Truncate{
274-
number_of_relations: number_of_relations,
275-
options: decoded_options,
276-
truncated_relations: truncated_relations
277-
}
216+
%Insert{relation_id: relation_id, tuple_data: decoded_tuple_data}
278217
end
279218

280-
defp decode_message_impl(<<"Y", data_type_id::integer-32, namespace_and_name::binary>>) do
219+
defp decode_message_impl(<<"Y", data_type_id::integer-32, namespace_and_name::binary>>, _relations) do
281220
[namespace, name_with_null] = :binary.split(namespace_and_name, <<0>>)
282221
name = String.slice(name_with_null, 0..-2//1)
283222

@@ -288,52 +227,53 @@ defmodule Realtime.Adapters.Postgres.Decoder do
288227
}
289228
end
290229

291-
defp decode_message_impl(binary), do: %Unsupported{data: binary}
230+
defp decode_message_impl(binary, _relations), do: %Unsupported{data: binary}
292231

293-
defp decode_tuple_data(binary, columns_remaining, accumulator \\ [])
232+
defp decode_tuple_data(binary, columns_remaining, relations, accumulator \\ [])
294233

295-
defp decode_tuple_data(remaining_binary, 0, accumulator) when is_binary(remaining_binary),
234+
defp decode_tuple_data(remaining_binary, 0, _relations, accumulator) when is_binary(remaining_binary),
296235
do: {remaining_binary, accumulator |> Enum.reverse() |> List.to_tuple()}
297236

298-
defp decode_tuple_data(<<"n", rest::binary>>, columns_remaining, accumulator),
299-
do: decode_tuple_data(rest, columns_remaining - 1, [nil | accumulator])
237+
defp decode_tuple_data(<<"n", rest::binary>>, columns_remaining, [_ | relations], accumulator),
238+
do: decode_tuple_data(rest, columns_remaining - 1, relations, [nil | accumulator])
300239

301-
defp decode_tuple_data(<<"u", rest::binary>>, columns_remaining, accumulator),
302-
do: decode_tuple_data(rest, columns_remaining - 1, [:unchanged_toast | accumulator])
240+
defp decode_tuple_data(<<"u", rest::binary>>, columns_remaining, [_ | relations], accumulator),
241+
do: decode_tuple_data(rest, columns_remaining - 1, relations, [:unchanged_toast | accumulator])
303242

304243
@start_date "2000-01-01T00:00:00Z"
305244
defp decode_tuple_data(
306245
<<"b", column_length::integer-32, rest::binary>>,
307246
columns_remaining,
247+
[%Column{type: type} | relations],
308248
accumulator
309249
) do
310250
data = :erlang.binary_part(rest, {0, column_length})
311251
remainder = :erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)})
312252

313253
data =
314-
case data do
315-
<<1>> ->
316-
true
254+
case type do
255+
"bool" ->
256+
data == <<1>>
317257

318-
<<0>> ->
319-
false
258+
"jsonb" ->
259+
<<1, rest::binary>> = data
260+
rest
320261

321-
<<uuid_binary::binary-16>> ->
322-
UUID.binary_to_string!(uuid_binary)
262+
"timestamp" ->
263+
<<microseconds::signed-big-64>> = data
323264

324-
<<microseconds::signed-big-64>> ->
325265
@start_date
326266
|> NaiveDateTime.from_iso8601!()
327267
|> NaiveDateTime.add(microseconds, :microsecond)
328268

329-
<<1, binary::binary-size(column_length - 1)>> ->
330-
binary
331-
332-
data when is_binary(data) ->
269+
"text" ->
333270
data
271+
272+
"uuid" ->
273+
UUID.binary_to_string!(data)
334274
end
335275

336-
decode_tuple_data(remainder, columns_remaining - 1, [data | accumulator])
276+
decode_tuple_data(remainder, columns_remaining - 1, relations, [data | accumulator])
337277
end
338278

339279
defp decode_columns(binary, accumulator \\ [])

lib/realtime/tenants/replication_connection.ex

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
267267

268268
def handle_data(data, state) when is_write(data) do
269269
%Write{message: message} = parse(data)
270-
message |> decode_message() |> then(&send(self(), &1))
271-
{:noreply, [], state}
270+
message |> decode_message(state.relations) |> then(&handle_message(&1, state))
272271
end
273272

274273
def handle_data(e, state) do
@@ -277,12 +276,16 @@ defmodule Realtime.Tenants.ReplicationConnection do
277276
end
278277

279278
@impl true
280-
def handle_info(%Decoder.Messages.Begin{commit_timestamp: commit_timestamp}, state) do
279+
280+
def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :shutdown}
281+
def handle_info(_, state), do: {:noreply, state}
282+
283+
defp handle_message(%Decoder.Messages.Begin{commit_timestamp: commit_timestamp}, state) do
281284
latency_committed_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(commit_timestamp, :millisecond)
282285
{:noreply, %{state | latency_committed_at: latency_committed_at}}
283286
end
284287

285-
def handle_info(%Decoder.Messages.Relation{} = msg, state) do
288+
defp handle_message(%Decoder.Messages.Relation{} = msg, state) do
286289
%Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg
287290
%{relations: relations} = state
288291
relation = %{name: name, columns: columns, namespace: namespace}
@@ -298,7 +301,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
298301
{:noreply, state}
299302
end
300303

301-
def handle_info(%Decoder.Messages.Insert{} = msg, state) do
304+
defp handle_message(%Decoder.Messages.Insert{} = msg, state) do
302305
%Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg
303306
%{relations: relations, tenant_id: tenant_id, latency_committed_at: latency_committed_at} = state
304307

@@ -351,9 +354,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
351354
{:noreply, state}
352355
end
353356

354-
def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :shutdown}
355-
def handle_info(_, state), do: {:noreply, state}
356-
357+
defp handle_message(_, state), do: {:noreply, state}
357358
@impl true
358359
def handle_disconnect(state) do
359360
Logger.warning("Disconnecting broadcast changes handler in the step : #{inspect(state.step)}")

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.65.2",
7+
version: "2.65.3",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

0 commit comments

Comments
 (0)