Skip to content

naonaoyh/zenith_core

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 

Repository files navigation

Zenith_MQ

一个低延迟、零拷贝、无锁的发布-订阅消息中间件。

特性

  • 低延迟: 单次读写延迟约 62ns
  • 高吞吐: 约 376,545 消息/秒
  • 零拷贝: 基于共享内存的高效数据传输
  • 无锁设计: 使用原子操作实现线程安全
  • 云原生: 支持容器内共享内存和容器间 UDP 通信
  • 多语言支持: 提供 C++、Go、Java 语言绑定

架构

核心组件

  • Rust 核心 (zenith_core):
    • 共享内存传输 (shm.rs)
    • 无锁环形队列 (queue.rs)
    • UDP 传输 (udp.rs)
    • 消息协议 (protocol.rs)
    • 发布/订阅抽象 (pubsub.rs)

消息协议

每条消息包含以下字段:

  • Magic Number: 0x5A454E49 (ZENI)
  • 版本号
  • 时间戳(微秒)
  • 序列号(用于检测消息丢失)
  • 负载长度
  • Topic Hash

语言绑定

  • C++: 基于 FFI 的高性能绑定
  • Go: CGo 绑定
  • Java: JNI 绑定

构建

前置要求

  • Rust 1.70+
  • CMake 3.10+ (C++ 绑定)
  • Go 1.20+ (Go 绑定)
  • JDK 11+ (Java 绑定)

构建 Rust 核心

cd zenith_core
cargo build --release

运行测试

cargo test

运行基准测试

cargo bench

性能

  • 读写延迟: ~62.1 ns (256 字节消息)
  • 吞吐量: ~376,545 消息/秒 (64 字节消息)

使用示例

Rust

use zenith_core::pubsub::{Publisher, Subscriber};
use zenith_core::queue::SharedQueue;
use zenith_core::shm::ShmTransport;

// 创建发布者
let shm = ShmTransport::new("test.bin", 4096)?;
let queue = SharedQueue::new(shm)?;
let mut publisher = Publisher::new(queue);

// 发布消息
publisher.publish(b"Hello, Zenith!")?;

// 创建订阅者
let shm2 = ShmTransport::new("test.bin", 4096)?;
let queue2 = SharedQueue::new(shm2)?;
let mut subscriber = Subscriber::new(queue2);

// 接收消息
let (header, payload) = subscriber.receive()?;
println!("Received: {:?}", payload);

C++

#include "zenith_mq.h"

zenith::ZenithMQ mq_pub("test.bin", 1024 * 1024);
zenith::Publisher pub = mq_pub.createPublisher();

pub.publish("Hello from C++");

zenith::ZenithMQ mq_sub("test.bin", 1024 * 1024);
zenith::Subscriber sub = mq_sub.createSubscriber();

zenith::Message msg;
if (sub.receive(msg)) {
    std::cout << "Received: " << std::string(msg.payload.begin(), msg.payload.end()) << std::endl;
}

Go

import "zenith_mq/zenith"

mqPub, _ := zenith.NewZenithMQ("test.bin", 1024*1024)
pub, _ := mqPub.CreatePublisher()
pub.Publish([]byte("Hello from Go"))

mqSub, _ := zenith.NewZenithMQ("test.bin", 1024*1024)
sub, _ := mqSub.CreateSubscriber()
msg, _ := sub.Receive()
fmt.Printf("Received: %s\n", msg.Payload)

优化

  • 缓存行对齐: RingBufferHeader 使用填充避免伪共享
  • 序列号: 支持消息丢失检测
  • 时间戳: 每条消息包含微秒级时间戳

许可证

MIT License

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors