Skip to content

Commit 215a5b6

Browse files
committed
Only lock Session.WriteBuffer and wipe cache on lock timeout
1 parent e99275d commit 215a5b6

File tree

4 files changed

+109
-96
lines changed

4 files changed

+109
-96
lines changed

lib/plausible/event/write_buffer.ex

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,4 @@ defmodule Plausible.Event.WriteBuffer do
3535
def flush do
3636
Plausible.Ingestion.WriteBuffer.flush(__MODULE__)
3737
end
38-
39-
def lock(timeout) do
40-
Plausible.Ingestion.WriteBuffer.lock(__MODULE__, timeout)
41-
end
42-
43-
def unlock() do
44-
Plausible.Ingestion.WriteBuffer.unlock(__MODULE__)
45-
end
4638
end

lib/plausible/ingestion/write_buffer.ex

Lines changed: 33 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ defmodule Plausible.Ingestion.WriteBuffer do
55

66
alias Plausible.IngestRepo
77

8-
@lock_timeout :timer.seconds(3)
9-
@lock_interval 100
10-
118
def start_link(opts) do
129
GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name))
1310
end
@@ -20,54 +17,39 @@ defmodule Plausible.Ingestion.WriteBuffer do
2017
GenServer.call(server, :flush, :infinity)
2118
end
2219

20+
def flush_async(server) do
21+
GenServer.cast(server, :flush)
22+
end
23+
2324
@impl true
2425
def init(opts) do
2526
name = Keyword.fetch!(opts, :name)
2627
buffer = opts[:buffer] || []
2728
max_buffer_size = opts[:max_buffer_size] || default_max_buffer_size()
2829
flush_interval_ms = opts[:flush_interval_ms] || default_flush_interval_ms()
29-
lock_timeout_ms = opts[:lock_timeouts_ms] || @lock_timeout
30-
lock_interval_ms = opts[:lock_interval_ms] || @lock_interval
30+
on_init = Keyword.get(opts, :on_init, fn _opts -> %{} end)
3131

3232
Process.flag(:trap_exit, true)
3333
timer = Process.send_after(self(), :tick, flush_interval_ms)
3434

35-
^name = :ets.new(name, [:named_table, :set, :public])
35+
extra_state = on_init.(opts)
3636

3737
{:ok,
38-
%{
39-
buffer: buffer,
40-
timer: timer,
41-
name: name,
42-
insert_sql: Keyword.fetch!(opts, :insert_sql),
43-
insert_opts: Keyword.fetch!(opts, :insert_opts),
44-
header: Keyword.fetch!(opts, :header),
45-
buffer_size: IO.iodata_length(buffer),
46-
max_buffer_size: max_buffer_size,
47-
flush_interval_ms: flush_interval_ms,
48-
lock_timeout_ms: lock_timeout_ms,
49-
lock_interval_ms: lock_interval_ms
50-
}}
51-
end
52-
53-
@spec lock(atom(), pid(), pos_integer()) :: :ok | {:error, :timeout}
54-
def lock(name, locker \\ self(), timeout) do
55-
true = :ets.insert(name, {:state, %{locker: locker}})
56-
57-
flush(name)
58-
59-
receive do
60-
{:locked, ^name} -> :ok
61-
after
62-
timeout -> {:error, :timeout}
63-
end
64-
end
65-
66-
@spec unlock(atom()) :: :ok
67-
def unlock(name) do
68-
true = :ets.insert(name, {:state, %{locker: nil}})
69-
70-
:ok
38+
Map.merge(
39+
%{
40+
buffer: buffer,
41+
timer: timer,
42+
name: name,
43+
insert_sql: Keyword.fetch!(opts, :insert_sql),
44+
insert_opts: Keyword.fetch!(opts, :insert_opts),
45+
on_flush: Keyword.get(opts, :on_flush, fn _result, state -> state end),
46+
header: Keyword.fetch!(opts, :header),
47+
buffer_size: IO.iodata_length(buffer),
48+
max_buffer_size: max_buffer_size,
49+
flush_interval_ms: flush_interval_ms
50+
},
51+
extra_state
52+
)}
7153
end
7254

7355
@impl true
@@ -89,6 +71,14 @@ defmodule Plausible.Ingestion.WriteBuffer do
8971
end
9072
end
9173

74+
def handle_cast(:flush, state) do
75+
%{timer: timer, flush_interval_ms: flush_interval_ms} = state
76+
Process.cancel_timer(timer)
77+
do_flush(state)
78+
new_timer = Process.send_after(self(), :tick, flush_interval_ms)
79+
{:noreply, %{state | buffer: [], buffer_size: 0, timer: new_timer}}
80+
end
81+
9282
@impl true
9383
def handle_info(:tick, state) do
9484
do_flush(state)
@@ -118,44 +108,18 @@ defmodule Plausible.Ingestion.WriteBuffer do
118108
insert_opts: insert_opts,
119109
insert_sql: insert_sql,
120110
header: header,
121-
name: name
111+
name: name,
112+
on_flush: on_flush
122113
} = state
123114

124115
case buffer do
125116
[] ->
126-
nil
117+
on_flush.(:empty, state)
127118

128119
_not_empty ->
129120
Logger.notice("Flushing #{buffer_size} byte(s) RowBinary from #{name}")
130121
IngestRepo.query!(insert_sql, [header | buffer], insert_opts)
131-
end
132-
133-
case :ets.lookup(state.name, :state) do
134-
[state: %{locker: pid}] when is_pid(pid) ->
135-
send(pid, {:locked, state.name})
136-
now = System.monotonic_time()
137-
lock_loop(state.name, now, state.lock_timeout_ms, state.lock_interval_ms)
138-
139-
_ ->
140-
:ignore
141-
end
142-
end
143-
144-
defp lock_loop(name, start, lock_timeout, lock_interval) do
145-
now = System.monotonic_time()
146-
147-
if now - start <= lock_timeout do
148-
Process.sleep(lock_interval)
149-
150-
case :ets.lookup(name, :state) do
151-
[state: %{locker: pid}] when is_pid(pid) ->
152-
lock_loop(name, start, lock_timeout, lock_interval)
153-
154-
_ ->
155-
:pass
156-
end
157-
else
158-
unlock(name)
122+
on_flush.(:success, state)
159123
end
160124
end
161125

lib/plausible/session/transfer.ex

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,7 @@ defmodule Plausible.Session.Transfer do
120120
@cmd_takeover_done ->
121121
# Wipe the cache after transfer is complete to avoid making the transferred cache stale
122122
Cache.Adapter.wipe(:sessions)
123-
# Unblock ingest after trasnsfer is done
124-
Plausible.Event.WriteBuffer.unlock()
123+
# Unblock ingest after transfer is done
125124
Plausible.Session.WriteBuffer.unlock()
126125
:counters.add(given_counter, 1, 1)
127126
end
@@ -130,19 +129,12 @@ defmodule Plausible.Session.Transfer do
130129
defp list_cache_names(session_version, parent) do
131130
if session_version == session_version() and attempted?(parent) do
132131
# Blocking ingest before transfer
133-
locks_acquired? =
134-
[
135-
Task.async(fn -> Plausible.Event.WriteBuffer.lock(@lock_acquire_timeout) end),
136-
Task.async(fn -> Plausible.Session.WriteBuffer.lock(@lock_acquire_timeout) end)
137-
]
138-
# timeouts are managed by WriteBuffer.lock/1 calls instead
139-
|> Task.await_many(:infinity)
140-
|> Enum.all?(&(&1 == :ok))
141-
142-
if locks_acquired? do
143-
Cache.Adapter.get_names(:sessions)
144-
else
145-
[]
132+
case Plausible.Session.WriteBuffer.lock(@lock_acquire_timeout) do
133+
:ok ->
134+
Cache.Adapter.get_names(:sessions)
135+
136+
_ ->
137+
[]
146138
end
147139
else
148140
[]

lib/plausible/session/write_buffer.ex

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
defmodule Plausible.Session.WriteBuffer do
22
@moduledoc false
33

4+
@lock_timeout :timer.seconds(3)
5+
@lock_interval 100
6+
47
%{
58
header: header,
69
insert_sql: insert_sql,
@@ -16,7 +19,9 @@ defmodule Plausible.Session.WriteBuffer do
1619
name: __MODULE__,
1720
header: unquote(header),
1821
insert_sql: unquote(insert_sql),
19-
insert_opts: unquote(insert_opts)
22+
insert_opts: unquote(insert_opts),
23+
on_init: &Plausible.Session.WriteBuffer.on_init/1,
24+
on_flush: &Plausible.Session.WriteBuffer.on_flush/2
2025
)
2126

2227
Plausible.Ingestion.WriteBuffer.child_spec(opts)
@@ -41,11 +46,71 @@ defmodule Plausible.Session.WriteBuffer do
4146
Plausible.Ingestion.WriteBuffer.flush(__MODULE__)
4247
end
4348

44-
def lock(timeout) do
45-
Plausible.Ingestion.WriteBuffer.lock(__MODULE__, timeout)
49+
@doc false
50+
def on_init(opts) do
51+
name = Keyword.fetch!(opts, :name)
52+
53+
^name = :ets.new(name, [:named_table, :set, :public])
54+
55+
%{
56+
lock_timeout_ms: opts[:lock_timeouts_ms] || @lock_timeout,
57+
lock_interval_ms: opts[:lock_interval_ms] || @lock_interval
58+
}
59+
end
60+
61+
@doc false
62+
def on_flush(_, state) do
63+
case :ets.lookup(state.name, :state) do
64+
[state: %{locker: pid}] when is_pid(pid) ->
65+
send(pid, {:locked, state.name})
66+
now = System.monotonic_time()
67+
lock_loop(state.name, now, state.lock_timeout_ms, state.lock_interval_ms)
68+
69+
_ ->
70+
:ignore
71+
end
72+
73+
state
74+
end
75+
76+
def lock(locker \\ self(), timeout) do
77+
name = __MODULE__
78+
79+
true = :ets.insert(name, {:state, %{locker: locker}})
80+
Plausible.Ingestion.WriteBuffer.flush_async(name)
81+
82+
receive do
83+
{:locked, ^name} -> :ok
84+
after
85+
timeout -> {:error, :timeout}
86+
end
4687
end
4788

4889
def unlock() do
49-
Plausible.Ingestion.WriteBuffer.unlock(__MODULE__)
90+
name = __MODULE__
91+
true = :ets.insert(name, {:state, %{locker: nil}})
92+
93+
:ok
94+
end
95+
96+
defp lock_loop(name, start, lock_timeout, lock_interval) do
97+
now = System.monotonic_time()
98+
99+
if now - start <= lock_timeout do
100+
Process.sleep(lock_interval)
101+
102+
case :ets.lookup(name, :state) do
103+
[state: %{locker: pid}] when is_pid(pid) ->
104+
lock_loop(name, start, lock_timeout, lock_interval)
105+
106+
_ ->
107+
:pass
108+
end
109+
else
110+
# Wipe the cache before unlocking to prevent stale session in case
111+
# transfer actually occurs, either partially or completely
112+
Plausible.Cache.Adapter.wipe(:sessions)
113+
unlock()
114+
end
50115
end
51116
end

0 commit comments

Comments
 (0)