一个低延迟、零拷贝、无锁的发布-订阅消息中间件。
- 低延迟: 单次读写延迟约 62ns
- 高吞吐: 约 376,545 消息/秒
- 零拷贝: 基于共享内存的高效数据传输
- 无锁设计: 使用原子操作实现线程安全
- 云原生: 支持容器内共享内存和容器间 UDP 通信
- 多语言支持: 提供 C++、Go、Java 语言绑定
- Rust 核心 (
zenith_core):- 共享内存传输 (
shm.rs) - 无锁环形队列 (
queue.rs) - UDP 传输 (
udp.rs) - 消息协议 (
protocol.rs) - 发布/订阅抽象 (
pubsub.rs)
- 共享内存传输 (
每条消息包含以下字段:
- Magic Number: 0x5A454E49 (ZENI)
- 版本号
- 时间戳(微秒)
- 序列号(用于检测消息丢失)
- 负载长度
- Topic Hash
- C++: 基于 FFI 的高性能绑定
- Go: CGo 绑定
- Java: JNI 绑定
- Rust 1.70+
- CMake 3.10+ (C++ 绑定)
- Go 1.20+ (Go 绑定)
- JDK 11+ (Java 绑定)
cd zenith_core
cargo build --releasecargo testcargo bench- 读写延迟: ~62.1 ns (256 字节消息)
- 吞吐量: ~376,545 消息/秒 (64 字节消息)
use zenith_core::pubsub::{Publisher, Subscriber};
use zenith_core::queue::SharedQueue;
use zenith_core::shm::ShmTransport;
// 创建发布者
let shm = ShmTransport::new("test.bin", 4096)?;
let queue = SharedQueue::new(shm)?;
let mut publisher = Publisher::new(queue);
// 发布消息
publisher.publish(b"Hello, Zenith!")?;
// 创建订阅者
let shm2 = ShmTransport::new("test.bin", 4096)?;
let queue2 = SharedQueue::new(shm2)?;
let mut subscriber = Subscriber::new(queue2);
// 接收消息
let (header, payload) = subscriber.receive()?;
println!("Received: {:?}", payload);#include "zenith_mq.h"
zenith::ZenithMQ mq_pub("test.bin", 1024 * 1024);
zenith::Publisher pub = mq_pub.createPublisher();
pub.publish("Hello from C++");
zenith::ZenithMQ mq_sub("test.bin", 1024 * 1024);
zenith::Subscriber sub = mq_sub.createSubscriber();
zenith::Message msg;
if (sub.receive(msg)) {
std::cout << "Received: " << std::string(msg.payload.begin(), msg.payload.end()) << std::endl;
}import "zenith_mq/zenith"
mqPub, _ := zenith.NewZenithMQ("test.bin", 1024*1024)
pub, _ := mqPub.CreatePublisher()
pub.Publish([]byte("Hello from Go"))
mqSub, _ := zenith.NewZenithMQ("test.bin", 1024*1024)
sub, _ := mqSub.CreateSubscriber()
msg, _ := sub.Receive()
fmt.Printf("Received: %s\n", msg.Payload)- 缓存行对齐:
RingBufferHeader使用填充避免伪共享 - 序列号: 支持消息丢失检测
- 时间戳: 每条消息包含微秒级时间戳
MIT License