Skip to content

Commit d7a2b25

Browse files
committed
Manage broker and journal lifecycles
Introduce a Backend ABC that can make a shared connection for broker and journal.
1 parent 494e48e commit d7a2b25

29 files changed

+338
-572
lines changed

CHANGELOG.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,13 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com).
88
[Unreleased]
99
------------
1010

11+
[0.7.0] - 2026-03-09
12+
--------------------
13+
1114
### Breaking Changes
1215

16+
- The `pika` configuration key in `[tool.queueio]` has been renamed to `broker`.
17+
- The `QUEUEIO_PIKA` environment variable has been renamed to `QUEUEIO_BROKER`.
1318
- Removed `Broker.create(queue)`, in favor of `Broker.sync([queue])`.
1419
- Removed `Broker.delete(queue)`, without replacement.
1520
- Removed `QueueIO.create(queue)`, in favor of `QueueIO.sync([queue])`.
@@ -19,6 +24,8 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com).
1924

2025
- Added `Broker.sync()` to back `queueio sync`.
2126
- Added `QueueIO.sync()` to back `queueio sync`.
27+
- `Backend` ABC with `broker()` and `journal()` methods
28+
that return independent context managers with their own lifecycles.
2229

2330
[0.6.0] - 2026-02-16
2431
--------------------
@@ -118,7 +125,8 @@ Thank you to Nick Anderegg for allowing me to use the queueio name for this proj
118125
- The queuespec syntax to `queue run` to consume multiple queues with shared capacity.
119126
- `queueio monitor` command to monitor activity in the queueio system.
120127

121-
[Unreleased]: https://github.com/ryanhiebert/queueio/compare/tag/0.6.0...HEAD
128+
[Unreleased]: https://github.com/ryanhiebert/queueio/compare/tag/0.7.0...HEAD
129+
[0.7.0]: https://github.com/ryanhiebert/queueio/compare/tag/0.6.0...tag/0.7.0
122130
[0.6.0]: https://github.com/ryanhiebert/queueio/compare/tag/0.5.0...tag/0.6.0
123131
[0.5.0]: https://github.com/ryanhiebert/queueio/compare/tag/0.4.0...tag/0.5.0
124132
[0.4.0]: https://github.com/ryanhiebert/queueio/compare/tag/0.3.0...tag/0.4.0

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,17 @@ Add the configuration to your `pyproject.toml`:
4646

4747
```toml
4848
[tool.queueio]
49-
# Configure RabbitMQ
50-
pika = "amqp://guest:guest@localhost:5672/"
49+
# Configure the broker (currently supports amqp:// URIs)
50+
broker = "amqp://guest:guest@localhost:5672/"
5151
# Register the modules that the worker should load to find your routines
5252
register = ["basic"]
5353
```
5454

55-
The pika configuration can be overridden with an environment variable
55+
The broker configuration can be overridden with an environment variable
5656
to allow a project to be deployed in multiple environments.
5757

5858
```sh
59-
QUEUEIO_PIKA='amqp://guest:guest@localhost:5672/'
59+
QUEUEIO_BROKER='amqp://guest:guest@localhost:5672/'
6060
```
6161

6262
Sync the queues to the broker:

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "queueio"
3-
version = "0.6.0"
3+
version = "0.7.0"
44
description = "Python background queues with an async twist"
55
readme = "README.md"
66
license = "MIT"
@@ -68,7 +68,7 @@ register = [
6868
"queueio.samples.priority",
6969
"queueio.samples.queuevar",
7070
]
71-
pika = "amqp://localhost:5672"
71+
broker = "amqp://localhost:5672"
7272

7373
[tool.pytest.ini_options]
7474
timeout = 30

queueio/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
from .gather import gather as gather
44
from .pause import pause as pause
5-
from .queueio import QueueIO as RealQueueIO
5+
from .queueio import QueueIO as QueueIO
66
from .queueio import priority as priority
77
from .queuevar import QueueVar as QueueVar
88
from .registry import routine as routine
99

1010

1111
@contextmanager
1212
def activate():
13-
with RealQueueIO().activate():
13+
with QueueIO.default() as queueio, queueio.activate():
1414
yield

queueio/__main__.py

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@
3131
@routine_app.command("list")
3232
def routine_list():
3333
"""Show all registered routines."""
34-
queueio = QueueIO()
35-
try:
34+
with QueueIO.default() as queueio:
3635
routines = queueio.routines()
3736

3837
if not routines:
@@ -52,8 +51,6 @@ def routine_list():
5251
print(f"{'-' * name_width}-+-{'-' * path_width}")
5352
for routine, path in zip(routines, function_paths, strict=False):
5453
print(f"{routine.name:<{name_width}} | {path:<{path_width}}")
55-
finally:
56-
queueio.shutdown()
5754

5855

5956
@app.command(rich_help_panel="Commands")
@@ -63,17 +60,16 @@ def monitor(raw: bool = False):
6360
Show a live view of queueio activity. Use --raw for detailed event output.
6461
"""
6562
if raw:
66-
queueio = QueueIO()
67-
events = queueio.subscribe({object})
68-
try:
69-
while True:
70-
print(events.get())
71-
except KeyboardInterrupt:
72-
print("Shutting down gracefully.")
73-
finally:
74-
queueio.shutdown()
63+
with QueueIO.default() as queueio:
64+
events = queueio.subscribe({object})
65+
try:
66+
while True:
67+
print(events.get())
68+
except KeyboardInterrupt:
69+
print("Shutting down gracefully.")
7570
else:
76-
Monitor().run()
71+
with QueueIO.default() as queueio:
72+
Monitor(queueio).run()
7773

7874

7975
@app.command(rich_help_panel="Commands")
@@ -93,8 +89,8 @@ def run(
9389
The worker will process invocations from the specified queue,
9490
as many at a time as specified by the concurrency.
9591
"""
96-
queueio = QueueIO()
97-
Worker(queueio, queuespec)()
92+
with QueueIO.default() as queueio:
93+
Worker(queueio, queuespec)()
9894

9995

10096
@app.command(rich_help_panel="Commands")
@@ -108,8 +104,7 @@ def sync(
108104
] = False,
109105
):
110106
"""Sync resources for the broker and journal."""
111-
queueio = QueueIO()
112-
try:
107+
with QueueIO.default() as queueio:
113108
routines = queueio.routines()
114109

115110
if not routines:
@@ -133,8 +128,6 @@ def sync(
133128
raise typer.Exit(1) from None
134129

135130
print(f"Successfully synced {len(queues)} queue(s)")
136-
finally:
137-
queueio.shutdown()
138131

139132

140133
@queue_app.command("purge")
@@ -153,8 +146,7 @@ def queue_purge(
153146
This will remove all pending messages from the given queues.
154147
Use with caution as this operation cannot be undone.
155148
"""
156-
queueio = QueueIO()
157-
try:
149+
with QueueIO.default() as queueio:
158150
queue_list = [q.strip() for q in queues.split(",") if q.strip()]
159151
if not queue_list:
160152
print("Error: No valid queue names provided")
@@ -165,8 +157,6 @@ def queue_purge(
165157
queueio.purge(queue=queue)
166158

167159
print(f"Successfully purged {len(queue_list)} queue(s)")
168-
finally:
169-
queueio.shutdown()
170160

171161

172162
if __name__ == "__main__":

queueio/backend.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from abc import ABC
2+
from abc import abstractmethod
3+
from contextlib import AbstractContextManager
4+
5+
from .broker import Broker
6+
from .journal import Journal
7+
8+
9+
class Backend(ABC):
10+
"""A backend provides context-managed access to a broker and journal."""
11+
12+
@abstractmethod
13+
def broker(self) -> AbstractContextManager[Broker]:
14+
raise NotImplementedError("Subclasses must implement this method.")
15+
16+
@abstractmethod
17+
def journal(self) -> AbstractContextManager[Journal]:
18+
raise NotImplementedError("Subclasses must implement this method.")

queueio/broker.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,6 @@ class Broker(ABC):
1414
at-least-once delivery.
1515
"""
1616

17-
@classmethod
18-
@abstractmethod
19-
def from_uri(cls, uri: str, /):
20-
"""Create a broker instance from a URI."""
21-
raise NotImplementedError("Subclasses must implement this method.")
22-
2317
@abstractmethod
2418
def sync(self, queues: Iterable[str], *, recreate: bool = False):
2519
"""Ensure the given queues are ready to use.

queueio/journal.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,6 @@
44

55

66
class Journal(ABC):
7-
@classmethod
8-
@abstractmethod
9-
def from_uri(cls, uri: str, /):
10-
"""Create a journal instance from a URI."""
11-
raise NotImplementedError("Subclasses must implement this method.")
12-
137
@abstractmethod
148
def subscribe(self) -> Iterator[bytes]:
159
raise NotImplementedError("Subclasses must implement this method.")

queueio/monitor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ class Monitor(App):
1717

1818
TITLE = "queueio Monitor"
1919

20-
def __init__(self):
20+
def __init__(self, queueio: QueueIO):
2121
super().__init__()
22-
self.__queueio = QueueIO()
22+
self.__queueio = queueio
2323
self.__thread = Thread(target=self.__listen)
2424
self.__events = self.__queueio.subscribe(
2525
{

queueio/pika/__init__.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from collections.abc import Generator
2+
from contextlib import AbstractContextManager
3+
from contextlib import contextmanager
4+
5+
from pika import URLParameters
6+
7+
from queueio.backend import Backend
8+
9+
from .broker import PikaBroker
10+
from .journal import PikaJournal
11+
from .threadsafe import ThreadsafeConnection
12+
13+
14+
class PikaBackend(Backend):
15+
def __init__(self, connection: ThreadsafeConnection):
16+
self.__connection = connection
17+
18+
@classmethod
19+
@contextmanager
20+
def connect(cls, uri: str, /) -> Generator[PikaBackend]:
21+
connection = ThreadsafeConnection(URLParameters(uri))
22+
try:
23+
yield cls(connection)
24+
finally:
25+
connection.close()
26+
27+
def broker(self) -> AbstractContextManager[PikaBroker]:
28+
return PikaBroker.connect(self.__connection)
29+
30+
def journal(self) -> AbstractContextManager[PikaJournal]:
31+
return PikaJournal.connect(self.__connection)

0 commit comments

Comments
 (0)