Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3852,6 +3852,17 @@ func addRawOutputFields(node *wfv1.NodeStatus, tmpl *wfv1.Template) *wfv1.NodeSt
if node.Outputs == nil {
node.Outputs = &wfv1.Outputs{Parameters: []wfv1.Parameter{}}
}
// If the output parameter doesn't have a default, check if there's a matching
// input parameter with the same name that has a default value. This allows
// suspend nodes to use input defaults for outputs when the node times out.
if param.ValueFrom.Default == nil {
for _, inParam := range tmpl.Inputs.Parameters {
if inParam.Name == param.Name && inParam.Default != nil {
param.ValueFrom.Default = inParam.Default
break
}
}
}
node.Outputs.Parameters = append(node.Outputs.Parameters, param)
}
}
Expand Down
105 changes: 105 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2996,6 +2996,111 @@ func TestSuspendResumeAfterTemplateNoWait(t *testing.T) {
assert.Empty(t, pods.Items)
}

var suspendWithInputDefaultsTemplate = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: suspend-with-input-defaults
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: wait
template: wait
- - name: print
template: print
arguments:
parameters:
- name: input
value: "{{steps.wait.outputs.parameters.input}}"
- name: select
value: "{{steps.wait.outputs.parameters.select}}"
- name: wait
suspend:
duration: "0s"
inputs:
parameters:
- name: input
default: "Hello World"
- name: select
default: "default"
enum:
- default
- option1
- option2
outputs:
parameters:
- name: input
valueFrom:
supplied: {}
- name: select
valueFrom:
supplied: {}
- name: print
inputs:
parameters:
- name: input
- name: select
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.input}} - {{inputs.parameters.select}}"]
`

// TestSuspendTimeoutWithInputDefaults tests that when a suspend node times out,
// output parameters get their default values from matching input parameters
func TestSuspendTimeoutWithInputDefaults(t *testing.T) {
cancel, controller := newController(logging.TestContext(t.Context()))
defer cancel()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

ctx := logging.TestContext(t.Context())
wf := wfv1.MustUnmarshalWorkflow(suspendWithInputDefaultsTemplate)
wf, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
require.NoError(t, err)

// operate the workflow - it should create the suspend node and immediately timeout (duration: 0s)
woc := newWorkflowOperationCtx(ctx, wf, controller)
woc.operate(ctx)
wf, err = wfcset.Get(ctx, wf.Name, metav1.GetOptions{})
require.NoError(t, err)

// Get the suspend node and verify outputs have defaults from inputs
waitNode := wf.Status.Nodes.FindByDisplayName("wait")
require.NotNil(t, waitNode)
require.NotNil(t, waitNode.Outputs)
require.Len(t, waitNode.Outputs.Parameters, 2)

// Verify "input" output parameter got the default "Hello World" from input
var inputParam, selectParam *wfv1.Parameter
for i := range waitNode.Outputs.Parameters {
p := &waitNode.Outputs.Parameters[i]
switch p.Name {
case "input":
inputParam = p
case "select":
selectParam = p
}
}

require.NotNil(t, inputParam, "input parameter should exist in outputs")
require.NotNil(t, inputParam.Value, "input parameter should have a value")
assert.Equal(t, "Hello World", inputParam.Value.String())

// Verify "select" output parameter got the default "default" from input
require.NotNil(t, selectParam, "select parameter should exist in outputs")
require.NotNil(t, selectParam.Value, "select parameter should have a value")
assert.Equal(t, "default", selectParam.Value.String())

// Verify the workflow eventually succeeds (continues to next step)
woc = newWorkflowOperationCtx(ctx, wf, controller)
woc.operate(ctx)
pods, err := listPods(ctx, woc)
require.NoError(t, err)
assert.Len(t, pods.Items, 1)
}

var volumeWithParam = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
Loading