Skip to content

Commit 6a1d77a

Browse files
authored
fix: improve metrics handling (#1654)
* Allow 3x more max heap size than other processes
* Avoid compressing given that gen_rpc does this for us already
* fix: remove limit_concurrent metric
* fix: revert the way we fetched tenants to report on connected metric
1 parent 2994d65 commit 6a1d77a

File tree

8 files changed

+51
-111
lines changed

8 files changed

+51
-111
lines changed

lib/realtime/monitoring/prom_ex.ex

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,4 @@ defmodule Realtime.PromEx do
134134

135135
metrics
136136
end
137-
138-
@doc "Compressed metrics using :zlib.compress/1"
139-
@spec get_compressed_metrics() :: binary()
140-
def get_compressed_metrics do
141-
get_metrics()
142-
|> :zlib.compress()
143-
end
144137
end

lib/realtime/monitoring/prom_ex/plugins/tenant.ex

Lines changed: 1 addition & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ defmodule Realtime.PromEx.Plugins.Tenant do
2222
[
2323
channel_events(),
2424
replication_metrics(),
25-
subscription_metrics(),
2625
payload_size_metrics()
2726
]
2827
end
@@ -78,21 +77,14 @@ defmodule Realtime.PromEx.Plugins.Tenant do
7877
description: "The cluster total count of connected clients for a tenant.",
7978
measurement: :connected_cluster,
8079
tags: [:tenant]
81-
),
82-
last_value(
83-
[:realtime, :connections, :limit_concurrent],
84-
event_name: [:realtime, :connections],
85-
description: "The total count of connected clients for a tenant.",
86-
measurement: :limit,
87-
tags: [:tenant]
8880
)
8981
],
9082
detach_on_error: false
9183
)
9284
end
9385

9486
def execute_tenant_metrics do
95-
tenants = Tenants.Connect.list_tenants()
87+
tenants = Tenants.list_connected_tenants(Node.self())
9688
cluster_counts = UsersCounter.tenant_counts()
9789
node_counts = UsersCounter.tenant_counts(Node.self())
9890

@@ -136,28 +128,6 @@ defmodule Realtime.PromEx.Plugins.Tenant do
136128
)
137129
end
138130

139-
defp subscription_metrics do
140-
Event.build(
141-
:realtime_tenant_channel_event_metrics,
142-
[
143-
sum(
144-
[:realtime, :subscriptions_checker, :pid_not_found],
145-
event_name: [:realtime, :subscriptions_checker, :pid_not_found],
146-
measurement: :sum,
147-
description: "Sum of pids not found in Subscription tables.",
148-
tags: [:tenant]
149-
),
150-
sum(
151-
[:realtime, :subscriptions_checker, :phantom_pid_detected],
152-
event_name: [:realtime, :subscriptions_checker, :phantom_pid_detected],
153-
measurement: :sum,
154-
description: "Sum of phantom pids detected in Subscription tables.",
155-
tags: [:tenant]
156-
)
157-
]
158-
)
159-
end
160-
161131
defmodule PolicyAuthorization.Buckets do
162132
@moduledoc false
163133
use Peep.Buckets.Custom, buckets: [10, 250, 5000, 15_000]
@@ -237,20 +207,6 @@ defmodule Realtime.PromEx.Plugins.Tenant do
237207
measurement: :size,
238208
tags: [:tenant]
239209
),
240-
last_value(
241-
[:realtime, :channel, :events, :limit_per_second],
242-
event_name: [:realtime, :rate_counter, :channel, :events],
243-
measurement: :limit,
244-
description: "Rate limit of messages per second sent on a Realtime Channel.",
245-
tags: [:tenant]
246-
),
247-
last_value(
248-
[:realtime, :channel, :joins, :limit_per_second],
249-
event_name: [:realtime, :rate_counter, :channel, :joins],
250-
measurement: :limit,
251-
description: "Rate limit of joins per second on a Realtime Channel.",
252-
tags: [:tenant]
253-
),
254210
distribution(
255211
[:realtime, :tenants, :read_authorization_check],
256212
event_name: [:realtime, :tenants, :read_authorization_check],

lib/realtime/tenants.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ defmodule Realtime.Tenants do
1515
alias Realtime.Tenants.Migrations
1616
alias Realtime.UsersCounter
1717

18+
@doc """
19+
Gets a list of connected tenant `external_id` strings in the cluster or a node.
20+
"""
21+
@spec list_connected_tenants(atom()) :: [String.t()]
22+
def list_connected_tenants(node) do
23+
UsersCounter.scopes()
24+
|> Enum.flat_map(fn scope -> :syn.group_names(scope, node) end)
25+
end
26+
1827
@doc """
1928
Gets the database connection pid managed by the Tenants.Connect process.
2029

lib/realtime_web/controllers/metrics_controller.ex

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,40 +4,50 @@ defmodule RealtimeWeb.MetricsController do
44
alias Realtime.PromEx
55
alias Realtime.GenRpc
66

7+
# We give more memory and time to collect metrics from all nodes as this is a lot of work
78
def index(conn, _) do
8-
timeout = Application.fetch_env!(:realtime, :metrics_rpc_timeout)
9-
10-
cluster_metrics =
11-
Node.list()
12-
|> Task.async_stream(
13-
fn node ->
14-
{node, GenRpc.call(node, PromEx, :get_compressed_metrics, [], timeout: timeout)}
15-
end,
16-
timeout: :infinity
17-
)
18-
|> Enum.reduce([PromEx.get_metrics()], fn {_, {node, response}}, acc ->
19-
case response do
20-
{:error, :rpc_error, reason} ->
21-
Logger.error("Cannot fetch metrics from the node #{inspect(node)} because #{inspect(reason)}")
22-
acc
23-
24-
metrics ->
25-
[uncompress(metrics) | acc]
26-
end
27-
end)
28-
|> Enum.reverse()
9+
{time, metrics} = :timer.tc(&cluster_metrics/0, :millisecond)
10+
Logger.info("Collected cluster metrics in #{time} milliseconds")
2911

3012
conn
3113
|> put_resp_content_type("text/plain")
32-
|> send_resp(200, cluster_metrics)
14+
|> send_resp(200, metrics)
15+
end
16+
17+
defp cluster_metrics() do
18+
bump_max_heap_size()
19+
timeout = Application.fetch_env!(:realtime, :metrics_rpc_timeout)
20+
21+
Node.list()
22+
|> Task.async_stream(
23+
fn node ->
24+
{node, GenRpc.call(node, __MODULE__, :get_metrics, [], timeout: timeout)}
25+
end,
26+
timeout: :infinity
27+
)
28+
|> Enum.reduce([PromEx.get_metrics()], fn {_, {node, response}}, acc ->
29+
case response do
30+
{:error, :rpc_error, reason} ->
31+
Logger.error("Cannot fetch metrics from the node #{inspect(node)} because #{inspect(reason)}")
32+
acc
33+
34+
metrics ->
35+
[metrics | acc]
36+
end
37+
end)
3338
end
3439

35-
defp uncompress(compressed_data) do
36-
:zlib.uncompress(compressed_data)
37-
rescue
38-
error ->
39-
Logger.error("Failed to decompress metrics data: #{inspect(error)}")
40-
# Return empty string to not impact the aggregated metrics
41-
""
40+
def get_metrics() do
41+
bump_max_heap_size()
42+
PromEx.get_metrics()
43+
end
44+
45+
defp bump_max_heap_size() do
46+
system_max_heap_size = :erlang.system_info(:max_heap_size)[:size]
47+
48+
# it's 0 when there is no limit
49+
if is_integer(system_max_heap_size) and system_max_heap_size > 0 do
50+
Process.flag(:max_heap_size, system_max_heap_size * 3)
51+
end
4252
end
4353
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.67.6",
7+
version: "2.67.7",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2163,7 +2163,7 @@ defmodule Realtime.Integration.RtChannelTest do
21632163
# Postgres Change events
21642164
for _ <- 1..5, do: Postgrex.query!(conn, "insert into test (details) values ('test')", [])
21652165

2166-
for _ <- 1..5 do
2166+
for _ <- 1..10 do
21672167
assert_receive %Message{
21682168
topic: ^topic,
21692169
event: "postgres_changes",

test/realtime/monitoring/prom_ex/plugins/tenant_test.exs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,20 +105,12 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
105105

106106
_ = Rpc.call(node, FakeUserCounter, :fake_add, [external_id])
107107

108-
# fake empty tenant_id
109-
empty_tenant = tenant_fixture()
110-
empty_tenant_id = empty_tenant.external_id
111-
:syn.register(Realtime.Tenants.Connect, empty_tenant_id, self(), %{conn: nil})
112-
113108
Process.sleep(500)
114109
Tenant.execute_tenant_metrics()
115110

116111
assert_receive {[:realtime, :connections], %{connected: 1, limit: 200, connected_cluster: 2},
117112
%{tenant: ^external_id}}
118113

119-
assert_receive {[:realtime, :connections], %{connected: 0, limit: 200, connected_cluster: 0},
120-
%{tenant: ^empty_tenant_id}}
121-
122114
refute_receive {[:realtime, :connections], %{connected: 1, limit: 200, connected_cluster: 2},
123115
%{tenant: ^bad_tenant_id}}
124116
end

test/realtime/monitoring/prom_ex_test.exs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,4 @@ defmodule Realtime.PromExTest do
2020
)
2121
end
2222
end
23-
24-
describe "get_compressed_metrics/0" do
25-
test "builds metrics compressed using zlib" do
26-
compressed_metrics = PromEx.get_compressed_metrics()
27-
28-
metrics = :zlib.uncompress(compressed_metrics)
29-
30-
assert String.contains?(
31-
metrics,
32-
"# HELP beam_system_schedulers_online_info The number of scheduler threads that are online."
33-
)
34-
35-
assert String.contains?(metrics, "# TYPE beam_system_schedulers_online_info gauge")
36-
37-
assert String.contains?(
38-
metrics,
39-
"beam_system_schedulers_online_info{host=\"nohost\",id=\"nohost\",region=\"us-east-1\"}"
40-
)
41-
end
42-
end
4323
end

0 commit comments

Comments
 (0)