Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def run_workload_for_all_endpoints(

for other_endpoint in self.other_endpoints:
comparison_outcome = self.run_and_evaluate_workload_for_endpoint(
workload_cls, other_endpoint, baseline_result, try_count=0
workload_cls, other_endpoint, baseline_result
)

self.result.add_regression(comparison_outcome)
Expand All @@ -95,7 +95,6 @@ def run_and_evaluate_workload_for_endpoint(
workload_cls: type[Workload],
other_endpoint: Endpoint,
baseline_result: WorkloadResult | None,
try_count: int,
) -> ComparisonOutcome | None:
workload_name = workload_cls.__name__
other_endpoint_result = self.run_workload_for_endpoint(
Expand All @@ -114,17 +113,82 @@ def run_and_evaluate_workload_for_endpoint(
other_endpoint_result,
)

if outcome.has_regressions() and try_count < MAX_RETRIES_ON_REGRESSION:
# Targeted retries: re-run only the regressed concurrency levels
for retry in range(MAX_RETRIES_ON_REGRESSION):
if not outcome.has_regressions():
break

regressed_concurrencies = [r.concurrency for r in outcome.regressions]
print(
f"Potential regression in workload {workload_name} at endpoint {other_endpoint},"
f" triggering retry {try_count + 1} of {MAX_RETRIES_ON_REGRESSION}"
f"Potential regression in workload {workload_name} at concurrencies "
f"{regressed_concurrencies}, triggering retry {retry + 1} of "
f"{MAX_RETRIES_ON_REGRESSION}"
)

# Re-run only regressed concurrency levels
workload = self.create_workload_instance(
workload_cls, endpoint=other_endpoint
)
retry_result = self.run_workload_for_endpoint_with_concurrencies(
other_endpoint, workload, regressed_concurrencies
)

# Replace only the retried concurrency rows in the result
keep_totals = other_endpoint_result.df_totals.data[
~other_endpoint_result.df_totals.data[df_totals_cols.CONCURRENCY].isin(
regressed_concurrencies
)
]
keep_details = other_endpoint_result.df_details.data[
~other_endpoint_result.df_details.data[
df_details_cols.CONCURRENCY
].isin(regressed_concurrencies)
]
other_endpoint_result = WorkloadResult(
other_endpoint_result.workload,
other_endpoint_result.endpoint,
DfTotals(pd.concat([keep_totals, retry_result.df_totals.data])),
DfDetails(pd.concat([keep_details, retry_result.df_details.data])),
)
return self.run_and_evaluate_workload_for_endpoint(
workload_cls, other_endpoint, baseline_result, try_count=try_count + 1

# Re-evaluate
outcome = self.result_analyzer.perform_comparison_in_workload(
workload_name,
self.baseline_endpoint,
other_endpoint,
baseline_result,
other_endpoint_result,
)

return outcome

def run_workload_for_endpoint_with_concurrencies(
    self,
    endpoint: Endpoint,
    workload: Workload,
    concurrencies: list[int],
) -> WorkloadResult:
    """Run *workload* against *endpoint*, restricted to the given concurrency levels.

    Used by the targeted-retry path: instead of re-running the full
    concurrency sweep, only the levels listed in *concurrencies* are
    executed, and their totals/details frames are accumulated into a
    fresh WorkloadResult.
    """
    print(
        f"--- Re-running workload {workload.name()} on {endpoint} "
        f"for concurrencies {concurrencies}"
    )

    # Accumulators start empty and grow one concurrency level at a time.
    totals_acc = DfTotals()
    details_acc = DfDetails()

    for level in concurrencies:
        # Request count per level comes from the run configuration.
        total_df, detail_df = self.run_workload_for_endpoint_with_concurrency(
            endpoint,
            workload,
            level,
            self.config.get_count_for_concurrency(level),
        )
        totals_acc = concat_df_totals([totals_acc, total_df])
        details_acc = concat_df_details([details_acc, detail_df])

    return WorkloadResult(workload, endpoint, totals_acc, details_acc)

def run_workload_for_endpoint(
self,
endpoint: Endpoint,
Expand Down