
swarmq 🐝

A lightweight distributed task queue built for AI workloads.

Celery is great, but it wasn't designed for GPU-bound AI tasks with variable runtimes, large payloads, and priority scheduling. swarmq is.

pip install swarmq

Quick Start

Define a worker

# worker.py
from swarmq import Worker, task

worker = Worker(redis_url="redis://localhost:6379")

@task(worker, queue="inference")
async def generate(prompt: str, model: str = "llama3") -> str:
    # Your inference logic here
    result = await run_model(model, prompt)
    return result

@task(worker, queue="embeddings", concurrency=4)
async def embed(texts: list[str]) -> list[list[float]]:
    return await run_embeddings(texts)

if __name__ == "__main__":
    worker.run()

Submit tasks

# client.py
import asyncio

from swarmq import Client

async def main():
    client = Client(redis_url="redis://localhost:6379")

    # Submit a task
    job = await client.submit("inference", "generate", prompt="Hello world", model="llama3")

    # Wait for the result
    result = await job.result(timeout=60)
    print(result)

    # Submit with priority (higher = sooner)
    urgent_job = await client.submit("inference", "generate", prompt="Urgent!", priority=10)

    # Submit a batch
    jobs = await client.batch("embeddings", "embed", [
        {"texts": batch1},
        {"texts": batch2},
        {"texts": batch3},
    ])
    results = await jobs.results(timeout=120)

asyncio.run(main())

CLI

# Start a worker
swarmq worker worker:worker

# Monitor queues
swarmq status

# View recent jobs
swarmq jobs --queue inference --limit 20

# Purge failed jobs
swarmq purge --queue inference --status failed

Features

  • ⚡ Async-first – Built on asyncio, with native async task handlers
  • 🎯 Priority queues – Higher-priority tasks jump the line
  • 🔄 Smart retries – Exponential backoff with configurable limits
  • 📊 Live monitoring – Real-time queue stats and job tracking
  • 🏷️ Tags & metadata – Organize and filter jobs
  • ⏱️ TTL & timeouts – Auto-expire stale jobs and kill hung tasks
  • 🔒 Exactly-once – Redis-backed job locking prevents duplicate execution
  • 📦 Large payloads – Efficient serialization for numpy arrays and tensors
  • 🎛️ Per-task concurrency – Limit GPU-bound tasks independently
  • 🪝 Webhooks – HTTP callbacks on job completion/failure

Architecture

┌──────────┐     ┌─────────────┐     ┌──────────┐
│ Client   │────▶│ Redis       │◀────│ Worker 1 │ (GPU)
│          │     │  - Queues   │◀────│ Worker 2 │ (GPU)
│          │     │  - Results  │◀────│ Worker 3 │ (CPU)
│          │     │  - Locks    │     └──────────┘
└──────────┘     └─────────────┘
  • Clients submit jobs to named queues
  • Redis stores queues, results, and distributed locks
  • Workers pull from queues and execute tasks
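The "higher priority = sooner" ordering can be sketched in memory; here `heapq` stands in for whatever Redis structure swarmq actually uses, and `PriorityQueueSketch` is an illustrative name, not swarmq's API:

```python
import heapq
import itertools

class PriorityQueueSketch:
    """Toy model of the scheduling rule: highest priority first, FIFO within a priority."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker preserves submission order

    def submit(self, job: str, priority: int = 0) -> None:
        # Negate priority so heapq's min-heap pops the highest priority first.
        heapq.heappush(self._heap, (-priority, next(self._counter), job))

    def pull(self) -> str:
        _, _, job = heapq.heappop(self._heap)
        return job

q = PriorityQueueSketch()
q.submit("routine", priority=0)
q.submit("urgent", priority=10)
q.submit("routine-2", priority=0)
order = [q.pull() for _ in range(3)]
print(order)  # ['urgent', 'routine', 'routine-2']
```

The counter matters: without it, two jobs at the same priority would compare their payloads, and submission order would be lost.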

Configuration

worker = Worker(
    redis_url="redis://localhost:6379",
    concurrency=2,                # Max concurrent tasks (global)
    heartbeat_interval=5,         # Health check interval (seconds)
    result_ttl=3600,              # Keep results for 1 hour
    max_retries=3,                # Default retry limit
    retry_backoff=2.0,            # Exponential backoff multiplier
    log_level="INFO",
)
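Assuming delays follow the usual `base * backoff**attempt` formula (an assumption; the schedule above isn't spelled out), `retry_backoff=2.0` produces delays like:

```python
def retry_delays(max_retries: int, backoff: float, base: float = 1.0) -> list[float]:
    """Seconds to wait before each retry attempt: base * backoff**attempt."""
    return [base * backoff**attempt for attempt in range(max_retries)]

print(retry_delays(3, 2.0))  # [1.0, 2.0, 4.0]
```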

Per-task config

@task(
    worker,
    queue="inference",
    concurrency=1,        # Only 1 inference at a time (GPU bound)
    timeout=120,          # Kill after 2 minutes
    retries=2,            # Retry up to 2 times
    ttl=600,              # Result expires after 10 min
    priority_default=5,   # Default priority
)
async def generate(prompt: str) -> str:
    ...
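The `concurrency=1` cap behaves like a semaphore held across each handler invocation. A self-contained asyncio sketch of that mechanism (not swarmq internals):

```python
import asyncio

async def main() -> int:
    sem = asyncio.Semaphore(1)  # concurrency=1: one task in the handler at a time
    running = 0
    peak = 0

    async def handler(i: int) -> None:
        nonlocal running, peak
        async with sem:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for GPU inference
            running -= 1

    # Submit five tasks at once; the semaphore serializes them.
    await asyncio.gather(*(handler(i) for i in range(5)))
    return peak

peak = asyncio.run(main())
print(peak)  # 1: never more than one task inside the guarded section
```

A global `concurrency` on the `Worker` plus a per-task cap like this lets CPU-cheap tasks run in parallel while GPU-bound ones stay serialized.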

Monitoring

$ swarmq status

Queue          Pending  Active  Completed  Failed  Workers
inference           3       1        847       2        1
embeddings          0       4       2341       0        2
preprocessing      12       2        156       1        1

Uptime: 4h 23m | Total processed: 3,344 | Throughput: 12.8/min

$ swarmq jobs --queue inference --status failed

ID          Task       Status  Duration  Error
a1b2c3d4    generate   FAILED  45.2s     CUDA out of memory
e5f6g7h8    generate   FAILED  120.0s    Timeout exceeded

Comparison

Feature                 swarmq   Celery           RQ      Dramatiq
Async native            ✅        ❌               ❌       ❌
Priority queues         ✅        ✅               ❌       ✅
Per-task concurrency    ✅        ❌               ❌       ❌
GPU-aware               ✅        ❌               ❌       ❌
Large payload support   ✅        ⚠️               ⚠️       ⚠️
Setup complexity        Low      High             Low     Medium
Dependencies            Redis    Redis/RabbitMQ   Redis   Redis/RabbitMQ

Requirements

  • Python 3.10+
  • Redis 7.0+

License

MIT
