Skip to content

Push-based worker dispatch with blocking Start#74

Draft
JoshVanL wants to merge 4 commits intodapr:mainfrom
JoshVanL:joshvanl/push-dispatch-blocking-start
Draft

Push-based worker dispatch with blocking Start#74
JoshVanL wants to merge 4 commits intodapr:mainfrom
JoshVanL:joshvanl/push-dispatch-blocking-start

Conversation

@JoshVanL
Copy link

Replaces the poll-based worker model with push-based dispatch and makes
all Start methods blocking (return when context is cancelled).

Worker changes:

  • TaskWorker interface: Start(ctx) returns error (blocking), adds
    Dispatch(wi, callback) for push-based dispatch, removes StopAndDrain.
  • TaskProcessor interface: removes NextWorkItem (no longer polls).
  • taskWorker uses N event loops (one per parallelism slot) with
    round-robin dispatch via atomic counter.
  • Removes processWorkItem from worker.go (moved to loops/worker handler).
  • activity.go/orchestration.go: removes NextWorkItem methods.

TaskHub changes:

  • TaskHubWorker interface: removes Shutdown (callers cancel context).
  • taskHubWorker.Start uses RunnerManager to run backend, workers, and
    pollAndDispatch bridges concurrently. Blocks until context cancelled.
  • pollAndDispatch bridges blocking Next*WorkItem calls into fire-and-
    forget Dispatch calls to worker loops.

Backend changes:

  • sqlite/postgres Start: now blocking (delegates to TasksBackend.Run).
  • sqlite CreateTaskHub: idempotent (returns ErrTaskHubExists if already
    initialized).

Test changes:

  • All tests use context cancellation instead of StopAndDrain/Shutdown.
  • orchestrations_test.go: initTaskHubWorker returns CancelFunc, uses
    ready channel for synchronization.
  • worker_test.go: rewritten for push-based Dispatch API.
  • taskhub_test.go: simplified for blocking Start.
  • grpc_test.go: uses goroutines for blocking Start, context cancel for
    cleanup.

Sample changes:

  • All samples updated: Init returns CancelFunc, Start runs in goroutine.

Branched from #73

The NewOrchestrationWorker signature was changed to take an
OrchestratorOptions struct but the samples were not updated.

Signed-off-by: joshvanl <[email protected]>
Introduces the event-driven loop infrastructure that will be used by
the worker and executor refactors:

- backend/loops: EventWorker and EventExecutor marker interfaces with
  concrete event types (DispatchWorkItem, Shutdown, ExecuteOrchestrator,
  ExecuteActivity, ConnectStream, DisconnectStream, etc.)
- backend/loops/worker: Handler that processes dispatched work items
  inline within a loop, calling Process/Complete/Abandon on the
  processor.
- backend/loops/executor: Handler that manages gRPC streams and
  dispatches orchestrator/activity work items to connected clients.
- backend/local/loops: EventTask types and handler for the local
  tasks backend, replacing sync.Map-based pending task tracking with
  serialized loop processing.

Branched from dapr#72

Signed-off-by: joshvanl <[email protected]>
Replaces the channel-based work item queue and sync.Map-based pending
task tracking in the gRPC executor with a single-threaded event loop.
All stream management, work item dispatch, and cleanup now happen
serially within the loop handler, eliminating data races.

- grpcExecutor: replaces workItemQueue channel and pendingOrchestrators/
  pendingActivities sync.Maps with an executor event loop. Adds Start()
  to the Executor interface. GetWorkItems now connects a stream via
  the loop and blocks until disconnected.
- local.TasksBackend: replaces sync.Map-based pending task tracking with
  a task event loop. All complete/cancel/register operations are now
  serialized through the loop.
- sqlite/postgres: Start now runs the TasksBackend loop in a goroutine,
  Stop closes it.
- task.taskExecutor: adds no-op Start() that blocks until context done.
- Executor mock: adds Start() mock method.
- gRPC tests: start the executor loop and cancel on cleanup.

Branched from dapr#72

Signed-off-by: joshvanl <[email protected]>
Replaces the poll-based worker model with push-based dispatch and makes
all Start methods blocking (return when context is cancelled).

Worker changes:
- TaskWorker interface: Start(ctx) returns error (blocking), adds
  Dispatch(wi, callback) for push-based dispatch, removes StopAndDrain.
- TaskProcessor interface: removes NextWorkItem (no longer polls).
- taskWorker uses N event loops (one per parallelism slot) with
  round-robin dispatch via atomic counter.
- Removes processWorkItem from worker.go (moved to loops/worker handler).
- activity.go/orchestration.go: removes NextWorkItem methods.

TaskHub changes:
- TaskHubWorker interface: removes Shutdown (callers cancel context).
- taskHubWorker.Start uses RunnerManager to run backend, workers, and
  pollAndDispatch bridges concurrently. Blocks until context cancelled.
- pollAndDispatch bridges blocking Next*WorkItem calls into fire-and-
  forget Dispatch calls to worker loops.

Backend changes:
- sqlite/postgres Start: now blocking (delegates to TasksBackend.Run).
- sqlite CreateTaskHub: idempotent (returns ErrTaskHubExists if already
  initialized).

Test changes:
- All tests use context cancellation instead of StopAndDrain/Shutdown.
- orchestrations_test.go: initTaskHubWorker returns CancelFunc, uses
  ready channel for synchronization.
- worker_test.go: rewritten for push-based Dispatch API.
- taskhub_test.go: simplified for blocking Start.
- grpc_test.go: uses goroutines for blocking Start, context cancel for
  cleanup.

Sample changes:
- All samples updated: Init returns CancelFunc, Start runs in goroutine.

Branched from  dapr#73

Signed-off-by: joshvanl <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant