@@ -74,6 +74,77 @@ func Test_TryProcessSingleOrchestrationWorkItem_BasicFlow(t *testing.T) {
7474 }, 1 * time .Second , 100 * time .Millisecond )
7575
7676 worker .StopAndDrain ()
77+
78+ t .Logf ("state.NewEvents: %v" , state .NewEvents )
79+ require .Len (t , state .NewEvents , 2 )
80+ require .NotNil (t , wi .State .NewEvents [0 ].GetOrchestratorStarted ())
81+ require .NotNil (t , wi .State .NewEvents [1 ].GetExecutionStarted ())
82+ }
83+
84+ func Test_TryProcessSingleOrchestrationWorkItem_Idempotency (t * testing.T ) {
85+ workflowID := "test123"
86+ wi := & backend.OrchestrationWorkItem {
87+ InstanceID : api .InstanceID (workflowID ),
88+ NewEvents : []* protos.HistoryEvent {
89+ {
90+ EventId : - 1 ,
91+ Timestamp : timestamppb .New (time .Now ()),
92+ EventType : & protos.HistoryEvent_ExecutionStarted {
93+ ExecutionStarted : & protos.ExecutionStartedEvent {
94+ Name : "MyOrch" ,
95+ OrchestrationInstance : & protos.OrchestrationInstance {
96+ InstanceId : workflowID ,
97+ ExecutionId : wrapperspb .String (uuid .New ().String ()),
98+ },
99+ },
100+ },
101+ },
102+ },
103+ State : runtimestate .NewOrchestrationRuntimeState (workflowID , nil , []* protos.HistoryEvent {}),
104+ }
105+
106+ ctx , cancel := context .WithCancel (context .Background ())
107+ t .Cleanup (cancel )
108+
109+ completed := atomic.Bool {}
110+ be := mocks .NewBackend (t )
111+ ex := mocks .NewExecutor (t )
112+
113+ callNumber := 0
114+ ex .EXPECT ().ExecuteOrchestrator (anyContext , wi .InstanceID , wi .State .OldEvents , mock .Anything ).RunAndReturn (func (ctx context.Context , iid api.InstanceID , oldEvents []* protos.HistoryEvent , newEvents []* protos.HistoryEvent ) (* protos.OrchestratorResponse , error ) {
115+ callNumber ++
116+ logger .Debugf ("execute orchestrator called %d times" , callNumber )
117+ if callNumber == 1 {
118+ return nil , errors .New ("dummy error" )
119+ }
120+ return & protos.OrchestratorResponse {}, nil
121+ }).Times (2 )
122+
123+ be .EXPECT ().NextOrchestrationWorkItem (anyContext ).Return (wi , nil ).Once ()
124+ be .EXPECT ().AbandonOrchestrationWorkItem (anyContext , wi ).Return (nil ).Once ()
125+
126+ be .EXPECT ().NextOrchestrationWorkItem (anyContext ).Return (wi , nil ).Once ()
127+ be .EXPECT ().CompleteOrchestrationWorkItem (anyContext , wi ).RunAndReturn (func (ctx context.Context , owi * backend.OrchestrationWorkItem ) error {
128+ completed .Store (true )
129+ return nil
130+ }).Once ()
131+
132+ be .EXPECT ().NextOrchestrationWorkItem (anyContext ).Return (nil , errors .New ("" )).Once ().Run (func (mock.Arguments ) {
133+ cancel ()
134+ })
135+
136+ worker := backend .NewOrchestrationWorker (be , ex , logger , backend .WithMaxParallelism (1 ))
137+ worker .Start (ctx )
138+
139+ require .Eventually (t , completed .Load , 2 * time .Second , 10 * time .Millisecond )
140+
141+ worker .StopAndDrain ()
142+
143+ t .Logf ("state.NewEvents: %v" , wi .State .NewEvents )
144+ require .Len (t , wi .State .NewEvents , 3 )
145+ require .NotNil (t , wi .State .NewEvents [0 ].GetOrchestratorStarted ())
146+ require .NotNil (t , wi .State .NewEvents [1 ].GetExecutionStarted ())
147+ require .NotNil (t , wi .State .NewEvents [2 ].GetOrchestratorStarted ())
77148}
78149
79150func Test_TryProcessSingleOrchestrationWorkItem_ExecutionStartedAndCompleted (t * testing.T ) {
@@ -156,6 +227,12 @@ func Test_TryProcessSingleOrchestrationWorkItem_ExecutionStartedAndCompleted(t *
156227 }, 1 * time .Second , 100 * time .Millisecond )
157228
158229 worker .StopAndDrain ()
230+
231+ t .Logf ("state.NewEvents: %v" , state .NewEvents )
232+ require .Len (t , state .NewEvents , 3 )
233+ require .NotNil (t , wi .State .NewEvents [0 ].GetOrchestratorStarted ())
234+ require .NotNil (t , wi .State .NewEvents [1 ].GetExecutionStarted ())
235+ require .NotNil (t , wi .State .NewEvents [2 ].GetExecutionCompleted ())
159236}
160237
161238func Test_TaskWorker (t * testing.T ) {
0 commit comments