Skip to content

Commit 1920bf4

Browse files
committed
fix: speed up tenant connected metrics
1 parent 03e2f71 commit 1920bf4

File tree

5 files changed

+130
-36
lines changed

5 files changed

+130
-36
lines changed

lib/realtime/monitoring/prom_ex/plugins/tenant.ex

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,20 @@ defmodule Realtime.PromEx.Plugins.Tenant do
9393

9494
def execute_tenant_metrics do
9595
tenants = Tenants.Connect.list_tenants()
96+
cluster_counts = UsersCounter.tenant_counts()
97+
node_counts = UsersCounter.tenant_counts(Node.self())
9698

9799
for t <- tenants do
98-
count = UsersCounter.tenant_users(Node.self(), t)
99-
cluster_count = UsersCounter.tenant_users(t)
100100
tenant = Tenants.Cache.get_tenant_by_external_id(t)
101101

102102
if tenant != nil do
103103
Telemetry.execute(
104104
[:realtime, :connections],
105-
%{connected: count, connected_cluster: cluster_count, limit: tenant.max_concurrent_users},
105+
%{
106+
connected: Map.get(node_counts, t, 0),
107+
connected_cluster: Map.get(cluster_counts, t, 0),
108+
limit: tenant.max_concurrent_users
109+
},
106110
%{tenant: t}
107111
)
108112
end

lib/realtime/user_counter.ex

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,54 @@ defmodule Realtime.UsersCounter do
2222
@spec tenant_users(atom, String.t()) :: non_neg_integer()
2323
def tenant_users(node_name, tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id, node_name)
2424

25+
@count_all_nodes_spec [
26+
{
27+
# Match the tuple structure, capture group_name
28+
{{:"$1", :_}, :_, :_, :_, :_},
29+
# No guards
30+
[],
31+
# Return only the group_name
32+
[:"$1"]
33+
}
34+
]
35+
36+
@doc """
37+
Returns the counts of all connected clients for all tenants for the cluster.
38+
"""
39+
@spec tenant_counts() :: %{String.t() => non_neg_integer()}
40+
def tenant_counts() do
41+
scopes()
42+
|> Stream.flat_map(fn scope ->
43+
:syn_backbone.get_table_name(:syn_pg_by_name, scope)
44+
|> :ets.select(@count_all_nodes_spec)
45+
end)
46+
|> Enum.frequencies()
47+
end
48+
49+
@doc """
50+
Returns the counts of all connected clients for all tenants for a single node.
51+
"""
52+
@spec tenant_counts(node) :: %{String.t() => non_neg_integer()}
53+
def tenant_counts(node) do
54+
count_single_node_spec = [
55+
{
56+
# Match the tuple structure with specific node, capture group_name
57+
{{:"$1", :_}, :_, :_, :_, node},
58+
# No guards
59+
[],
60+
# Return only the group_name
61+
[:"$1"]
62+
}
63+
]
64+
65+
scopes()
66+
|> Stream.flat_map(fn scope ->
67+
:syn_backbone.get_table_name(:syn_pg_by_name, scope)
68+
|> :ets.select(count_single_node_spec)
69+
end)
70+
|> Enum.frequencies()
71+
end
72+
2573
@doc """
2674
Returns the scope for a given tenant id.
2775
"""

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.67.4",
7+
version: "2.67.5",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/monitoring/prom_ex/plugins/tenant_test.exs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,21 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
104104
UsersCounter.add(self(), bad_tenant_id)
105105

106106
_ = Rpc.call(node, FakeUserCounter, :fake_add, [external_id])
107+
108+
# fake empty tenant_id
109+
empty_tenant = tenant_fixture()
110+
empty_tenant_id = empty_tenant.external_id
111+
:syn.register(Realtime.Tenants.Connect, empty_tenant_id, self(), %{conn: nil})
112+
107113
Process.sleep(500)
108114
Tenant.execute_tenant_metrics()
109115

110116
assert_receive {[:realtime, :connections], %{connected: 1, limit: 200, connected_cluster: 2},
111117
%{tenant: ^external_id}}
112118

119+
assert_receive {[:realtime, :connections], %{connected: 0, limit: 200, connected_cluster: 0},
120+
%{tenant: ^empty_tenant_id}}
121+
113122
refute_receive {[:realtime, :connections], %{connected: 1, limit: 200, connected_cluster: 2},
114123
%{tenant: ^bad_tenant_id}}
115124
end

test/realtime/user_counter_test.exs

Lines changed: 65 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ defmodule Realtime.UsersCounterTest do
33
alias Realtime.UsersCounter
44
alias Realtime.Rpc
55

6+
setup_all do
7+
tenant_id = random_string()
8+
{nodes, count} = generate_load(tenant_id)
9+
%{tenant_id: tenant_id, count: count, nodes: nodes}
10+
end
11+
612
describe "add/1" do
713
test "starts counter for tenant" do
814
assert UsersCounter.add(self(), random_string()) == :ok
@@ -14,61 +20,88 @@ defmodule Realtime.UsersCounterTest do
1420
def ping(),
1521
do:
1622
spawn(fn ->
17-
Process.sleep(3000)
23+
Process.sleep(15000)
1824
:pong
1925
end)
2026
end
2127
end)
2228

2329
Code.eval_quoted(@aux_mod)
2430

31+
describe "tenant_counts/0" do
32+
test "map of tenant and number of users", %{tenant_id: tenant_id, count: expected} do
33+
assert UsersCounter.add(self(), tenant_id) == :ok
34+
Process.sleep(1000)
35+
counts = UsersCounter.tenant_counts()
36+
assert counts[tenant_id] == expected + 1
37+
38+
assert map_size(counts) >= 41
39+
end
40+
end
41+
42+
describe "tenant_counts/1" do
43+
test "map of tenant and number of users for a node only", %{tenant_id: tenant_id, nodes: nodes} do
44+
assert UsersCounter.add(self(), tenant_id) == :ok
45+
Process.sleep(1000)
46+
my_counts = UsersCounter.tenant_counts(Node.self())
47+
# Only one connection from this test process on this node
48+
assert my_counts == %{tenant_id => 1}
49+
50+
another_node_counts = UsersCounter.tenant_counts(hd(nodes))
51+
assert another_node_counts[tenant_id] == 2
52+
53+
assert map_size(another_node_counts) == 21
54+
end
55+
end
56+
2557
describe "tenant_users/1" do
26-
test "returns count of connected clients for tenant on cluster node" do
27-
tenant_id = random_string()
28-
expected = generate_load(tenant_id)
58+
test "returns count of connected clients for tenant on cluster node", %{tenant_id: tenant_id, count: expected} do
2959
Process.sleep(1000)
3060
assert UsersCounter.tenant_users(tenant_id) == expected
3161
end
3262
end
3363

3464
describe "tenant_users/2" do
35-
test "returns count of connected clients for tenant on target cluster" do
36-
tenant_id = random_string()
37-
generate_load(tenant_id)
65+
test "returns count of connected clients for tenant on target cluster", %{tenant_id: tenant_id} do
3866
{:ok, node} = Clustered.start(@aux_mod)
3967
pid = Rpc.call(node, Aux, :ping, [])
4068
UsersCounter.add(pid, tenant_id)
4169
assert UsersCounter.tenant_users(node, tenant_id) == 1
4270
end
4371
end
4472

45-
defp generate_load(tenant_id, nodes \\ 2, processes \\ 2) do
46-
for i <- 1..nodes do
47-
# Avoid port collision
48-
extra_config = [
49-
{:gen_rpc, :tcp_server_port, 15970 + i}
50-
]
51-
52-
{:ok, node} = Clustered.start(@aux_mod, extra_config: extra_config, phoenix_port: 4012 + i)
53-
54-
for _ <- 1..processes do
55-
pid = Rpc.call(node, Aux, :ping, [])
56-
57-
for _ <- 1..10 do
58-
# replicate same pid added multiple times concurrently
59-
Task.start(fn ->
60-
UsersCounter.add(pid, tenant_id)
61-
end)
62-
63-
# noisy neighbors to test handling of bigger loads on concurrent calls
64-
Task.start(fn ->
65-
pid = Rpc.call(node, Aux, :ping, [])
66-
UsersCounter.add(pid, random_string())
67-
end)
73+
defp generate_load(tenant_id, n_nodes \\ 2, processes \\ 2) do
74+
nodes =
75+
for i <- 1..n_nodes do
76+
# Avoid port collision
77+
extra_config = [
78+
{:gen_rpc, :tcp_server_port, 15970 + i}
79+
]
80+
81+
{:ok, node} = Clustered.start(@aux_mod, extra_config: extra_config, phoenix_port: 4012 + i)
82+
83+
for _ <- 1..processes do
84+
pid = Rpc.call(node, Aux, :ping, [])
85+
86+
for _ <- 1..10 do
87+
# replicate same pid added multiple times concurrently
88+
Task.start(fn ->
89+
UsersCounter.add(pid, tenant_id)
90+
Process.sleep(10000)
91+
end)
92+
93+
# noisy neighbors to test handling of bigger loads on concurrent calls
94+
Task.start(fn ->
95+
pid = Rpc.call(node, Aux, :ping, [])
96+
UsersCounter.add(pid, random_string())
97+
Process.sleep(10000)
98+
end)
99+
end
68100
end
101+
102+
node
69103
end
70-
end
71104

72-
nodes * processes
105+
{nodes, n_nodes * processes}
73106
end
74107
end

0 commit comments

Comments
 (0)