A lightweight distributed task queue built for AI workloads.
Celery is great, but it wasn't designed for GPU-bound AI tasks with variable runtimes, large payloads, and priority scheduling. swarmq is.
```bash
pip install swarmq
```

```python
# worker.py
from swarmq import Worker, task

worker = Worker(redis_url="redis://localhost:6379")

@task(worker, queue="inference")
async def generate(prompt: str, model: str = "llama3") -> str:
    # Your inference logic here
    result = await run_model(model, prompt)
    return result

@task(worker, queue="embeddings", concurrency=4)
async def embed(texts: list[str]) -> list[list[float]]:
    return await run_embeddings(texts)

if __name__ == "__main__":
    worker.run()
```
```python
# client.py
import asyncio

from swarmq import Client

client = Client(redis_url="redis://localhost:6379")

async def main():
    # Submit a task
    job = await client.submit("inference", "generate", prompt="Hello world", model="llama3")

    # Wait for the result
    result = await job.result(timeout=60)
    print(result)

    # Submit with priority (higher = sooner)
    urgent_job = await client.submit("inference", "generate", prompt="Urgent!", priority=10)

    # Submit a batch
    jobs = await client.batch("embeddings", "embed", [
        {"texts": batch1},
        {"texts": batch2},
        {"texts": batch3},
    ])
    results = await jobs.results(timeout=120)

asyncio.run(main())
```
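The `priority` parameter above follows a "higher = sooner" rule. A rough sketch of that ordering using only the standard library's `heapq` (the `PriorityQueue` class here is illustrative, not swarmq's implementation):

```python
import heapq
import itertools

class PriorityQueue:
    """Pop the highest-priority job first; FIFO among equal priorities."""
    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker preserves FIFO order

    def push(self, payload, priority=0):
        # heapq is a min-heap, so negate the priority to pop the highest first
        heapq.heappush(self._heap, (-priority, next(self._counter), payload))

    def pop(self):
        _, _, payload = heapq.heappop(self._heap)
        return payload

q = PriorityQueue()
q.push("background", priority=0)
q.push("urgent", priority=10)
q.push("normal", priority=5)
print(q.pop())  # urgent
```

The counter matters: without a tie-breaker, two jobs at the same priority would be compared by payload, which can raise `TypeError` and loses submission order.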
```bash
# Start a worker
swarmq worker worker:worker

# Monitor queues
swarmq status

# View recent jobs
swarmq jobs --queue inference --limit 20

# Purge failed jobs
swarmq purge --queue inference --status failed
```

- ⚡ Async-first – Built on asyncio, native async task handlers
- 🎯 Priority queues – Higher-priority tasks jump the line
- 🔁 Smart retries – Exponential backoff with configurable limits
- 📊 Live monitoring – Real-time queue stats and job tracking
- 🏷️ Tags & metadata – Organize and filter jobs
- ⏱️ TTL & timeouts – Auto-expire stale jobs and kill hung tasks
- 🔒 Exactly-once – Redis-backed job locking prevents duplicate execution
- 📦 Large payloads – Efficient serialization for numpy arrays and tensors
- 🎛️ Per-task concurrency – Limit GPU-bound tasks independently
- 🪝 Webhooks – HTTP callbacks on job completion/failure
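The exactly-once guarantee above is the kind of thing usually built on an atomic Redis `SET key value NX` acquire. A minimal sketch of that locking pattern, simulated here with an in-memory dict instead of a live Redis connection (the `JobLock` class and its method names are illustrative, not swarmq's API):

```python
class JobLock:
    """Simulates Redis `SET job:<id> <worker> NX`: only one worker wins."""
    def __init__(self):
        self._locks = {}  # stands in for the Redis keyspace

    def try_acquire(self, job_id, worker_id):
        # NX semantics: set only if the key does not already exist
        if job_id in self._locks:
            return False
        self._locks[job_id] = worker_id
        return True

    def release(self, job_id, worker_id):
        # Release only if this worker still holds the lock
        if self._locks.get(job_id) == worker_id:
            del self._locks[job_id]

locks = JobLock()
print(locks.try_acquire("job-42", "worker-1"))  # True: first claim wins
print(locks.try_acquire("job-42", "worker-2"))  # False: duplicate blocked
```

In real Redis the lock would also carry a TTL (`SET ... NX EX <seconds>`) so a crashed worker cannot hold a job forever.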
```
┌─────────┐      ┌──────────────┐      ┌──────────┐
│ Client  │─────▶│ Redis        │◀─────│ Worker 1 │ (GPU)
│         │      │  - Queues    │◀─────│ Worker 2 │ (GPU)
│         │      │  - Results   │◀─────│ Worker 3 │ (CPU)
│         │      │  - Locks     │      └──────────┘
└─────────┘      └──────────────┘
```
- Clients submit jobs to named queues
- Redis stores queues, results, and distributed locks
- Workers pull from queues and execute tasks
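The three steps above can be sketched end to end with plain data structures: a `deque` stands in for a Redis list (client `LPUSH`, worker `BRPOP`) and a dict for the results store. This is a toy model of the data flow, not swarmq internals:

```python
from collections import deque

queues = {"inference": deque()}   # one Redis list per named queue
results = {}                      # Redis hash of job results

def submit(queue, job_id, payload):
    # Client side: LPUSH the job onto the named queue
    queues[queue].appendleft({"id": job_id, "payload": payload})

def work_once(queue):
    # Worker side: BRPOP the oldest job, execute it, store the result
    job = queues[queue].pop()
    results[job["id"]] = f"echo:{job['payload']}"

submit("inference", "j1", "hello")
work_once("inference")
print(results["j1"])  # echo:hello
```

Pushing on one end and popping on the other gives FIFO order per queue, which is exactly the semantics a Redis list provides.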
```python
worker = Worker(
    redis_url="redis://localhost:6379",
    concurrency=2,           # Max concurrent tasks (global)
    heartbeat_interval=5,    # Health-check interval (seconds)
    result_ttl=3600,         # Keep results for 1 hour
    max_retries=3,           # Default retry limit
    retry_backoff=2.0,       # Exponential backoff multiplier
    log_level="INFO",
)
```

```python
@task(
    worker,
    queue="inference",
    concurrency=1,        # Only 1 inference at a time (GPU-bound)
    timeout=120,          # Kill after 2 minutes
    retries=2,            # Retry up to 2 times
    ttl=600,              # Result expires after 10 min
    priority_default=5,   # Default priority
)
async def generate(prompt: str) -> str:
    ...
```
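With `max_retries=3` and `retry_backoff=2.0`, retry delays grow geometrically. A sketch of the schedule such a policy could produce (the one-second base delay is an assumption for illustration, not a documented swarmq default):

```python
def retry_delays(max_retries, backoff, base=1.0):
    """Delay before retry n: base * backoff**n (exponential backoff)."""
    return [base * backoff**attempt for attempt in range(max_retries)]

print(retry_delays(3, 2.0))  # [1.0, 2.0, 4.0]
```

Production retry loops usually add random jitter on top of these delays so that a burst of failed jobs does not retry in lockstep.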
```
$ swarmq status

Queue          Pending  Active  Completed  Failed  Workers
inference            3       1        847       2        1
embeddings           0       4       2341       0        2
preprocessing       12       2        156       1        1

Uptime: 4h 23m | Total processed: 3,344 | Throughput: 12.8/min
```
```
$ swarmq jobs --queue inference --status failed

ID        Task      Status  Duration  Error
a1b2c3d4  generate  FAILED     45.2s  CUDA out of memory
e5f6g7h8  generate  FAILED    120.0s  Timeout exceeded
```

| Feature | swarmq | Celery | RQ | Dramatiq |
|---|---|---|---|---|
| Async native | ✅ | ❌ | ❌ | ❌ |
| Priority queues | ✅ | ✅ | ❌ | ✅ |
| Per-task concurrency | ✅ | ❌ | ❌ | ❌ |
| GPU-aware | ✅ | ❌ | ❌ | ❌ |
| Large payload support | ✅ | ❌ | ❌ | ❌ |
| Setup complexity | Low | High | Low | Medium |
| Dependencies | Redis | Redis/RabbitMQ | Redis | Redis/RabbitMQ |
Requirements:

- Python 3.10+
- Redis 7.0+

License: MIT