Skip to content

Commit 2465198

Browse files
committed
workload-replay: Faster hydration
1 parent b372e56 commit 2465198

File tree

2 files changed

+60

-9

lines changed

misc/python/materialize/workload_replay/executor.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,12 @@ def test(
110110
"queries": {"total": 0, "failed": 0, "slow": 0},
111111
"ingestions": {"total": 0, "failed": 0, "slow": 0},
112112
}
113+
original_cluster_sizes: dict[str, str] = {}
113114
if create_objects:
114115
start_time = time.time()
115-
run_create_objects_part_1(c, services, workload, verbose)
116+
original_cluster_sizes = run_create_objects_part_1(
117+
c, services, workload, verbose
118+
)
116119
if not early_initial_data:
117120
run_create_objects_part_2(c, services, workload, verbose)
118121
stats["object_creation"] = time.time() - start_time
@@ -202,12 +205,13 @@ def test(
202205
# otherwise frontiers haven't advanced yet and everything looks fresh.
203206
print("Waiting for freshness")
204207
time.sleep(10)
208+
prev_lagging: set[str] = set()
205209
while True:
206-
lagging: list[tuple[str, str]] = [
207-
(entry[0], entry[1])
210+
lagging: set[str] = {
211+
entry[0]
208212
for entry in c.sql_query(
209213
"""
210-
SELECT o.name, COALESCE(l.global_lag, INTERVAL '999 hours')::text
214+
SELECT o.name
211215
FROM mz_internal.mz_materialization_lag l
212216
JOIN mz_objects o ON o.id = l.object_id
213217
WHERE o.name NOT LIKE 'mz_%'
@@ -216,14 +220,25 @@ def test(
216220
ORDER BY l.global_lag DESC NULLS FIRST
217221
LIMIT 5;"""
218222
)
219-
]
223+
}
220224
if lagging:
221-
summary = ", ".join(f"{name} ({lag})" for name, lag in lagging)
222-
print(f" Lagging: {summary}")
225+
if lagging != prev_lagging:
226+
print(f" Lagging: {', '.join(sorted(lagging))}")
227+
prev_lagging = lagging
223228
time.sleep(5)
224229
else:
225230
break
226231
print("Freshness complete")
232+
233+
# Scale clusters back down to their original sizes.
234+
for name, original_size in original_cluster_sizes.items():
235+
print(f" Scaling down {name} back to {original_size}")
236+
c.sql(
237+
f"ALTER CLUSTER {name} SET (SIZE = '{original_size}')",
238+
user="mz_system",
239+
port=6877,
240+
)
241+
227242
if run_ingestions:
228243
print("Starting continuous ingestions")
229244
threads.extend(

misc/python/materialize/workload_replay/objects.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from __future__ import annotations
1515

16+
import os
1617
import re
1718
import time
1819
from textwrap import dedent
@@ -30,6 +31,7 @@
3031
setup_polaris_for_iceberg,
3132
)
3233
from materialize.mzcompose.services.sql_server import SqlServer
34+
from materialize.workload_replay.config import cluster_replica_sizes
3335
from materialize.workload_replay.util import (
3436
get_kafka_topic,
3537
get_mysql_reference_db_table,
@@ -41,8 +43,12 @@
4143

4244
def run_create_objects_part_1(
4345
c: Composition, services: set[str], workload: dict[str, Any], verbose: bool
44-
) -> None:
45-
"""Create clusters, databases, schemas, types, connections, and prepare sources."""
46+
) -> dict[str, str]:
47+
"""Create clusters, databases, schemas, types, connections, and prepare sources.
48+
49+
Returns a mapping of cluster name to original SIZE, so the caller can
50+
scale clusters back down after hydration completes.
51+
"""
4652
c.sql(
4753
"DROP CLUSTER IF EXISTS quickstart CASCADE",
4854
user="mz_system",
@@ -117,12 +123,40 @@ def run_create_objects_part_1(
117123
)
118124

119125
print("Creating clusters")
126+
# Create clusters at a large size for faster hydration. The original
127+
# sizes are returned so the caller can scale back down afterwards.
128+
# Pick the largest valid scale=1 size that fits on this machine.
129+
num_cpus = os.cpu_count() or 1
130+
best_workers = max(
131+
(
132+
cfg["workers"]
133+
for cfg in cluster_replica_sizes.values()
134+
if cfg.get("scale") == 1
135+
and isinstance(cfg.get("workers"), int)
136+
and cfg["workers"] <= num_cpus
137+
),
138+
default=1,
139+
)
140+
hydration_size = f"scale=1,workers={best_workers}"
141+
original_cluster_sizes: dict[str, str] = {}
120142
for name, cluster in workload["clusters"].items():
121143
if cluster["managed"]:
122144
# Need at least one replica for everything to hydrate
123145
create_sql = cluster["create_sql"].replace(
124146
"REPLICATION FACTOR = 0", "REPLICATION FACTOR = 1"
125147
)
148+
# Swap in the hydration size, remembering the original.
149+
size_match = re.search(r"SIZE\s*=\s*'([^']+)'", create_sql, re.IGNORECASE)
150+
if size_match:
151+
original_cluster_sizes[name] = size_match.group(1)
152+
create_sql = (
153+
create_sql[: size_match.start()]
154+
+ f"SIZE = '{hydration_size}'"
155+
+ create_sql[size_match.end() :]
156+
)
157+
print(
158+
f" {name}: creating at {hydration_size} (original: {size_match.group(1)})"
159+
)
126160
c.sql(create_sql, user="mz_system", port=6877, print_statement=verbose)
127161
else:
128162
raise ValueError("Handle unmanaged clusters")
@@ -541,6 +575,8 @@ def run_create_objects_part_1(
541575
flags=re.DOTALL | re.IGNORECASE,
542576
)
543577

578+
return original_cluster_sizes
579+
544580

545581
def run_create_objects_part_2(
546582
c: Composition, services: set[str], workload: dict[str, Any], verbose: bool

Comments (0)