A hand-written Rust SDK for Upstash QStash.
Current crate focus:
- publish and batch publish
- messages and bulk cancellation
- logs
- dead letter queue (DLQ)
- queues and enqueue
- schedules
- URL groups
- signing keys
- inbound signature verification via `Receiver`
Flow-control management endpoints and product-specific convenience integrations are intentionally out of scope for now.
cargo add qstash-rs

use qstash_rs::{Client, Destination, PublishRequest};
/// Publishes one JSON message to an HTTP endpoint and prints the response.
///
/// The API token is read from the `QSTASH_TOKEN` environment variable; the
/// example panics with "missing QSTASH_TOKEN" when it is not set.
#[tokio::main]
async fn main() -> Result<(), qstash_rs::Error> {
    let token = std::env::var("QSTASH_TOKEN").expect("missing QSTASH_TOKEN");
    let client = Client::new(token)?;

    // Assemble the request step by step: destination, JSON payload, then the
    // `retries(3)` and `label("demo")` builder options.
    let destination = Destination::url("https://example.com/tasks")?;
    let payload = serde_json::json!({ "hello": "world" });
    let request = PublishRequest::builder(destination)
        .json_body(&payload)?
        .retries(3)
        .label("demo")
        .build();

    let published = client.publish(request).await?;
    println!("published: {:?}", published);
    Ok(())
}

use qstash_rs::{BatchRequest, Client, Destination, PublishRequest};
/// Sends two messages with a single `batch` call and prints the response.
///
/// The API token is read from the `QSTASH_TOKEN` environment variable; the
/// example panics with "missing QSTASH_TOKEN" when it is not set.
#[tokio::main]
async fn main() -> Result<(), qstash_rs::Error> {
    let token = std::env::var("QSTASH_TOKEN").expect("missing QSTASH_TOKEN");
    let client = Client::new(token)?;

    // First entry: plain URL destination.
    let first_message = PublishRequest::builder(Destination::url("https://example.com/first")?)
        .body("first")
        .build();
    let first = BatchRequest::new(first_message);

    // Second entry: URL group destination, with `queue_name("critical")` set
    // on the batch entry itself rather than on the publish request.
    let second_message = PublishRequest::builder(Destination::url_group("workers"))
        .body("second")
        .build();
    let second = BatchRequest::new(second_message).queue_name("critical");

    let batched = client.batch([first, second]).await?;
    println!("batched: {:?}", batched);
    Ok(())
}

use qstash_rs::{Client, Destination, PublishRequest, QueueUpsertRequest};
/// Upserts the `emails` queue and then enqueues one JSON message onto it.
///
/// The API token is read from the `QSTASH_TOKEN` environment variable; the
/// example panics with "missing QSTASH_TOKEN" when it is not set.
#[tokio::main]
async fn main() -> Result<(), qstash_rs::Error> {
    let token = std::env::var("QSTASH_TOKEN").expect("missing QSTASH_TOKEN");
    let client = Client::new(token)?;

    // Step 1: upsert the queue with `parallelism: 1`.
    let queue = QueueUpsertRequest {
        queue_name: "emails".into(),
        parallelism: 1,
    };
    client.queues().upsert(queue).await?;

    // Step 2: build the message only after the upsert has succeeded, then
    // enqueue it on the same queue name.
    let destination = Destination::url("https://example.com/email")?;
    let payload = serde_json::json!({ "kind": "welcome" });
    let message = PublishRequest::builder(destination)
        .json_body(&payload)?
        .build();
    client.queues().enqueue("emails", message).await?;

    Ok(())
}

use qstash_rs::{Client, Destination, ScheduleRequest};
/// Creates a schedule with the cron expression `*/15 * * * *` and prints the
/// `schedule_id` field of the response.
///
/// The API token is read from the `QSTASH_TOKEN` environment variable; the
/// example panics with "missing QSTASH_TOKEN" when it is not set.
#[tokio::main]
async fn main() -> Result<(), qstash_rs::Error> {
    let token = std::env::var("QSTASH_TOKEN").expect("missing QSTASH_TOKEN");
    let client = Client::new(token)?;

    // The builder takes the destination and the cron expression up front;
    // the JSON body and `queue_name("reports")` are set afterwards.
    let destination = Destination::url("https://example.com/report")?;
    let payload = serde_json::json!({ "report": "daily" });
    let request = ScheduleRequest::builder(destination, "*/15 * * * *")
        .json_body(&payload)?
        .queue_name("reports")
        .build();

    let created = client.schedules().create(request).await?;
    println!("schedule: {}", created.schedule_id);
    Ok(())
}

use qstash_rs::{Receiver, ReceiverConfig, VerifyRequest};
/// Checks the signature of an inbound request with a `Receiver`.
///
/// Both signing keys are read from the environment
/// (`QSTASH_CURRENT_SIGNING_KEY`, then `QSTASH_NEXT_SIGNING_KEY`); the
/// function panics if either variable is unset. The result of
/// `Receiver::verify` is returned unchanged.
fn verify(signature: &str, body: &[u8], url: &str) -> Result<(), qstash_rs::Error> {
    let current_signing_key =
        std::env::var("QSTASH_CURRENT_SIGNING_KEY").expect("missing QSTASH_CURRENT_SIGNING_KEY");
    let next_signing_key =
        std::env::var("QSTASH_NEXT_SIGNING_KEY").expect("missing QSTASH_NEXT_SIGNING_KEY");

    let config = ReceiverConfig {
        current_signing_key,
        next_signing_key,
    };
    let receiver = Receiver::new(config);

    // The URL is passed as `Some(url)` and the clock tolerance is set to 0.
    let request = VerifyRequest {
        signature,
        body,
        url: Some(url),
        clock_tolerance_seconds: 0,
    };
    receiver.verify(request)
}

Default test suite is deterministic and mock-server based:
cargo test
cargo clippy --all-targets --all-features -- -D warnings

Optional live smoke tests:
QSTASH_TOKEN=... QSTASH_RUN_LIVE_TESTS=1 cargo test live_ -- --nocapture