Skip to content

Commit 14f035f

Browse files
committed
fix: add timeout for database query in workflow estimation to prevent blocking
Add timeout control for GetWorkflowForEstimator database query to prevent workflow execution from being blocked when database is slow or locked. - Add default 5 second timeout for database queries - Configurable via WORKFLOW_ESTIMATION_DB_QUERY_TIMEOUT environment variable - Return default estimator on timeout/error to ensure workflow continues - Add warning logs for timeout and error cases - Add documentation for new environment variable Signed-off-by: shuangkun <[email protected]>
1 parent 1d00f49 commit 14f035f

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

docs/environment-variables.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ This document outlines environment variables that can be used to customize behav
5858
| `SEMAPHORE_NOTIFY_DELAY` | `time.Duration` | `1s` | Tuning Delay when notifying semaphore waiters about availability in the semaphore |
5959
| `WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS` | `bool` | `true` | Whether to watch the Controller's ConfigMap and semaphore ConfigMaps for run-time changes. When disabled, the Controller will only read these ConfigMaps once and will have to be manually restarted to pick up new changes. |
6060
| `SKIP_WORKFLOW_DURATION_ESTIMATION` | `bool` | `false` | Whether to lookup resource usage from prior workflows to estimate usage for new workflows. |
61+
| `WORKFLOW_ESTIMATION_DB_QUERY_TIMEOUT` | `time.Duration` | `5s` | Timeout for database queries when estimating workflow duration. Prevents workflow execution from being blocked when database is slow or locked. |
6162

6263
CLI parameters of the Controller can be specified as environment variables with the `ARGO_` prefix.
6364
For example:

workflow/controller/estimation/estimator_factory.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package estimation
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
89
"k8s.io/apimachinery/pkg/labels"
@@ -11,6 +12,7 @@ import (
1112
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
1213
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
1314
"github.com/argoproj/argo-workflows/v3/util/env"
15+
"github.com/argoproj/argo-workflows/v3/util/logging"
1416
"github.com/argoproj/argo-workflows/v3/workflow/common"
1517
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
1618
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
@@ -32,6 +34,9 @@ var _ EstimatorFactory = &estimatorFactory{}
3234

3335
var (
3436
skipWorkflowDurationEstimation = env.LookupEnvStringOr("SKIP_WORKFLOW_DURATION_ESTIMATION", "false")
37+
// Default timeout for database queries to prevent blocking workflow execution
38+
// Can be overridden by WORKFLOW_ESTIMATION_DB_QUERY_TIMEOUT environment variable
39+
defaultEstimationDBQueryTimeout = 5 * time.Second
3540
)
3641

3742
func NewEstimatorFactory(ctx context.Context, wfInformer cache.SharedIndexInformer, hydrator hydrator.Interface, wfArchive sqldb.WorkflowArchive) EstimatorFactory {
@@ -84,9 +89,35 @@ func (f *estimatorFactory) NewEstimator(ctx context.Context, wf *wfv1.Workflow)
8489
if err != nil {
8590
return defaultEstimator, fmt.Errorf("failed to parse selector to requirements: %v", err)
8691
}
87-
baselineWF, err := f.wfArchive.GetWorkflowForEstimator(ctx, wf.Namespace, requirements)
92+
// Add timeout to database query to prevent blocking workflow execution
93+
// if database is slow or locked
94+
queryTimeout := defaultEstimationDBQueryTimeout
95+
if ctx != nil {
96+
if timeout := env.LookupEnvDurationOr(ctx, "WORKFLOW_ESTIMATION_DB_QUERY_TIMEOUT", defaultEstimationDBQueryTimeout); timeout > 0 {
97+
queryTimeout = timeout
98+
}
99+
}
100+
var baseCtx context.Context
101+
if ctx != nil {
102+
baseCtx = ctx
103+
} else {
104+
baseCtx = context.Background()
105+
}
106+
queryCtx, cancel := context.WithTimeout(baseCtx, queryTimeout)
107+
defer cancel()
108+
109+
baselineWF, err := f.wfArchive.GetWorkflowForEstimator(queryCtx, wf.Namespace, requirements)
88110
if err != nil {
89-
return defaultEstimator, fmt.Errorf("failed to get archived workflow for estimator: %v", err)
111+
// Log the error but return default estimator to not block workflow execution
112+
if ctx != nil {
113+
logger := logging.RequireLoggerFromContext(ctx)
114+
if queryCtx.Err() == context.DeadlineExceeded {
115+
logger.WithError(err).WithField("timeout", queryTimeout).Warn(ctx, "database query for workflow estimation timed out, using default estimator")
116+
} else {
117+
logger.WithError(err).Warn(ctx, "failed to get archived workflow for estimator, using default estimator")
118+
}
119+
}
120+
return defaultEstimator, nil
90121
}
91122
return &estimator{wf, baselineWF}, nil
92123
}

0 commit comments

Comments
 (0)