Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ Changelog
next
====

Bugfixes
--------

* Fix installing resetting the count of pending tasks when flushing tasks due to
the flush interval being reached. (`#95 <https://github.com/clokep/celery-batches/pull/95>`_)

Improvements
------------

Expand Down
1 change: 1 addition & 0 deletions celery_batches/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ def _do_flush(self) -> None:
if len(ready_requests) > 0:
logger.debug("Batches: Ready buffer complete: %s", len(ready_requests))
self.flush(ready_requests)
self._count = count(self._pending.qsize() + 1)

if not ready_requests and self._pending.qsize() == 0:
logger.debug("Batches: Canceling timer: Nothing in buffers.")
Expand Down
28 changes: 28 additions & 0 deletions t/integration/test_batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,34 @@ def test_flush_interval(celery_app: Celery, celery_worker: TestWorkController) -
assert result.get() == 1


def test_flush_interval_resets_counter(
celery_app: Celery, celery_worker: TestWorkController
) -> None:
"""Flush counter is reset after flush is triggered by interval."""

if not celery_app.conf.broker_url.startswith("memory"):
raise pytest.skip("Flaky on live brokers")

result_1 = add.delay(1)

# The flush interval is 0.1 second, this is longer.
sleep(2)

# Let the worker work.
_wait_for_ping()

assert result_1.get() == 1

# Run next task, it should not execute as counter was reset
result_2 = add.delay(2)

# The flush interval is 0.1 second, this is shorter.
sleep(0.01)
_wait_for_ping()

assert result_2.state == states.PENDING


def test_flush_calls(celery_worker: TestWorkController) -> None:
"""The batch task runs after two calls."""
result_1 = add.delay(1)
Expand Down
Loading