Skip to content

Commit c6cca16

Browse files
committed
Refine and validate the example in the README
1 parent 8f90627 commit c6cca16

8 files changed

Lines changed: 113 additions & 16 deletions

File tree

README.md

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,48 @@ pip install queueio
1818
Create your routines:
1919

2020
```python
21-
# sample.py
21+
# basic.py
22+
from time import sleep as time_sleep
23+
2224
from queueio import QueueIO
2325
from queueio import routine
2426
from queueio.gather import gather
2527
from queueio.sleep import sleep
26-
from time import sleep as time_sleep
2728

2829

2930
@routine(name="blocking", queue="queueio")
3031
def blocking():
31-
pass
32+
time_sleep(0.1) # Regular blocking call
33+
3234

3335
@routine(name="yielding", queue="queueio")
3436
async def yielding(iterations: int):
35-
pass
37+
# Do them two at a time
38+
for _ in range(iterations // 2):
39+
await gather(blocking(), blocking())
40+
await sleep(0.2) # Release processing capacity
41+
if iterations % 2 == 1:
42+
await blocking()
3643

3744

3845
if __name__ == "__main__":
39-
QueueIO().submit(yielding())
46+
q = QueueIO()
47+
try:
48+
q.submit(yielding(7))
49+
finally:
50+
q.shutdown()
51+
4052
```
4153

4254
Add the configuration to your `pyproject.toml`:
4355

4456
```toml
4557
[tool.queueio]
4658
# Configure RabbitMQ
47-
broker = 'pika://guest:guest@localhost:5672/'
48-
journal = 'pika://guest:guest@localhost:5672/'
59+
broker = "pika://guest:guest@localhost:5672/"
60+
journal = "pika://guest:guest@localhost:5672/"
4961
# Register the modules that the worker should load to find your routines
50-
register = ["sample"]
62+
register = ["basic"]
5163
```
5264

5365
The broker and journal can be configured with environment variables
@@ -61,7 +73,7 @@ QUEUEIO_JOURNAL='amqp://guest:guest@localhost:5672/'
6173
Run your script to submit the routine to run on a worker:
6274

6375
```sh
64-
python sample.py
76+
python basic.py
6577
```
6678

6779
Then run the worker to process submitted routines:
@@ -83,4 +95,4 @@ This is a new project.
8395
The design of the public API is under active development and subject to change.
8496
Release notes will provide clear upgrade instructions,
8597
but backward compatibility and deprecation warnings
86-
will not generally be implemented.
98+
will not generally be implemented.

pyproject.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@ select = ["E", "F", "UP", "B", "SIM", "I"]
3030

3131
[tool.ruff.lint.isort]
3232
force-single-line = true
33+
3334
[tool.queueio]
34-
register = ["queueio.sample"]
35+
register = [
36+
"queueio.samples.basic",
37+
"queueio.samples.expanded",
38+
]
3539
broker = "pika://localhost:5672"
3640
journal = "pika://localhost:5672"
3741

queueio/queueio_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ def test_queueio_with_valid_config(tmp_path):
192192
version = "0.1.0"
193193
194194
[tool.queueio]
195-
register = ["queueio.sample"]
195+
register = ["queueio.samples.expanded"]
196196
broker = "pika://localhost:5672"
197197
journal = "pika://localhost:5672"
198198
"""

queueio/samples/__init__.py

Whitespace-only changes.

queueio/samples/basic.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# This file should exactly mirror the example in README.md
2+
3+
from time import sleep as time_sleep
4+
5+
from queueio import QueueIO
6+
from queueio import routine
7+
from queueio.gather import gather
8+
from queueio.sleep import sleep
9+
10+
11+
@routine(name="blocking", queue="queueio")
12+
def blocking():
13+
time_sleep(0.1) # Regular blocking call
14+
15+
16+
@routine(name="yielding", queue="queueio")
17+
async def yielding(iterations: int):
18+
# Do them two at a time
19+
for _ in range(iterations // 2):
20+
await gather(blocking(), blocking())
21+
await sleep(0.2) # Release processing capacity
22+
if iterations % 2 == 1:
23+
await blocking()
24+
25+
26+
if __name__ == "__main__":
27+
q = QueueIO()
28+
try:
29+
q.submit(yielding(7))
30+
finally:
31+
q.shutdown()

queueio/samples/basic_test.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import subprocess
2+
import sys
3+
4+
import pytest
5+
6+
from queueio import QueueIO
7+
from queueio.invocation import InvocationCompleted
8+
9+
from .basic import yielding
10+
11+
12+
@pytest.mark.timeout(10)
13+
def test_integration():
14+
queueio = QueueIO()
15+
16+
try:
17+
queueio.purge(queue="queueio")
18+
events = queueio.subscribe({InvocationCompleted})
19+
invocation = yielding(7)
20+
queueio.submit(invocation)
21+
22+
proc = subprocess.Popen(
23+
[sys.executable, "-m", "queueio", "worker", "queueio=1"],
24+
stdout=subprocess.PIPE,
25+
stderr=subprocess.PIPE,
26+
)
27+
try:
28+
while event := events.get():
29+
if event.id == invocation.id:
30+
break
31+
finally:
32+
if proc.poll() is None:
33+
proc.terminate()
34+
try:
35+
proc.wait(timeout=5)
36+
except subprocess.TimeoutExpired:
37+
proc.kill()
38+
proc.wait()
39+
40+
finally:
41+
queueio.shutdown()
Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from contextlib import suppress
22
from time import sleep as time_sleep
33

4+
from queueio import QueueIO
45
from queueio import routine
5-
6-
from .gather import gather
7-
from .sleep import sleep
6+
from queueio.gather import gather
7+
from queueio.sleep import sleep
88

99

1010
@routine(name="regular", queue="queueio")
@@ -44,3 +44,11 @@ async def irregular():
4444
print("queueio sleep ended")
4545
await gather(regular(7, 2), sleep(0.5), abstract(8, 1))
4646
return await abstract(2, 5)
47+
48+
49+
if __name__ == "__main__":
50+
q = QueueIO()
51+
try:
52+
q.submit(irregular())
53+
finally:
54+
q.shutdown()
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55

66
from queueio import QueueIO
77
from queueio.invocation import InvocationCompleted
8-
from queueio.sample import irregular
8+
9+
from .expanded import irregular
910

1011

1112
@pytest.mark.timeout(10)

0 commit comments

Comments
 (0)