5353from materialize .workload_replay .util import print_workload_stats , resolve_tag
5454
5555
def wait_for_freshness(c: Composition) -> None:
    """Block until every user materialization has caught up (is fresh).

    Freshness is judged by local_lag (lag relative to the source) rather
    than global_lag (lag relative to wall-clock): with captured/historical
    data the source frontiers are stuck at capture time, so global_lag
    would never shrink. An initial sleep gives the system time to start
    processing data; before that, frontiers haven't advanced and
    everything would spuriously look fresh.
    """
    print("Waiting for freshness")
    time.sleep(10)

    # Query for up to five objects still lagging more than 10 seconds
    # behind their sources (NULL lag counts as lagging).
    lag_query = """
                SELECT o.name
                FROM mz_internal.mz_materialization_lag l
                JOIN mz_objects o ON o.id = l.object_id
                WHERE o.name NOT LIKE 'mz_%'
                  AND o.id NOT IN (SELECT id FROM mz_sinks)
                  AND (l.local_lag IS NULL OR l.local_lag > INTERVAL '10 seconds')
                ORDER BY l.local_lag DESC NULLS FIRST
                LIMIT 5;"""

    last_reported: set[str] = set()
    # Poll until the lagging set is empty; only report when it changes
    # so the log isn't flooded with identical lines.
    while behind := {row[0] for row in c.sql_query(lag_query)}:
        if behind != last_reported:
            print(f"  Lagging: {', '.join(sorted(behind))}")
            last_reported = behind
        time.sleep(5)
    print("Freshness complete")
92+
5693def test (
5794 c : Composition ,
5895 workload : dict [str , Any ],
@@ -110,23 +147,26 @@ def test(
110147 "queries" : {"total" : 0 , "failed" : 0 , "slow" : 0 },
111148 "ingestions" : {"total" : 0 , "failed" : 0 , "slow" : 0 },
112149 }
150+ original_cluster_sizes : dict [str , str ] = {}
113151 if create_objects :
114152 start_time = time .time ()
115- run_create_objects_part_1 (c , services , workload , verbose )
153+ original_cluster_sizes = run_create_objects_part_1 (
154+ c , services , workload , verbose
155+ )
116156 if not early_initial_data :
117157 run_create_objects_part_2 (c , services , workload , verbose )
118158 stats ["object_creation" ] = time .time () - start_time
119- created_data = False
120- if initial_data :
121- print ( "Creating initial data" )
122- stats [ "initial_data" ] = { "docker" : [], "time" : 0.0 }
123- stats_thread = PropagatingThread (
124- target = docker_stats ,
125- name = "docker-stats" ,
126- args = ( stats [ "initial_data" ][ "docker" ], stop_event ),
127- )
128- stats_thread . start ()
129- try :
159+ stats [ "initial_data" ] = { "docker" : [], "time" : 0.0 }
160+ stats_thread = PropagatingThread (
161+ target = docker_stats ,
162+ name = "docker-stats" ,
163+ args = ( stats [ "initial_data" ][ "docker" ], stop_event ),
164+ )
165+ stats_thread . start ()
166+ try :
167+ created_data = False
168+ if initial_data :
169+ print ( "Creating initial data" )
130170 start_time = time .time ()
131171 created_data = create_initial_data_external (
132172 c ,
@@ -137,6 +177,7 @@ def test(
137177 if early_initial_data :
138178 obj_start = time .time ()
139179 run_create_objects_part_2 (c , services , workload , verbose )
180+ stats ["initial_data" ]["sources_created_at" ] = time .time ()
140181 stats ["object_creation" ] += time .time () - obj_start
141182 created_data_requiring_mz = create_initial_data_requiring_mz (
142183 c ,
@@ -146,84 +187,71 @@ def test(
146187 )
147188 created_data = created_data or created_data_requiring_mz
148189 stats ["initial_data" ]["time" ] = time .time () - start_time
149- if not created_data :
150- del stats ["initial_data" ]
151- finally :
152- stop_event .set ()
153- stats_thread .join ()
154- stop_event .clear ()
155- elif early_initial_data :
156- start_time = time .time ()
157- run_create_objects_part_2 (c , services , workload , verbose )
158- stats ["object_creation" ] += time .time () - start_time
190+ elif early_initial_data :
191+ start_time = time .time ()
192+ run_create_objects_part_2 (c , services , workload , verbose )
193+ stats ["object_creation" ] += time .time () - start_time
159194
160- # Wait for all user objects to hydrate before starting queries.
161- print ("Waiting for hydration" )
162- prev_not_hydrated : list [str ] = []
163- while True :
164- not_hydrated : list [str ] = [
165- entry [0 ]
166- for entry in c .sql_query (
167- """
168- SELECT DISTINCT name
169- FROM (
170- SELECT o.name
171- FROM mz_objects o
172- JOIN mz_internal.mz_hydration_statuses h
173- ON o.id = h.object_id
174- WHERE NOT h.hydrated
175- AND o.name NOT LIKE 'mz_%'
176- AND o.id NOT IN (SELECT id FROM mz_sinks)
195+ # Wait for all user objects to hydrate before starting queries.
196+ print ("Waiting for hydration" )
197+ prev_not_hydrated : list [str ] = []
198+ while True :
199+ not_hydrated : list [str ] = [
200+ entry [0 ]
201+ for entry in c .sql_query (
202+ """
203+ SELECT DISTINCT name
204+ FROM (
205+ SELECT o.name
206+ FROM mz_objects o
207+ JOIN mz_internal.mz_hydration_statuses h
208+ ON o.id = h.object_id
209+ WHERE NOT h.hydrated
210+ AND o.name NOT LIKE 'mz_%'
211+ AND o.id NOT IN (SELECT id FROM mz_sinks)
177212
178- UNION ALL
213+ UNION ALL
179214
180- SELECT o.name
181- FROM mz_objects o
182- JOIN mz_internal.mz_compute_hydration_statuses h
183- ON o.id = h.object_id
184- WHERE NOT h.hydrated
185- AND o.name NOT LIKE 'mz_%'
186- AND o.id NOT IN (SELECT id FROM mz_sinks)
187- ) x
188- ORDER BY 1;"""
189- )
190- ]
191- if not_hydrated :
192- if not_hydrated != prev_not_hydrated :
193- print (f" Not yet hydrated: { ', ' .join (not_hydrated )} " )
194- prev_not_hydrated = not_hydrated
195- time .sleep (1 )
196- else :
197- break
198- print ("Hydration complete" )
215+ SELECT o.name
216+ FROM mz_objects o
217+ JOIN mz_internal.mz_compute_hydration_statuses h
218+ ON o.id = h.object_id
219+ WHERE NOT h.hydrated
220+ AND o.name NOT LIKE 'mz_%'
221+ AND o.id NOT IN (SELECT id FROM mz_sinks)
222+ ) x
223+ ORDER BY 1;"""
224+ )
225+ ]
226+ if not_hydrated :
227+ if not_hydrated != prev_not_hydrated :
228+ print (f" Not yet hydrated: { ', ' .join (not_hydrated )} " )
229+ prev_not_hydrated = not_hydrated
230+ time .sleep (1 )
231+ else :
232+ break
233+ print ("Hydration complete" )
234+
235+ wait_for_freshness (c )
236+
237+ # Scale clusters back down to their original sizes.
238+ if original_cluster_sizes :
239+ print ("Scaling clusters to original sizes" )
240+ for name , original_size in original_cluster_sizes .items ():
241+ c .sql (
242+ f"ALTER CLUSTER \" { name } \" SET (SIZE = '{ original_size } ')" ,
243+ user = "mz_system" ,
244+ port = 6877 ,
245+ )
246+ wait_for_freshness (c )
247+ finally :
248+ stop_event .set ()
249+ stats_thread .join ()
250+ stop_event .clear ()
251+
252+ if not created_data :
253+ del stats ["initial_data" ]
199254
200- # Wait for all user materializations to be caught up (fresh).
201- # Sleep first so the system has time to start processing imported data;
202- # otherwise frontiers haven't advanced yet and everything looks fresh.
203- print ("Waiting for freshness" )
204- time .sleep (10 )
205- while True :
206- lagging : list [tuple [str , str ]] = [
207- (entry [0 ], entry [1 ])
208- for entry in c .sql_query (
209- """
210- SELECT o.name, COALESCE(l.global_lag, INTERVAL '999 hours')::text
211- FROM mz_internal.mz_materialization_lag l
212- JOIN mz_objects o ON o.id = l.object_id
213- WHERE o.name NOT LIKE 'mz_%'
214- AND o.id NOT IN (SELECT id FROM mz_sinks)
215- AND (l.global_lag IS NULL OR l.global_lag > INTERVAL '10 seconds')
216- ORDER BY l.global_lag DESC NULLS FIRST
217- LIMIT 5;"""
218- )
219- ]
220- if lagging :
221- summary = ", " .join (f"{ name } ({ lag } )" for name , lag in lagging )
222- print (f" Lagging: { summary } " )
223- time .sleep (5 )
224- else :
225- break
226- print ("Freshness complete" )
227255 if run_ingestions :
228256 print ("Starting continuous ingestions" )
229257 threads .extend (
0 commit comments