Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions celery_batches/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ def Strategy(self, task: "Batches", app: Celery, consumer: Consumer) -> Callable
connection_errors = consumer.connection_errors

eventer = consumer.event_dispatcher
events = eventer and eventer.enabled
send_event = eventer and eventer.send
task_sends_events = events and task.send_events

Request = symbol_by_name(task.Request)
# Celery 5.1 added the app argument to create_request_cls.
Expand Down Expand Up @@ -256,6 +259,21 @@ def task_message_handler(

signals.task_received.send(sender=consumer, request=req)

signals.task_received.send(sender=consumer, request=request)
if task_sends_events:
send_event(
"task-received",
uuid=request.id,
name=request.name,
args=request.argsrepr,
kwargs=request.kwargsrepr,
root_id=request.root_id,
parent_id=request.parent_id,
retries=request.request_dict.get("retries", 0),
eta=request.eta and request.eta.isoformat(),
expires=request.expires and request.expires.isoformat(),
)

if self._tref is None: # first request starts flush timer.
self._tref = timer.call_repeatedly(self.flush_interval, flush_buffer)

Expand Down Expand Up @@ -359,10 +377,17 @@ def flush(self, requests: Collection[Request]) -> Any:
def on_accepted(pid: int, time_accepted: float) -> None:
for req in acks_early:
req.acknowledge()
for request in requests:
request.send_event("task-started")

def on_return(result: Optional[Any]) -> None:
for req in acks_late:
req.acknowledge()
for request in requests:
runtime = 0
if isinstance(result, int):
runtime = result
request.send_event("task-succeeded", result=None, runtime=runtime)

return self._pool.apply_async(
apply_batches_task,
Expand Down
2 changes: 2 additions & 0 deletions t/integration/test_batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ def test_signals(celery_app: Celery, celery_worker: TestWorkController) -> None:
(signals.task_success, 1),
# Other task signals are not implemented.
(signals.task_retry, 0),
(signals.task_success, 1),
(signals.task_received, 3),
(signals.task_failure, 0),
(signals.task_revoked, 0),
(signals.task_internal_error, 0),
Expand Down
Loading