omq — ZeroMQ CLI
Command-line tool for sending and receiving ZeroMQ messages on any socket type.
Like nngcat from libnng, but with Ruby eval, Ractor parallelism, and message handlers.
Built on omq — pure Ruby ZeroMQ, no C dependencies.
Install
gem install omq-cliQuick Start
# Echo server
omq rep -b tcp://:5555 --echo
# Client
echo "hello" | omq req -c tcp://localhost:5555
# Upcase server — -e evals Ruby on each incoming message
omq rep -b tcp://:5555 -e '$F.map(&:upcase)'Usage: omq TYPE [options]
Types: req, rep, pub, sub, push, pull, pair, dealer, router
Draft: client, server, radio, dish, scatter, gather, channel, peer
Virtual: pipe (PULL → eval → PUSH)
Connection
Every socket needs at least one --bind or --connect:
omq pull --bind tcp://:5557 # listen on port 5557
omq push --connect tcp://host:5557 # connect to host
omq pull -b ipc:///tmp/feed.sock # IPC (unix socket)
omq push -c ipc://@abstract # IPC (abstract namespace, Linux)Multiple endpoints are allowed — omq pull -b tcp://:5557 -b tcp://:5558 binds both.
Pipe takes two positional endpoints (input, output) or uses --in/--out for multiple per side.
Socket types
Unidirectional (send-only / recv-only)
| Send | Recv | Pattern |
|---|---|---|
push |
pull |
Pipeline — round-robin to workers |
pub |
sub |
Publish/subscribe — fan-out with topic filtering |
scatter |
gather |
Pipeline (draft, single-frame only) |
radio |
dish |
Group messaging (draft, single-frame only) |
Send-only sockets read from stdin (or --data/--file) and send. Recv-only sockets receive and write to stdout.
echo "task" | omq push -c tcp://worker:5557
omq pull -b tcp://:5557Bidirectional (request-reply)
| Type | Behavior |
|---|---|
req |
Sends a request, waits for reply, prints reply |
rep |
Receives request, sends reply (from --echo, -e, --data, --file, or stdin) |
client |
Like req (draft, single-frame) |
server |
Like rep (draft, single-frame, routing-ID aware) |
# echo server
omq rep -b tcp://:5555 --echo
# upcase server
omq rep -b tcp://:5555 -e '$F.map(&:upcase)'
# client
echo "hello" | omq req -c tcp://localhost:5555Bidirectional (concurrent send + recv)
| Type | Behavior |
|---|---|
pair |
Exclusive 1-to-1 — concurrent send and recv tasks |
dealer |
Like pair but round-robin send to multiple peers |
channel |
Like pair (draft, single-frame) |
These spawn two concurrent tasks: a receiver (prints incoming) and a sender (reads stdin).
-e transforms incoming, -E transforms outgoing.
Routing sockets
| Type | Behavior |
|---|---|
router |
Receives with peer identity prepended; sends to peer by identity |
server |
Like router but draft, single-frame, uses routing IDs |
peer |
Like server (draft, single-frame) |
# monitor mode — just print what arrives
omq router -b tcp://:5555
# reply to specific peer
omq router -b tcp://:5555 --target worker-1 -D "reply"
# dynamic routing via send-eval (first element = identity)
omq router -b tcp://:5555 -E '["worker-1", $_.upcase]'--target and --send-eval are mutually exclusive on routing sockets.
Pipe (virtual)
Pipe creates an internal PULL → eval → PUSH pipeline:
omq pipe -c ipc://@work -c ipc://@sink -e '$F.map(&:upcase)'
# with Ractor workers for CPU parallelism
omq pipe -c ipc://@work -c ipc://@sink -P 4 -r./fib.rb -e 'fib(Integer($_)).to_s'The first endpoint is the pull-side (input), the second is the push-side (output).
Both must use -c.
Eval: -e and -E
-e (alias --recv-eval) runs a Ruby expression for each incoming message.
-E (alias --send-eval) runs a Ruby expression for each outgoing message.
Globals
| Variable | Value |
|---|---|
$F |
Message parts (Array<String>) |
$_ |
First part ($F.first) — works in inline expressions |
Return value
| Return | Effect |
|---|---|
Array |
Used as the message parts |
String |
Wrapped in [result]
|
nil |
Message is skipped (filtered) |
self (the socket) |
Signals "I already sent" (REP only) |
Control flow
# skip messages matching a pattern
omq pull -b tcp://:5557 -e 'next if /^#/; $F'
# stop on "quit"
omq pull -b tcp://:5557 -e 'break if /quit/; $F'BEGIN/END blocks
Like awk — BEGIN{} runs once before the message loop, END{} runs after:
omq pull -b tcp://:5557 -e 'BEGIN{ @sum = 0 } @sum += Integer($_); next END{ puts @sum }'Local variables won't work to share state between the blocks. Use @ivars instead.
Which sockets accept which flag
| Socket |
-E (send) |
-e (recv) |
|---|---|---|
| push, pub, scatter, radio | transforms outgoing | error |
| pull, sub, gather, dish | error | transforms incoming |
| req, client | transforms request | transforms reply |
| rep, server (reply mode) | error | transforms request → return = reply |
| pair, dealer, channel | transforms outgoing | transforms incoming |
| router, server, peer (monitor) | routes outgoing (first element = identity) | transforms incoming |
| pipe | error | transforms in pipeline |
Examples
# upcase echo server
omq rep -b tcp://:5555 -e '$F.map(&:upcase)'
# transform before sending
echo hello | omq push -c tcp://localhost:5557 -E '$F.map(&:upcase)'
# filter incoming
omq pull -b tcp://:5557 -e '$F.first.include?("error") ? $F : nil'
# REQ: different transforms per direction
echo hello | omq req -c tcp://localhost:5555 \
-E '$F.map(&:upcase)' -e '$F.map(&:reverse)'
# generate messages without stdin
omq pub -c tcp://localhost:5556 -E 'Time.now.to_s' -i 1
# use gems
omq sub -c tcp://localhost:5556 -s "" -rjson -e 'JSON.parse($F.first)["temperature"]'Script handlers (-r)
For non-trivial transforms, put the logic in a Ruby file and load it with -r:
# handler.rb
db = PG.connect("dbname=app")
OMQ.outgoing { |msg| msg.map(&:upcase) }
OMQ.incoming { |msg| db.exec(msg.first).values.flatten }
at_exit { db.close }omq req -c tcp://localhost:5555 -r./handler.rbRegistration API
| Method | Effect |
|---|---|
| `OMQ.outgoing { | msg |
| `OMQ.incoming { | msg |
- use explicit block variable (like
msg) instead of$F/$_ - Setup: use local variables and closures at the top of the script
- Teardown: use Ruby's
at_exit { ... } - CLI flags (
-e/-E) override script-registered handlers for the same direction - A script can register one direction while the CLI handles the other:
# handler.rb registers recv_eval, CLI adds send_eval
omq req -c tcp://localhost:5555 -r./handler.rb -E '$F.map(&:upcase)'Script handler examples
# count.rb — count messages, print total on exit
count = 0
OMQ.incoming { |msg| count += 1; msg }
at_exit { $stderr.puts "processed #{count} messages" }# json_transform.rb — parse JSON, extract field
require "json"
OMQ.incoming { |first_part, _| [JSON.parse(first_part)["value"]] }# rate_limit.rb — skip messages arriving too fast
last = 0
OMQ.incoming do |msg|
now = Async::Clock.now # monotonic clock
if now - last >= 0.1
last = now
msg
end
end# enrich.rb — add timestamp to outgoing messages
OMQ.outgoing { |msg| [*msg, Time.now.iso8601] }Data sources
| Flag | Behavior |
|---|---|
| (stdin) | Read lines from stdin, one message per line |
-D "text" |
Send literal string (one-shot or repeated with -i) |
-F file |
Read message from file (-F - reads stdin as blob) |
--echo |
Echo received messages back (REP only) |
-D and -F are mutually exclusive.
Formats
| Flag | Format |
|---|---|
-A / --ascii
|
Tab-separated frames, non-printable → dots (default) |
-Q / --quoted
|
C-style escapes, lossless round-trip |
--raw |
Raw ZMTP binary (pipe to hexdump -C for debugging) |
-J / --jsonl
|
JSON Lines — ["frame1","frame2"] per line |
--msgpack |
MessagePack arrays (binary stream) |
-M / --marshal
|
Ruby Marshal (binary stream of Array<String> objects) |
Multipart messages: in ASCII/quoted mode, frames are tab-separated. In JSONL mode, each message is a JSON array.
# send multipart via tabs
printf "key\tvalue" | omq push -c tcp://localhost:5557
# JSONL
echo '["key","value"]' | omq push -c tcp://localhost:5557 -J
omq pull -b tcp://:5557 -JTiming
| Flag | Effect |
|---|---|
-i SECS |
Repeat send every N seconds (wall-clock aligned) |
-n COUNT |
Max messages to send/receive (0 = unlimited) |
-d SECS |
Delay before first send |
-t SECS |
Send/receive timeout |
-l SECS |
Linger time on close (default 5s) |
--reconnect-ivl |
Reconnect interval: SECS or MIN..MAX (default 0.1) |
--heartbeat-ivl SECS |
ZMTP heartbeat interval (detects dead peers) |
# publish a tick every second, 10 times
omq pub -c tcp://localhost:5556 -D "tick" -i 1 -n 10 -d 1
# receive with 5s timeout
omq pull -b tcp://:5557 -t 5Compression
Both sides must use --compress (-z). Uses LZ4 frame format, provided
by the rlz4 gem (Ractor-safe, Rust extension via lz4_flex).
omq push -c tcp://remote:5557 -z < data.txt
omq pull -b tcp://:5557 -zKey generation
Generate a persistent CURVE keypair:
omq keygen
# OMQ_SERVER_PUBLIC='...'
# OMQ_SERVER_SECRET='...'
omq keygen --crypto nuckle # pure Ruby backend (DANGEROUS — not audited)Export the vars, then use --curve-server (server) or --curve-server-key (client).
CURVE encryption
End-to-end encryption using CurveZMQ. Requires system libsodium:
apt install libsodium-dev # Debian/Ubuntu
brew install libsodium # macOSTo use nuckle (pure Ruby, DANGEROUS — not audited) instead:
omq rep -b tcp://:5555 --echo --curve-server --crypto nuckle
# or: OMQ_CRYPTO=nuckle omq rep -b tcp://:5555 --echo --curve-server# server (prints OMQ_SERVER_KEY=...)
omq rep -b tcp://:5555 --echo --curve-server
# client (paste the key)
echo "secret" | omq req -c tcp://localhost:5555 \
--curve-server-key '<key from server>'Persistent keys via env vars: OMQ_SERVER_PUBLIC + OMQ_SERVER_SECRET (server), OMQ_SERVER_KEY (client).
Subscription and groups
# subscribe to topic prefix
omq sub -b tcp://:5556 -s "weather."
# subscribe to all (default)
omq sub -b tcp://:5556
# multiple subscriptions
omq sub -b tcp://:5556 -s "weather." -s "sports."
# RADIO/DISH groups
omq dish -b tcp://:5557 -j "weather" -j "sports"
omq radio -c tcp://localhost:5557 -g "weather" -D "72F"Identity and routing
# DEALER with identity
echo "hello" | omq dealer -c tcp://localhost:5555 --identity worker-1
# ROUTER receives identity + message as tab-separated
omq router -b tcp://:5555
# ROUTER sends to specific peer
omq router -b tcp://:5555 --target worker-1 -D "reply"
# ROUTER dynamic routing via -E (first element = routing identity)
omq router -b tcp://:5555 -E '["worker-1", $_.upcase]'
# binary routing IDs (0x prefix)
omq router -b tcp://:5555 --target 0xdeadbeef -D "reply"Pipe
Pipe creates an in-process PULL → eval → PUSH pipeline:
# basic pipe (positional: first = input, second = output)
omq pipe -c ipc://@work -c ipc://@sink -e '$F.map(&:upcase)'
# parallel Ractor workers (default: all CPUs)
omq pipe -c ipc://@work -c ipc://@sink -P -r./fib.rb -e 'fib(Integer($_)).to_s'
# fixed number of workers
omq pipe -c ipc://@work -c ipc://@sink -P 4 -e '$F.map(&:upcase)'
# exit when producer disconnects
omq pipe -c ipc://@work -c ipc://@sink --transient -e '$F.map(&:upcase)'Multi-peer pipe with --in/--out
Use --in and --out to attach multiple endpoints per side. These are modal switches — subsequent
-b/-c flags attach to the current side:
# fan-in: 2 producers → 1 consumer
omq pipe --in -c ipc://@work1 -c ipc://@work2 --out -c ipc://@sink -e '$F'
# fan-out: 1 producer → 2 consumers (round-robin)
omq pipe --in -b tcp://:5555 --out -c ipc://@sink1 -c ipc://@sink2 -e '$F'
# bind on input, connect on output
omq pipe --in -b tcp://:5555 -b tcp://:5556 --out -c tcp://sink:5557 -e '$F'
# parallel workers with fan-in (all must be -c)
omq pipe --in -c ipc://@a -c ipc://@b --out -c ipc://@sink -P 4 -e '$F'-P/--parallel requires all endpoints to be --connect. In parallel mode, each Ractor worker
gets its own PULL/PUSH pair connecting to all endpoints.
Transient mode
--transient makes the socket exit when all peers disconnect. Useful for pipeline workers and sinks:
# worker exits when producer is done
omq pipe -c ipc://@work -c ipc://@sink --transient -e '$F.map(&:upcase)'
# sink exits when all workers disconnect
omq pull -b tcp://:5557 --transientExit codes
| Code | Meaning |
|---|---|
| 0 | Success |
| 1 | Error (connection, argument, runtime) |
| 2 | Timeout |
| 3 | Eval error (-e/-E expression raised) |