Skip to content

Commit 395a097

Browse files
authored
feat: peep storage for PromEx (#1636)
1 parent 40bf817 commit 395a097

File tree

26 files changed

+401
-387
lines changed

26 files changed

+401
-387
lines changed

config/config.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ config :gen_rpc,
8181
# This is used for process sanitation purposes, so please make sure to set it to a sufficiently high number
8282
async_call_inactivity_timeout: 300_000
8383

84+
config :prom_ex, :storage_adapter, Realtime.PromEx.Store
85+
8486
# Import environment specific config. This must remain at the bottom
8587
# of this file so it overrides the configuration defined above.
8688
import_config "#{Mix.env()}.exs"

config/runtime.exs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ queue_target = Env.get_integer("DB_QUEUE_TARGET", 5000)
3333
queue_interval = Env.get_integer("DB_QUEUE_INTERVAL", 5000)
3434
pool_size = Env.get_integer("DB_POOL_SIZE", 5)
3535
master_region = System.get_env("DB_MASTER_REGION")
36+
region = System.get_env("REGION")
3637

3738
after_connect_query_args =
3839
case System.get_env("DB_AFTER_CONNECT_QUERY") do
@@ -94,6 +95,14 @@ socket_options =
9495
end
9596
end
9697

98+
[_, node_host] = node() |> Atom.to_string() |> String.split("@")
99+
100+
metrics_tags = %{
101+
region: region,
102+
host: node_host,
103+
id: Realtime.Nodes.short_node_id_from_name(node())
104+
}
105+
97106
config :realtime, Realtime.Repo,
98107
hostname: default_db_host,
99108
username: username,
@@ -130,7 +139,8 @@ config :realtime,
130139
users_scope_shards: users_scope_shards,
131140
postgres_cdc_scope_shards: postgres_cdc_scope_shards,
132141
regional_broadcasting: regional_broadcasting,
133-
master_region: master_region
142+
master_region: master_region,
143+
metrics_tags: metrics_tags
134144

135145
if config_env() != :test && run_janitor? do
136146
config :realtime,
@@ -280,7 +290,7 @@ if config_env() != :test do
280290
metrics_blocklist: System.get_env("METRICS_TOKEN_BLOCKLIST", "") |> String.split(","),
281291
metrics_jwt_secret: System.get_env("METRICS_JWT_SECRET"),
282292
db_enc_key: System.get_env("DB_ENC_KEY"),
283-
region: System.get_env("REGION"),
293+
region: region,
284294
prom_poll_rate: Env.get_integer("PROM_POLL_RATE", 5000),
285295
slot_name_suffix: slot_name_suffix
286296
end

config/test.exs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ config :realtime, RealtimeWeb.Endpoint,
3030
http: [port: 4002],
3131
server: true
3232

33+
# config/runtime.exs reads the region from the REGION environment variable, so set it here
34+
System.put_env("REGION", "us-east-1")
35+
3336
config :realtime,
3437
regional_broadcasting: true,
3538
region: "us-east-1",

lib/realtime/application.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ defmodule Realtime.Application do
4343
{Realtime.SignalHandler, %{handler_mod: :erl_signal_handler}}
4444
)
4545

46-
Realtime.PromEx.set_metrics_tags()
4746
:ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])
4847
:syn.set_event_handler(Realtime.SynHandler)
4948
:ok = :syn.add_node_to_scopes([RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()])

lib/realtime/metrics_cleaner.ex

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ defmodule Realtime.MetricsCleaner do
1818
def handle_info(:check, %{interval: interval} = state) do
1919
Process.cancel_timer(state.check_ref)
2020

21-
{exec_time, _} = :timer.tc(fn -> loop_and_cleanup_metrics_table() end)
21+
{exec_time, _} = :timer.tc(fn -> loop_and_cleanup_metrics_table() end, :millisecond)
2222

2323
if exec_time > :timer.seconds(5),
2424
do: Logger.warning("Metrics check took: #{exec_time} ms")
@@ -31,31 +31,21 @@ defmodule Realtime.MetricsCleaner do
3131
{:noreply, state}
3232
end
3333

34-
defp check(interval) do
35-
Process.send_after(self(), :check, interval)
36-
end
34+
defp check(interval), do: Process.send_after(self(), :check, interval)
35+
36+
@peep_filter_spec [{{{:_, %{tenant: :"$1"}}, :_}, [{:is_binary, :"$1"}], [:"$1"]}]
3737

38-
@metrics_table Realtime.PromEx.Metrics
39-
@filter_spec [{{{:_, %{tenant: :"$1"}}, :_}, [], [:"$1"]}]
4038
defp loop_and_cleanup_metrics_table do
41-
tenant_ids = Realtime.Tenants.Connect.list_tenants()
39+
tenant_ids = Realtime.Tenants.Connect.list_tenants() |> MapSet.new()
4240

43-
:ets.select(@metrics_table, @filter_spec)
44-
|> Enum.uniq()
45-
|> Enum.reject(fn tenant_id -> tenant_id in tenant_ids end)
46-
|> Enum.each(fn tenant_id -> delete_metric(tenant_id) end)
47-
end
41+
{_, tids} = Peep.Persistent.storage(Realtime.PromEx.Metrics)
4842

49-
@doc """
50-
Deletes all metrics that contain the given tenant or database_host.
51-
"""
52-
@spec delete_metric(String.t()) :: :ok
53-
def delete_metric(tenant) do
54-
:ets.select_delete(@metrics_table, [
55-
{{{:_, %{tenant: tenant}}, :_}, [], [true]},
56-
{{{:_, %{database_host: "db.#{tenant}.supabase.co"}}, :_}, [], [true]}
57-
])
58-
59-
:ok
43+
tids
44+
|> Tuple.to_list()
45+
|> Stream.flat_map(fn tid -> :ets.select(tid, @peep_filter_spec) end)
46+
|> Enum.uniq()
47+
|> Stream.reject(fn tenant_id -> MapSet.member?(tenant_ids, tenant_id) end)
48+
|> Enum.map(fn tenant_id -> %{tenant: tenant_id} end)
49+
|> then(&Peep.prune_tags(Realtime.PromEx.Metrics, &1))
6050
end
6151
end

lib/realtime/monitoring/prom_ex.ex

Lines changed: 24 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
defmodule Realtime.PromEx do
2-
alias Realtime.Nodes
32
alias Realtime.PromEx.Plugins.Channels
43
alias Realtime.PromEx.Plugins.Distributed
54
alias Realtime.PromEx.Plugins.GenRpc
@@ -65,6 +64,29 @@ defmodule Realtime.PromEx do
6564

6665
alias PromEx.Plugins
6766

67+
defmodule Store do
68+
@moduledoc false
69+
# Custom PromEx storage adapter that sets global tags and uses striped storage
70+
71+
@behaviour PromEx.Storage
72+
73+
@impl true
74+
def scrape(name) do
75+
Peep.get_all_metrics(name)
76+
|> Peep.Prometheus.export()
77+
end
78+
79+
@impl true
80+
def child_spec(name, metrics) do
81+
Peep.child_spec(
82+
name: name,
83+
metrics: metrics,
84+
global_tags: Application.get_env(:realtime, :metrics_tags, %{}),
85+
storage: :striped
86+
)
87+
end
88+
end
89+
6890
@impl true
6991
def plugins do
7092
poll_rate = Application.get_env(:realtime, :prom_poll_rate)
@@ -105,28 +127,7 @@ defmodule Realtime.PromEx do
105127
end
106128

107129
def get_metrics do
108-
%{
109-
region: region,
110-
node_host: node_host,
111-
short_alloc_id: short_alloc_id
112-
} = get_metrics_tags()
113-
114-
def_tags = "host=\"#{node_host}\",region=\"#{region}\",id=\"#{short_alloc_id}\""
115-
116-
metrics =
117-
PromEx.get_metrics(Realtime.PromEx)
118-
|> String.split("\n")
119-
|> Enum.map_join("\n", fn line ->
120-
case Regex.run(~r/(?!\#)^(\w+)(?:{(.*?)})?\s*(.+)$/, line) do
121-
nil ->
122-
line
123-
124-
[_, key, tags, value] ->
125-
tags = if tags == "", do: def_tags, else: tags <> "," <> def_tags
126-
127-
"#{key}{#{tags}} #{value}"
128-
end
129-
end)
130+
metrics = PromEx.get_metrics(Realtime.PromEx)
130131

131132
Realtime.PromEx.__ets_cron_flusher_name__()
132133
|> PromEx.ETSCronFlusher.defer_ets_flush()
@@ -140,20 +141,4 @@ defmodule Realtime.PromEx do
140141
get_metrics()
141142
|> :zlib.compress()
142143
end
143-
144-
def set_metrics_tags do
145-
[_, node_host] = node() |> Atom.to_string() |> String.split("@")
146-
147-
metrics_tags = %{
148-
region: Application.get_env(:realtime, :region),
149-
node_host: node_host,
150-
short_alloc_id: Nodes.short_node_id_from_name(node())
151-
}
152-
153-
Application.put_env(:realtime, :metrics_tags, metrics_tags)
154-
end
155-
156-
def get_metrics_tags do
157-
Application.get_env(:realtime, :metrics_tags)
158-
end
159144
end

lib/realtime/monitoring/prom_ex/plugins/distributed.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ defmodule Realtime.PromEx.Plugins.Distributed do
7070
measurement: :size,
7171
tags: [:origin_node, :target_node]
7272
)
73-
]
73+
],
74+
detach_on_error: false
7475
)
7576
end
7677

lib/realtime/monitoring/prom_ex/plugins/gen_rpc.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ defmodule Realtime.PromEx.Plugins.GenRpc do
7171
measurement: :size,
7272
tags: [:origin_node, :target_node]
7373
)
74-
]
74+
],
75+
detach_on_error: false
7576
)
7677
end
7778

lib/realtime/monitoring/prom_ex/plugins/osmon.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ defmodule Realtime.PromEx.Plugins.OsMon do
5757
description: "The average system load in the last 15 minutes.",
5858
measurement: :avg15
5959
)
60-
]
60+
],
61+
detach_on_error: false
6162
)
6263
end
6364

lib/realtime/monitoring/prom_ex/plugins/phoenix.ex

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ if Code.ensure_loaded?(Phoenix) do
5151
description: "The total open connections to ranch.",
5252
measurement: :active
5353
)
54-
]
54+
],
55+
detach_on_error: false
5556
)
5657
end
5758

@@ -66,6 +67,11 @@ if Code.ensure_loaded?(Phoenix) do
6667
:telemetry.execute(@event_all_connections, %{active: active_conn}, %{})
6768
end
6869

70+
defmodule Buckets do
71+
@moduledoc false
72+
use Peep.Buckets.Custom, buckets: [10, 100, 500, 1_000, 5_000, 10_000]
73+
end
74+
6975
defp channel_events(metric_prefix) do
7076
Event.build(
7177
:phoenix_channel_event_metrics,
@@ -94,9 +100,7 @@ if Code.ensure_loaded?(Phoenix) do
94100
event_name: [:phoenix, :channel_handled_in],
95101
measurement: :duration,
96102
description: "The time it takes for the application to respond to channel messages.",
97-
reporter_options: [
98-
buckets: [10, 100, 500, 1_000, 5_000, 10_000]
99-
],
103+
reporter_options: [peep_bucket_calculator: Buckets],
100104
tag_values: fn %{socket: %Socket{endpoint: endpoint}} ->
101105
%{
102106
endpoint: normalize_module_name(endpoint)
@@ -119,9 +123,7 @@ if Code.ensure_loaded?(Phoenix) do
119123
event_name: [:phoenix, :socket_connected],
120124
measurement: :duration,
121125
description: "The time it takes for the application to establish a socket connection.",
122-
reporter_options: [
123-
buckets: [10, 100, 500, 1_000, 5_000, 10_000]
124-
],
126+
reporter_options: [peep_bucket_calculator: Buckets],
125127
tag_values: fn %{result: result, endpoint: endpoint, transport: transport, serializer: serializer} ->
126128
%{
127129
transport: transport,

0 commit comments

Comments
 (0)