Skip to content

drsh4dow/qstash-rs

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

38 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

qstash-rs

Crates.io

A hand-written Rust SDK for Upstash QStash.

Scope

Current crate focus:

  • publish and batch publish
  • messages and bulk cancellation
  • logs
  • dead letter queue (DLQ)
  • queues and enqueue
  • schedules
  • URL groups
  • signing keys
  • inbound signature verification via Receiver

Flow-control management endpoints and product-specific convenience integrations are intentionally out of scope for now.

Install

cargo add qstash-rs

Quick start

Publish JSON

use qstash_rs::{Client, Destination, PublishRequest};

/// Publishes a single JSON message to `https://example.com/tasks` and prints
/// the response returned by the client.
///
/// Panics when `QSTASH_TOKEN` is not set. Errors from client construction,
/// destination parsing, body serialization, and the publish call itself are
/// propagated as `qstash_rs::Error`.
#[tokio::main]
async fn main() -> Result<(), qstash_rs::Error> {
    // The token is read from the environment; the example aborts without it.
    let token = std::env::var("QSTASH_TOKEN").expect("missing QSTASH_TOKEN");
    let client = Client::new(token)?;

    // Assemble the request step by step before handing it to the client.
    let destination = Destination::url("https://example.com/tasks")?;
    let payload = serde_json::json!({ "hello": "world" });
    let request = PublishRequest::builder(destination)
        .json_body(&payload)?
        .retries(3)
        .label("demo")
        .build();

    let response = client.publish(request).await?;

    println!("published: {:?}", response);
    Ok(())
}

Batch publish

use qstash_rs::{BatchRequest, Client, Destination, PublishRequest};

/// Sends two messages in one batch call and prints the response.
///
/// The first entry targets the URL `https://example.com/first`; the second
/// targets the URL group `workers` and is assigned the queue name `critical`.
///
/// Panics when `QSTASH_TOKEN` is not set. Other failures are propagated as
/// `qstash_rs::Error`.
#[tokio::main]
async fn main() -> Result<(), qstash_rs::Error> {
    let token = std::env::var("QSTASH_TOKEN").expect("missing QSTASH_TOKEN");
    let client = Client::new(token)?;

    // Entry 1: plain-body message for a single URL destination.
    let first_destination = Destination::url("https://example.com/first")?;
    let first = BatchRequest::new(
        PublishRequest::builder(first_destination)
            .body("first")
            .build(),
    );

    // Entry 2: plain-body message for a URL group, with a queue name attached.
    let second_destination = Destination::url_group("workers");
    let second = BatchRequest::new(
        PublishRequest::builder(second_destination)
            .body("second")
            .build(),
    )
    .queue_name("critical");

    let response = client.batch([first, second]).await?;
    println!("batched: {:?}", response);
    Ok(())
}

Queue operations

use qstash_rs::{Client, Destination, PublishRequest, QueueUpsertRequest};

/// Upserts the `emails` queue with a parallelism of 1, then enqueues one JSON
/// message on it destined for `https://example.com/email`.
///
/// Panics when `QSTASH_TOKEN` is not set. Other failures are propagated as
/// `qstash_rs::Error`.
#[tokio::main]
async fn main() -> Result<(), qstash_rs::Error> {
    let token = std::env::var("QSTASH_TOKEN").expect("missing QSTASH_TOKEN");
    let client = Client::new(token)?;

    // Both calls below operate on the same queue.
    let queue = "emails";

    // Step 1: create or update the queue.
    let upsert = QueueUpsertRequest {
        queue_name: queue.into(),
        parallelism: 1,
    };
    client.queues().upsert(upsert).await?;

    // Step 2: build the message only after the upsert has succeeded, then enqueue it.
    let destination = Destination::url("https://example.com/email")?;
    let payload = serde_json::json!({ "kind": "welcome" });
    let message = PublishRequest::builder(destination)
        .json_body(&payload)?
        .build();
    client.queues().enqueue(queue, message).await?;

    Ok(())
}

Schedule a recurring delivery

use qstash_rs::{Client, Destination, ScheduleRequest};

/// Creates a schedule that delivers a JSON body to
/// `https://example.com/report` using the cron expression `*/15 * * * *`,
/// assigns it the queue name `reports`, and prints the resulting schedule id.
///
/// Panics when `QSTASH_TOKEN` is not set. Other failures are propagated as
/// `qstash_rs::Error`.
#[tokio::main]
async fn main() -> Result<(), qstash_rs::Error> {
    let token = std::env::var("QSTASH_TOKEN").expect("missing QSTASH_TOKEN");
    let client = Client::new(token)?;

    // Describe what to deliver, where, and how often.
    let destination = Destination::url("https://example.com/report")?;
    let cron = "*/15 * * * *";
    let payload = serde_json::json!({ "report": "daily" });
    let request = ScheduleRequest::builder(destination, cron)
        .json_body(&payload)?
        .queue_name("reports")
        .build();

    let created = client.schedules().create(request).await?;

    println!("schedule: {}", created.schedule_id);
    Ok(())
}

Verify inbound signatures

use qstash_rs::{Receiver, ReceiverConfig, VerifyRequest};

/// Verifies an inbound request signature with a `Receiver` configured from the
/// environment.
///
/// * `signature` - signature value taken from the incoming request.
/// * `body` - raw request body bytes.
/// * `url` - URL passed to the verifier as `Some(url)`.
///
/// Panics when `QSTASH_CURRENT_SIGNING_KEY` or `QSTASH_NEXT_SIGNING_KEY` is
/// not set. The result of `Receiver::verify` is returned unchanged.
fn verify(signature: &str, body: &[u8], url: &str) -> Result<(), qstash_rs::Error> {
    // Read both signing keys up front, current key first.
    let current_signing_key = std::env::var("QSTASH_CURRENT_SIGNING_KEY")
        .expect("missing QSTASH_CURRENT_SIGNING_KEY");
    let next_signing_key =
        std::env::var("QSTASH_NEXT_SIGNING_KEY").expect("missing QSTASH_NEXT_SIGNING_KEY");

    let receiver = Receiver::new(ReceiverConfig {
        current_signing_key,
        next_signing_key,
    });

    // A clock tolerance of 0 seconds is passed through as-is.
    let request = VerifyRequest {
        signature,
        body,
        url: Some(url),
        clock_tolerance_seconds: 0,
    };

    receiver.verify(request)
}

Testing

The default test suite is deterministic and runs against a mock server:

cargo test
cargo clippy --all-targets --all-features -- -D warnings

Optional live smoke tests:

QSTASH_TOKEN=... QSTASH_RUN_LIVE_TESTS=1 cargo test live_ -- --nocapture

Releases

No releases published

Packages

 
 
 

Languages