Skip to content
This repository was archived by the owner on Aug 12, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ storb_gateway = { version = "*", path = "crates/storb_gateway" }
storb_node = { version = "*", path = "crates/storb_node" }
storb_protocol = { version = "*", path = "crates/storb_protocol" }
storb_storage = { version = "*", path = "crates/storb_storage" }
storb_rpc = { version = "*", path = "crates/storb_rpc" }
#
storb_client = { version = "*", path = "crates/storb_client" }
#
Expand Down
14 changes: 1 addition & 13 deletions crates/storb_protocol/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@ enum RpcError {
pub struct RPCClientOptions {
/// Disable TLS
pub insecure: bool,
pub server_addr: SocketAddr
}

/// Configuration options for the RPC server.
pub struct RPCServerOptions {
pub addr: SocketAddr
pub server_addr: SocketAddr,
}

// TODO: remove unwraps
Expand Down Expand Up @@ -62,10 +57,3 @@ pub async fn init_client_rpc(rpc_options: RpcClientOptions) -> Result<RpcSystem<
let rpc_system = RpcSystem::new(Box::new(network), None);
Ok(rpc_system)
}

// pub async fn init_server_rpc(rpc_options: RPCServerOptions) -> {
// let port = rpc_options.addr.port();
// let ip = rpc_options.addr.ip();
// let socket = UdpSocket::bind(("0.0.0.0", quic_port))?;
// let server_config = configure_server(ip).unwrap();
// }
24 changes: 24 additions & 0 deletions crates/storb_rpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "storb_rpc"
description = "QUIC and Cap'n'Proto RPC framework "
version.workspace = true
edition.workspace = true

[dependencies]
capnp.workspace = true
capnp-rpc.workspace = true
futures.workspace = true
multiaddr.workspace = true
quinn.workspace = true
rcgen.workspace = true
rustls.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true

[dev-dependencies]
tracing-subscriber = "0.3"
tokio-util = { version = "0.7", features = ["compat"] }
rustls = { version = "0.23", features = ["aws-lc-rs"] }
3 changes: 3 additions & 0 deletions crates/storb_rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# `storb_rpc`

RPC over Quic Library
6 changes: 6 additions & 0 deletions crates/storb_rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub mod macros;
pub mod server;
pub mod service;

pub use server::{AsyncRuntime, ServerBuilder, ServerOptions, TokioRuntime};
pub use service::{CapnpServiceWrapper, GeneratedCapnpService, Service, ServiceBuilder};
8 changes: 8 additions & 0 deletions crates/storb_rpc/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pub mod include_proto {
#[macro_export]
macro_rules! include_proto {
($schema:expr) => {
include!(concat!(env!("OUT_DIR"), "/", $schema, "_capnp.rs"))
};
}
}
133 changes: 133 additions & 0 deletions crates/storb_rpc/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;

use capnp_rpc::{twoparty, RpcSystem};
use quinn::{Endpoint, ServerConfig as QuinnServerConfig};

use crate::service::Service;

pub trait AsyncRuntime: Send + Sync + 'static + Clone {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;
}

#[derive(Clone)]
pub struct TokioRuntime;

impl AsyncRuntime for TokioRuntime {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
tokio::spawn(future);
}
}

#[derive(Clone)]
pub struct ServerOptions {
pub addr: SocketAddr,
pub tls: Option<quinn::ServerConfig>,
}

impl Default for ServerOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:5000".parse().unwrap(),
tls: None,
}
}
}

pub struct ServerBuilder<R = TokioRuntime> {
opts: ServerOptions,
services: Vec<Arc<dyn Service>>,
runtime: R,
}

impl ServerBuilder<TokioRuntime> {
pub fn new() -> Self {
Self {
opts: ServerOptions::default(),
services: Vec::new(),
runtime: TokioRuntime,
}
}
}

impl<R> ServerBuilder<R>
where
R: AsyncRuntime + Clone,
{
pub fn bind_addr(mut self, addr: SocketAddr) -> Self {
self.opts.addr = addr;
self
}

pub fn tls_config(mut self, cfg: Option<quinn::ServerConfig>) -> Self {
self.opts.tls = cfg;
self
}

pub fn add_service<S: Service>(mut self, svc: S) -> Self {
self.services.push(Arc::new(svc));
self
}

pub fn with_runtime<NewR: AsyncRuntime>(self, runtime: NewR) -> ServerBuilder<NewR> {
ServerBuilder {
opts: self.opts,
services: self.services,
runtime,
}
}

pub async fn serve(self) -> anyhow::Result<()> {
let tls_cfg = if let Some(cfg) = self.opts.tls {
cfg
} else {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?;
let key = cert.serialize_der()?;
let cert = cert.serialize_der()?;
let mut server_cfg = quinn::ServerConfig::with_single_cert(vec![cert], key)?;
Arc::make_mut(&mut server_cfg.transport).max_concurrent_uni_streams(0_u8.into());
server_cfg
};

let (endpoint, mut incoming) = Endpoint::server(tls_cfg, self.opts.addr)?;
println!("Listening on {}", self.opts.addr);

while let Some(conn) = incoming.next().await {
let services = self.services.clone();
let runtime = self.runtime.clone();

runtime.spawn(async move {
if let Ok(new_conn) = conn.await {
println!("Connection from {}", new_conn.remote_address());

while let Ok((send, recv)) = new_conn.accept_bi().await {
let mut rpc_sys = RpcSystem::new(
Box::new(twoparty::VatNetwork::new(
recv,
send,
twoparty::Side::Server,
Default::default(),
)),
None,
);

for svc in &services {
svc.register_methods(&mut rpc_sys);
}

tokio::task::spawn_local(rpc_sys.map(|_| ()));
}
}
});
}

Ok(())
}
}
149 changes: 149 additions & 0 deletions crates/storb_rpc/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;

use capnp_rpc::twoparty::{Side, VatNetwork};
use capnp_rpc::RpcSystem;
use quinn::{RecvStream, SendStream};

pub struct MetadataMap(pub HashMap<String, String>);

impl MetadataMap {
pub fn keys(&self) -> impl Iterator<Item = &String> {
self.0.keys()
}
}

pub struct RequestContext {
pub peer_addr: SocketAddr,
pub metadata: MetadataMap,
}

impl fmt::Debug for RequestContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RequestContext")
.field("peer_addr", &self.peer_addr)
.field("metadata_keys", &self.metadata.keys().collect::<Vec<_>>())
.finish()
}
}

pub trait Middleware: Send + Sync + 'static {
fn handle(
&self,
ctx: RequestContext,
next: Next,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
}

impl fmt::Debug for dyn Middleware {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Middleware")
}
}

pub struct Next<'a> {
idx: usize,
middlewares: &'a [Arc<dyn Middleware>],
service_impl: &'a dyn GeneratedCapnpService,
rpc_system: &'a mut RpcSystem<VatNetwork<RecvStream, SendStream>>,
}

impl<'a> Next<'a> {
pub async fn run(mut self, ctx: RequestContext) {
if self.idx < self.middlewares.len() {
let mw = &self.middlewares[self.idx];
self.idx += 1;
mw.handle(ctx, self).await;
} else {
self.service_impl.register_methods_impl(self.rpc_system);
}
}
}

pub trait Service: Send + Sync + 'static {
/// Register all RPC endpoints on the given system.
fn register_methods(&self, rpc_system: &mut RpcSystem<VatNetwork<RecvStream, SendStream>>);

/// A stable name for logging
fn name(&self) -> &'static str;

/// Health check for graceful shutdown and monitoring.
fn is_healthy(&self) -> bool {
true
}
}

pub struct ServiceBuilder {
interceptors: Vec<Arc<dyn Interceptor>>,
}

pub trait Interceptor: Send + Sync + 'static {
fn intercept(&self, ctx: &mut RequestContext);
}

impl ServiceBuilder {
pub fn new() -> Self {
ServiceBuilder {
interceptors: Vec::new(),
}
}

pub fn add_interceptor<I>(mut self, interceptor: I) -> Self
where
I: Interceptor,
{
self.interceptors.push(Arc::new(interceptor));
self
}

pub fn wrap<T>(self, impl_obj: T) -> CapnpServiceWrapper<T>
where
T: GeneratedCapnpService,
{
CapnpServiceWrapper {
inner: Arc::new(impl_obj),
interceptors: self.interceptors,
}
}
}

pub trait GeneratedCapnpService: Send + Sync + 'static {
fn register_methods_impl(&self, rpc_system: &mut RpcSystem<VatNetwork<RecvStream, SendStream>>);

fn service_name() -> &'static str
where
Self: Sized;

fn is_healthy(&self) -> bool {
true
}
}

pub struct CapnpServiceWrapper<T>
where
T: GeneratedCapnpService,
{
inner: Arc<T>,
interceptors: Vec<Arc<dyn Interceptor>>,
}

impl<T> Service for CapnpServiceWrapper<T>
where
T: GeneratedCapnpService,
{
fn register_methods(&self, rpc_system: &mut RpcSystem<VatNetwork<RecvStream, SendStream>>) {
self.inner.register_methods_impl(rpc_system);
}

fn name(&self) -> &'static str {
T::service_name()
}

fn is_healthy(&self) -> bool {
self.inner.is_healthy()
}
}
Loading