Skip to content

zeromq/omq

Repository files navigation

OMQ — Where did the C dependency go!?

CI Gem Version License: ISC Ruby

gem install omq — that's it. No libzmq, no compiler, no system packages. Just Ruby.

OMQ builds ZeroMQ socket patterns on top of protocol-zmtp (a pure Ruby ZMTP 3.1 codec) using Async fibers. It speaks native ZeroMQ on the wire and interoperates with libzmq, pyzmq, CZMQ, and everything else in the ZMQ ecosystem.

980k msg/s inproc | 38k msg/s ipc | 31k msg/s tcp

10 µs inproc latency | 71 µs ipc | 82 µs tcp

Ruby 4.0 + YJIT on a Linux VM — see bench/ for full results


What is ZeroMQ?

Brokerless message-oriented middleware. No central server, no extra hop — processes talk directly to each other, cutting latency in half compared to broker-based systems. You get the patterns you'd normally build on top of RabbitMQ or Redis — pub/sub, work distribution, request/reply, fan-out — but decentralized, with no single point of failure.

Networking is hard. ZeroMQ abstracts away reconnection, queuing, load balancing, and framing so you can focus on what your system actually does. Start with threads talking over inproc://, split into processes with ipc://, scale across machines with tcp:// — same code, same API, just change the URL.

If you've ever wired up services with raw TCP, HTTP polling, or Redis pub/sub and wished it was simpler, this is what you've been looking for.

See GETTING_STARTED.md for a ~30 min walkthrough of all major patterns with working code.

Highlights

  • Zero dependencies on C — no extensions, no FFI, no libzmq. gem install just works everywhere
  • Fast — YJIT-optimized hot paths, batched sends, recv prefetching, direct-pipe inproc bypass. 980k msg/s inproc, 10 µs latency
  • omq CLIgem install omq-cli for a command-line tool with Ruby eval, Ractor parallelism, and script handlers
  • Every socket pattern — req/rep, pub/sub, push/pull, dealer/router, xpub/xsub, pair, and all draft types
  • Every transport — tcp, ipc (Unix domain sockets), inproc (in-process queues)
  • Async-native — built on fibers, non-blocking from the ground up. A shared IO thread handles sockets outside of Async — no reactor needed for simple scripts
  • Wire-compatible — interoperates with libzmq, pyzmq, CZMQ over tcp and ipc
  • Bind/connect order doesn't matter — connect before bind, bind before connect, peers come and go. ZeroMQ reconnects automatically and queued messages drain when peers arrive

For architecture internals, see DESIGN.md.

Install

No system libraries needed — just Ruby:

gem install omq
# or in Gemfile
gem 'omq'

Quick Start

Request / Reply

require 'omq'
require 'async'

Async do |task|
  rep = OMQ::REP.bind('inproc://example')
  req = OMQ::REQ.connect('inproc://example')

  task.async do
    msg = rep.receive
    rep << msg.map(&:upcase)
  end

  req << 'hello'
  p req.receive  # => ["HELLO"]
ensure
  req&.close
  rep&.close
end

Pub / Sub

Async do |task|
  pub = OMQ::PUB.bind('inproc://pubsub')
  sub = OMQ::SUB.connect('inproc://pubsub')
  sub.subscribe('')  # subscribe to all

  task.async { pub << 'news flash' }
  p sub.receive  # => ["news flash"]
ensure
  pub&.close
  sub&.close
end

Push / Pull (Pipeline)

Async do
  push = OMQ::PUSH.connect('inproc://pipeline')
  pull = OMQ::PULL.bind('inproc://pipeline')

  push << 'work item'
  p pull.receive  # => ["work item"]
ensure
  push&.close
  pull&.close
end

Without Async (IO thread)

OMQ spawns a shared omq-io thread when used outside an Async reactor — no boilerplate needed:

require 'omq'

push = OMQ::PUSH.bind('tcp://127.0.0.1:5557')
pull = OMQ::PULL.connect('tcp://127.0.0.1:5557')

push << 'hello'
p pull.receive  # => ["hello"]

push.close
pull.close

The IO thread runs all pumps, reconnection, and heartbeating in the background. When you're inside an Async block, OMQ uses the existing reactor instead.

Queue Interface

All sockets expose an Async::Queue-inspired interface:

Async::Queue OMQ Socket Notes
enqueue(item) / push(item) enqueue(msg) / push(msg) Also: send(msg), << msg
dequeue(timeout:) / pop(timeout:) dequeue(timeout:) / pop(timeout:) Defaults to socket's read_timeout
wait wait Blocks indefinitely (ignores read_timeout)
each each Yields messages; returns on close or timeout
pull = OMQ::PULL.bind('inproc://work')

# iterate messages like a queue
pull.each do |msg|
  puts msg.first
end

Socket Types

All sockets are thread-safe. Default HWM is 1000 messages per socket. Classes live under OMQ:: (alias: ØMQ).

Standard (multipart messages)

Pattern Send Receive When HWM full
REQ / REP Round-robin / route-back Fair-queue Block
PUB / SUB Fan-out to subscribers Subscription filter Drop
PUSH / PULL Round-robin to workers Fair-queue Block
DEALER / ROUTER Round-robin / identity-route Fair-queue Block
XPUB / XSUB Fan-out (subscription events) Fair-queue Drop
PAIR Exclusive 1-to-1 Exclusive 1-to-1 Block

Draft (single-frame only)

Pattern Send Receive When HWM full
CLIENT / SERVER Round-robin / routing-ID Fair-queue Block
RADIO / DISH Group fan-out Group filter Drop
SCATTER / GATHER Round-robin Fair-queue Block
PEER Routing-ID Fair-queue Block
CHANNEL Exclusive 1-to-1 Exclusive 1-to-1 Block

CLI

Install omq-cli for a command-line tool that sends, receives, pipes, and transforms ZeroMQ messages from the terminal:

gem install omq-cli

omq rep -b tcp://:5555 --echo
echo "hello" | omq req -c tcp://localhost:5555

See the omq-cli README for full documentation.

Companion Gems

  • omq-ffi — libzmq FFI backend. Same OMQ socket API, but backed by libzmq instead of the pure Ruby ZMTP stack. Useful for interop testing and when you need libzmq-specific features. Requires libzmq installed.
  • omq-ractor — bridge OMQ sockets into Ruby Ractors for true parallel processing across cores. I/O stays on the main Ractor, worker Ractors do pure computation.

Development

bundle install
bundle exec rake

License

ISC

About

Pure Ruby ZMQ with all socket types

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages