Push-based worker dispatch with blocking Start#74
Draft
Push-based worker dispatch with blocking Start#74
Conversation
The NewOrchestrationWorker signature was changed to take an OrchestratorOptions struct but the samples were not updated. Signed-off-by: joshvanl <[email protected]>
Introduces the event-driven loop infrastructure that will be used by the worker and executor refactors: - backend/loops: EventWorker and EventExecutor marker interfaces with concrete event types (DispatchWorkItem, Shutdown, ExecuteOrchestrator, ExecuteActivity, ConnectStream, DisconnectStream, etc.) - backend/loops/worker: Handler that processes dispatched work items inline within a loop, calling Process/Complete/Abandon on the processor. - backend/loops/executor: Handler that manages gRPC streams and dispatches orchestrator/activity work items to connected clients. - backend/local/loops: EventTask types and handler for the local tasks backend, replacing sync.Map-based pending task tracking with serialized loop processing. Branched from dapr#72 Signed-off-by: joshvanl <[email protected]>
Replaces the channel-based work item queue and sync.Map-based pending task tracking in the gRPC executor with a single-threaded event loop. All stream management, work item dispatch, and cleanup now happen serially within the loop handler, eliminating data races. - grpcExecutor: replaces workItemQueue channel and pendingOrchestrators/ pendingActivities sync.Maps with an executor event loop. Adds Start() to the Executor interface. GetWorkItems now connects a stream via the loop and blocks until disconnected. - local.TasksBackend: replaces sync.Map-based pending task tracking with a task event loop. All complete/cancel/register operations are now serialized through the loop. - sqlite/postgres: Start now runs the TasksBackend loop in a goroutine, Stop closes it. - task.taskExecutor: adds no-op Start() that blocks until context done. - Executor mock: adds Start() mock method. - gRPC tests: start the executor loop and cancel on cleanup. Branched from dapr#72 Signed-off-by: joshvanl <[email protected]>
Replaces the poll-based worker model with push-based dispatch and makes all Start methods blocking (return when context is cancelled). Worker changes: - TaskWorker interface: Start(ctx) returns error (blocking), adds Dispatch(wi, callback) for push-based dispatch, removes StopAndDrain. - TaskProcessor interface: removes NextWorkItem (no longer polls). - taskWorker uses N event loops (one per parallelism slot) with round-robin dispatch via atomic counter. - Removes processWorkItem from worker.go (moved to loops/worker handler). - activity.go/orchestration.go: removes NextWorkItem methods. TaskHub changes: - TaskHubWorker interface: removes Shutdown (callers cancel context). - taskHubWorker.Start uses RunnerManager to run backend, workers, and pollAndDispatch bridges concurrently. Blocks until context cancelled. - pollAndDispatch bridges blocking Next*WorkItem calls into fire-and- forget Dispatch calls to worker loops. Backend changes: - sqlite/postgres Start: now blocking (delegates to TasksBackend.Run). - sqlite CreateTaskHub: idempotent (returns ErrTaskHubExists if already initialized). Test changes: - All tests use context cancellation instead of StopAndDrain/Shutdown. - orchestrations_test.go: initTaskHubWorker returns CancelFunc, uses ready channel for synchronization. - worker_test.go: rewritten for push-based Dispatch API. - taskhub_test.go: simplified for blocking Start. - grpc_test.go: uses goroutines for blocking Start, context cancel for cleanup. Sample changes: - All samples updated: Init returns CancelFunc, Start runs in goroutine. Branched from dapr#73 Signed-off-by: joshvanl <[email protected]>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Replaces the poll-based worker model with push-based dispatch and makes
all Start methods blocking (return when context is cancelled).
Worker changes:
Dispatch(wi, callback) for push-based dispatch, removes StopAndDrain.
round-robin dispatch via atomic counter.
TaskHub changes:
pollAndDispatch bridges concurrently. Blocks until context cancelled.
forget Dispatch calls to worker loops.
Backend changes:
initialized).
Test changes:
ready channel for synchronization.
cleanup.
Sample changes:
Branched from #73