From 3481a4395afe1490faca9f3184db18fd91206ac7 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Tue, 3 Mar 2026 10:48:14 +0000 Subject: [PATCH] scalability test: Retry before marking as regression --- .../executor/benchmark_executor.py | 78 +++++++++++++++++-- 1 file changed, 71 insertions(+), 7 deletions(-) diff --git a/misc/python/materialize/scalability/executor/benchmark_executor.py b/misc/python/materialize/scalability/executor/benchmark_executor.py index 4e52020de9c58..5df725c7052f5 100644 --- a/misc/python/materialize/scalability/executor/benchmark_executor.py +++ b/misc/python/materialize/scalability/executor/benchmark_executor.py @@ -85,7 +85,7 @@ def run_workload_for_all_endpoints( for other_endpoint in self.other_endpoints: comparison_outcome = self.run_and_evaluate_workload_for_endpoint( - workload_cls, other_endpoint, baseline_result, try_count=0 + workload_cls, other_endpoint, baseline_result ) self.result.add_regression(comparison_outcome) @@ -95,7 +95,6 @@ def run_and_evaluate_workload_for_endpoint( workload_cls: type[Workload], other_endpoint: Endpoint, baseline_result: WorkloadResult | None, - try_count: int, ) -> ComparisonOutcome | None: workload_name = workload_cls.__name__ other_endpoint_result = self.run_workload_for_endpoint( @@ -114,17 +113,82 @@ def run_and_evaluate_workload_for_endpoint( other_endpoint_result, ) - if outcome.has_regressions() and try_count < MAX_RETRIES_ON_REGRESSION: + # Targeted retries: re-run only the regressed concurrency levels + for retry in range(MAX_RETRIES_ON_REGRESSION): + if not outcome.has_regressions(): + break + + regressed_concurrencies = [r.concurrency for r in outcome.regressions] print( - f"Potential regression in workload {workload_name} at endpoint {other_endpoint}," - f" triggering retry {try_count + 1} of {MAX_RETRIES_ON_REGRESSION}" + f"Potential regression in workload {workload_name} at concurrencies " + f"{regressed_concurrencies}, triggering retry {retry + 1} of " + f"{MAX_RETRIES_ON_REGRESSION}" + ) + + # Re-run only regressed concurrency levels + workload = self.create_workload_instance( + workload_cls, endpoint=other_endpoint + ) + retry_result = self.run_workload_for_endpoint_with_concurrencies( + other_endpoint, workload, regressed_concurrencies + ) + + # Replace only the retried concurrency rows in the result + keep_totals = other_endpoint_result.df_totals.data[ + ~other_endpoint_result.df_totals.data[df_totals_cols.CONCURRENCY].isin( + regressed_concurrencies + ) + ] + keep_details = other_endpoint_result.df_details.data[ + ~other_endpoint_result.df_details.data[ + df_details_cols.CONCURRENCY + ].isin(regressed_concurrencies) + ] + other_endpoint_result = WorkloadResult( + other_endpoint_result.workload, + other_endpoint_result.endpoint, + DfTotals(pd.concat([keep_totals, retry_result.df_totals.data])), + DfDetails(pd.concat([keep_details, retry_result.df_details.data])), ) - return self.run_and_evaluate_workload_for_endpoint( - workload_cls, other_endpoint, baseline_result, try_count=try_count + 1 + + # Re-evaluate + outcome = self.result_analyzer.perform_comparison_in_workload( + workload_name, + self.baseline_endpoint, + other_endpoint, + baseline_result, + other_endpoint_result, ) return outcome + def run_workload_for_endpoint_with_concurrencies( + self, + endpoint: Endpoint, + workload: Workload, + concurrencies: list[int], + ) -> WorkloadResult: + """Run a workload for only the specified concurrency levels.""" + print( + f"--- Re-running workload {workload.name()} on {endpoint} " + f"for concurrencies {concurrencies}" + ) + + df_totals = DfTotals() + df_details = DfDetails() + + for concurrency in concurrencies: + df_total, df_detail = self.run_workload_for_endpoint_with_concurrency( + endpoint, + workload, + concurrency, + self.config.get_count_for_concurrency(concurrency), + ) + df_totals = concat_df_totals([df_totals, df_total]) + df_details = concat_df_details([df_details, df_detail]) + + return WorkloadResult(workload, endpoint, df_totals, df_details) + def run_workload_for_endpoint( self, endpoint: Endpoint,