OMQ! Where did the C dependency go?!
Pure Ruby implementation of the ZMTP 3.1 wire protocol (ZeroMQ) using the Async gem. No native libraries required.
145k msg/s inproc | 40k msg/s ipc | 32k msg/s tcp
15 µs inproc latency | 62 µs ipc | 88 µs tcp
Ruby 4.0 + YJIT on a Linux VM on a 2019 MacBook Pro (Intel) — 223k msg/s with io_uring
Highlights
- Pure Ruby — no C extensions, no FFI, no libzmq/libczmq dependency
- All socket types — req/rep, pub/sub, push/pull, dealer/router, xpub/xsub, pair
- Async-native — built on Async fibers, also works with plain threads
-
Ruby-idiomatic API — messages as
Array<String>, errors as exceptions, timeouts asIO::TimeoutError - All transports — tcp, ipc, inproc
Why pure Ruby?
Modern Ruby has closed the gap:
- YJIT — JIT-compiled hot paths close the throughput gap with C extensions
-
Fiber Scheduler — non-blocking I/O without callbacks or threads (
Asyncbuilds on this) -
io-stream— buffered I/O with read-ahead, from the Async ecosystem
When CZTop was written, none of this existed. Today, a pure Ruby ZMTP implementation is fast enough for production use — and you get gem install with no compiler toolchain, no system packages, and no segfaults.
Install
No system libraries needed — just Ruby:
gem install omq
# or in Gemfile
gem 'omq'Learning ZeroMQ
New to ZeroMQ? See ZGUIDE_SUMMARY.md — a ~30 min read covering all major patterns with working OMQ code examples.
Quick Start
Request / Reply
require 'omq'
require 'async'
Async do |task|
rep = OMQ::REP.bind('inproc://example')
req = OMQ::REQ.connect('inproc://example')
task.async do
msg = rep.receive
rep << msg.map(&:upcase)
end
req << 'hello'
puts req.receive.inspect # => ["HELLO"]
ensure
req&.close
rep&.close
endPub / Sub
Async do |task|
pub = OMQ::PUB.bind('inproc://pubsub')
sub = OMQ::SUB.connect('inproc://pubsub')
sub.subscribe('') # subscribe to all
task.async { pub << 'news flash' }
puts sub.receive.inspect # => ["news flash"]
ensure
pub&.close
sub&.close
endPush / Pull (Pipeline)
Async do
push = OMQ::PUSH.connect('inproc://pipeline')
pull = OMQ::PULL.bind('inproc://pipeline')
push << 'work item'
puts pull.receive.inspect # => ["work item"]
ensure
push&.close
pull&.close
endSocket Types
| Pattern | Classes | Direction |
|---|---|---|
| Request/Reply |
REQ, REP
|
bidirectional |
| Publish/Subscribe |
PUB, SUB, XPUB, XSUB
|
unidirectional |
| Pipeline |
PUSH, PULL
|
unidirectional |
| Routing |
DEALER, ROUTER
|
bidirectional |
| Exclusive pair | PAIR |
bidirectional |
All classes live under OMQ::. For the purists, ØMQ is an alias:
req = ØMQ::REQ.new(">tcp://localhost:5555")Performance
Benchmarked with benchmark-ips on Linux x86_64 (Ruby 4.0.2 +YJIT):
Throughput (push/pull, 64 B messages)
| inproc | ipc | tcp |
|---|---|---|
| 145k/s | 40k/s | 32k/s |
Latency (req/rep roundtrip)
| inproc | ipc | tcp |
|---|---|---|
| 15 µs | 62 µs | 88 µs |
See bench/ for full results and scripts.
omqcat — CLI tool
omqcat is a command-line tool for sending and receiving messages on any OMQ socket. Like nngcat from libnng, but with Ruby superpowers.
# Echo server
omqcat rep -b tcp://:5555 --echo
# Upcase server in one line
omqcat rep -b tcp://:5555 -e '$F.map(&:upcase)'
# Client
echo "hello" | omqcat req -c tcp://localhost:5555
# => HELLO
# PUB/SUB
omqcat sub -b tcp://:5556 -s "weather." &
echo "weather.nyc 72F" | omqcat pub -c tcp://localhost:5556 -d 0.3
# Pipeline with filtering
tail -f /var/log/syslog | omqcat push -c tcp://collector:5557
omqcat pull -b tcp://:5557 -e '$F.first.include?("error") ? $F : nil'
# Multipart messages via tabs
printf "routing-key\tpayload data" | omqcat push -c tcp://localhost:5557
omqcat pull -b tcp://:5557
# => routing-key payload data
# JSONL for structured data
echo '["key","value"]' | omqcat push -c tcp://localhost:5557 -J
omqcat pull -b tcp://:5557 -J
# Zstandard compression
omqcat push -c tcp://remote:5557 -z < data.txt
omqcat pull -b tcp://:5557 -z
# CURVE encryption
omqcat rep -b tcp://:5555 -D "secret" --curve-server
# prints: OMQ_SERVER_KEY='...'
omqcat req -c tcp://localhost:5555 --curve-server-key '...'The -e flag runs Ruby inside the socket instance — the full socket API (self <<, send, subscribe, ...) is available. Use -r to require gems:
omqcat sub -c tcp://localhost:5556 -s "" -r json \
-e 'JSON.parse($F.first)["temperature"]'Formats: --ascii (default, tab-separated), --quoted, --raw, --jsonl, --msgpack. See omqcat --help for all options.
Interop with native ZMQ
OMQ speaks ZMTP 3.1 on the wire and interoperates with libzmq, CZMQ, pyzmq, etc. over tcp and ipc. The inproc:// transport is OMQ-internal (in-process Ruby queues) and is not visible to native ZMQ running in the same process — use ipc:// to talk across library boundaries.
Development
bundle install
bundle exec rake