Jetstream Bridge
Production-safe realtime data bridge between systems using NATS JetStream. Includes durable consumers, backpressure, retries, DLQ, optional Inbox/Outbox, and overlap-safe stream provisioning.
Features
- Simple Publisher and Consumer interfaces
- Outbox (reliable send) & Inbox (idempotent receive), opt-in
- DLQ for poison messages
- Durable pull_subscribe with backoff & max_deliver
- Clear source/destination subject conventions
- Overlap-safe stream ensure (prevents "subjects overlap" BadRequest)
- Rails generators for initializer & migrations, plus an install rake task
- Eager-loaded models via Railtie (production)
- Configurable logging with sensible defaults
Install
# Gemfile
gem "jetstream_bridge", "~> 2.10"

bundle install
Rails Generators & Rake Task
From your Rails app:
# Create initializer + migrations
bin/rails g jetstream_bridge:install
# Or run them separately:
bin/rails g jetstream_bridge:initializer
bin/rails g jetstream_bridge:migrations
# Rake task (does both initializer + migrations)
bin/rake jetstream_bridge:install

Then:
bin/rails db:migrate

The generators create:
- config/initializers/jetstream_bridge.rb
- db/migrate/*_create_jetstream_outbox_events.rb
- db/migrate/*_create_jetstream_inbox_events.rb
Configure (Rails)
# config/initializers/jetstream_bridge.rb
JetstreamBridge.configure do |config|
  # NATS connection
  config.nats_urls = ENV.fetch("NATS_URLS", "nats://localhost:4222")
  config.env = ENV.fetch("NATS_ENV", "development")
  config.app_name = ENV.fetch("APP_NAME", "app")
  config.destination_app = ENV["DESTINATION_APP"] # required

  # Consumer tuning
  config.max_deliver = 5
  config.ack_wait = "30s"
  config.backoff = %w[1s 5s 15s 30s 60s]

  # Reliability features (opt-in)
  config.use_outbox = true
  config.use_inbox = true
  config.use_dlq = true

  # Models (override if you use custom AR classes/table names)
  config.outbox_model = "JetstreamBridge::OutboxEvent"
  config.inbox_model = "JetstreamBridge::InboxEvent"

  # Logging
  # config.logger = Rails.logger
end

Defaults:
- stream_name → #{env}-jetstream-bridge-stream
- dlq_subject → #{env}.data.sync.dlq
Logging
JetstreamBridge logs through config.logger when set, falling back to Rails.logger or STDOUT. Provide any Logger-compatible instance in the initializer to integrate with your application's logging setup.
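As an illustration, a plain Ruby Logger can be built and handed to config.logger. The sketch below uses only the standard library; the build_bridge_logger helper and the "jetstream_bridge" progname are names made up for this example, and the final assignment is guarded so the snippet also runs where the gem is not loaded.

```ruby
require "logger"

# Hypothetical helper (not part of the gem): builds a Logger-compatible
# instance that writes to the given IO at INFO level.
def build_bridge_logger(io = $stdout)
  logger = Logger.new(io)
  logger.level = Logger::INFO
  logger.progname = "jetstream_bridge" # assumed label, purely cosmetic
  logger
end

# In config/initializers/jetstream_bridge.rb the instance would be assigned
# through config.logger, the setting described above.
if defined?(JetstreamBridge)
  JetstreamBridge.configure do |config|
    config.logger = build_bridge_logger
  end
end
```

Any object that responds to the usual Logger methods (info, warn, error, and so on) should work in the same position.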
Subject Conventions
| Direction | Subject Pattern |
|---|---|
| Publish | {env}.{app}.sync.{dest} |
| Subscribe | {env}.{dest}.sync.{app} |
| DLQ | {env}.sync.dlq |
- {app}: app_name
- {dest}: destination_app
- {env}: env
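To make the publish and subscribe patterns concrete, here is a minimal sketch that assembles both subjects from the three settings. BridgeSubjects is a hypothetical helper written for this example, not a class provided by the gem, and the values "production", "orders", and "billing" are placeholders.

```ruby
# Hypothetical helper, not part of the gem: it only mirrors the publish and
# subscribe patterns from the table above.
BridgeSubjects = Struct.new(:env, :app, :dest, keyword_init: true) do
  # Subject this app publishes to: {env}.{app}.sync.{dest}
  def publish_subject
    [env, app, "sync", dest].join(".")
  end

  # Subject this app subscribes to: {env}.{dest}.sync.{app}
  def subscribe_subject
    [env, dest, "sync", app].join(".")
  end
end

subjects = BridgeSubjects.new(env: "production", app: "orders", dest: "billing")
puts subjects.publish_subject   # production.orders.sync.billing
puts subjects.subscribe_subject # production.billing.sync.orders
```

Note the symmetry: with app_name and destination_app swapped on the other side, its subscribe subject equals this side's publish subject.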
Stream Topology (auto-ensure and overlap-safe)
On first connection, Jetstream Bridge ensures a single stream exists for your env and that it covers:
- source_subject ({env}.{app}.sync.{dest})
- destination_subject ({env}.{dest}.sync.{app})
- dlq_subject (if enabled)
It's overlap-safe:
- Skips adding subjects already covered by existing wildcards
- Pre-filters subjects owned by other streams to avoid BadRequest: subjects overlap with an existing stream
- Retries once on concurrent races, then logs and continues safely
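The "already covered by existing wildcards" check can be pictured with NATS subject-matching rules, where * matches exactly one token and > matches one or more trailing tokens. The following is a rough sketch of that idea, not the gem's actual implementation; subject_covered? is a hypothetical name, and it only handles a literal subject tested against a pattern (comparing two wildcard patterns against each other is more involved).

```ruby
# Hypothetical helper: true when the literal `subject` is matched by the
# NATS-style `pattern` ("*" = one token, ">" = one or more trailing tokens).
def subject_covered?(pattern, subject)
  p_tokens = pattern.split(".")
  s_tokens = subject.split(".")

  p_tokens.each_with_index do |token, i|
    # ">" needs at least one remaining subject token to match.
    return i < s_tokens.length if token == ">"
    return false if i >= s_tokens.length
    next if token == "*"
    return false unless token == s_tokens[i]
  end

  p_tokens.length == s_tokens.length
end

existing = ["dev.orders.sync.>"]
desired  = ["dev.orders.sync.billing", "dev.billing.sync.orders"]

# Keep only the subjects that no existing pattern already covers.
missing = desired.reject { |s| existing.any? { |p| subject_covered?(p, s) } }
p missing # ["dev.billing.sync.orders"]
```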
Database Setup (Inbox / Outbox)
Inbox/Outbox are optional. The library detects columns at runtime and only sets what exists, so you can start minimal and evolve later.
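One way to picture "only sets what exists" is a filter that drops attributes with no matching column. This is a sketch of the idea under assumed names, not the gem's code: assignable_attributes is hypothetical, and with ActiveRecord the column list would typically come from the model's column_names.

```ruby
# Hypothetical helper: keep only the attributes whose column exists.
def assignable_attributes(column_names, attrs)
  known = column_names.map(&:to_s)
  attrs.select { |name, _value| known.include?(name.to_s) }
end

# A deliberately minimal table: no attempts or enqueued_at columns yet.
minimal_columns = %w[id event_id subject payload status]

attrs = {
  event_id: "evt-1",
  subject: "dev.orders.sync.billing",
  payload: {},
  status: "pending",
  attempts: 0,
  enqueued_at: Time.now
}

row = assignable_attributes(minimal_columns, attrs)
p row.keys # [:event_id, :subject, :payload, :status]
```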
Generator-created tables (recommended)
# jetstream_outbox_events
create_table :jetstream_outbox_events do |t|
  t.string :event_id, null: false
  t.string :subject, null: false
  t.jsonb :payload, null: false, default: {}
  t.jsonb :headers, null: false, default: {}
  t.string :status, null: false, default: "pending" # pending|publishing|sent|failed
  t.integer :attempts, null: false, default: 0
  t.text :last_error
  t.datetime :enqueued_at
  t.datetime :sent_at
  t.timestamps
end
add_index :jetstream_outbox_events, :event_id, unique: true
add_index :jetstream_outbox_events, :status
# jetstream_inbox_events
create_table :jetstream_inbox_events do |t|
  t.string :event_id # preferred dedupe key
  t.string :subject, null: false
  t.jsonb :payload, null: false, default: {}
  t.jsonb :headers, null: false, default: {}
  t.string :stream
  t.bigint :stream_seq
  t.integer :deliveries
  t.string :status, null: false, default: "received" # received|processing|processed|failed
  t.text :last_error
  t.datetime :received_at
  t.datetime :processed_at
  t.timestamps
end
add_index :jetstream_inbox_events, :event_id, unique: true, where: 'event_id IS NOT NULL'
add_index :jetstream_inbox_events, [:stream, :stream_seq], unique: true, where: 'stream IS NOT NULL AND stream_seq IS NOT NULL'
add_index :jetstream_inbox_events, :status

Already have different table names? Point the config to your AR classes via config.outbox_model / config.inbox_model.
Publish Events
publisher = JetstreamBridge::Publisher.new
publisher.publish(
  resource_type: "user",
  event_type: "created",
  payload: { id: "01H...", name: "Ada" }, # resource_id inferred from payload[:id] / payload["id"]
  # optional:
  # event_id: "uuid-or-ulid",
  # trace_id: "hex",
  # occurred_at: Time.now.utc
)

If Outbox is enabled, the publish call:
- Upserts an outbox row by event_id
- Publishes with nats-msg-id (idempotent)
- Marks status sent or records failed with last_error
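The three Outbox steps above can be sketched with in-memory stand-ins. Everything here is hypothetical and for illustration only: OutboxRow and OutboxSketch are not gem classes, a Hash plays the role of the outbox table, and the transport block stands in for the real JetStream publish call.

```ruby
require "securerandom"

# In-memory stand-in for an outbox row (hypothetical).
OutboxRow = Struct.new(:event_id, :subject, :payload, :status, :attempts,
                       :last_error, keyword_init: true)

class OutboxSketch
  attr_reader :rows

  def initialize(&transport)
    @rows = {}             # event_id => OutboxRow
    @transport = transport # called as transport.call(subject, payload, headers)
  end

  def publish(subject:, payload:, event_id: SecureRandom.uuid)
    # 1. Upsert the row by event_id so retries reuse the same record.
    row = (@rows[event_id] ||= OutboxRow.new(event_id: event_id, attempts: 0))
    row.subject = subject
    row.payload = payload
    row.status = "publishing"
    row.attempts += 1

    # 2. Publish with the event_id as message id so the broker can dedupe.
    @transport.call(subject, payload, { "nats-msg-id" => event_id })

    # 3. Record the outcome.
    row.status = "sent"
    row.last_error = nil
    true
  rescue StandardError => e
    row.status = "failed"
    row.last_error = "#{e.class}: #{e.message}"
    false
  end
end

outbox = OutboxSketch.new { |subject, _payload, headers| puts "#{subject} #{headers}" }
outbox.publish(subject: "dev.orders.sync.billing", payload: { id: "1" }, event_id: "evt-1")
```

Because the row is keyed by event_id and the same id travels as the message id, retrying a failed publish neither creates a second row nor, on the broker side, a second message within the dedupe window.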
Consume Events
JetstreamBridge::Consumer.new do |event, subject, deliveries|
  # Your idempotent domain logic here
  # `event` is the parsed envelope hash
  UserCreatedHandler.call(event["payload"])
end.run!

durable_name and batch_size default to the configured values and can be overridden if needed:
JetstreamBridge::Consumer.new(durable_name: "my-durable", batch_size: 10) do |event, subject, deliveries|
  # ...
end.run!

If Inbox is enabled, the consumer:
- Dedupes by event_id (falls back to stream sequence if needed)
- Records processing state, errors, and timestamps
- Skips already-processed messages (acks immediately)
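The Inbox behaviour listed above can be approximated in memory as follows. This is a sketch under assumptions, not the gem's implementation: InboxSketch is a hypothetical class, a Hash replaces the jetstream_inbox_events table and its unique indexes, and acking is left out.

```ruby
class InboxSketch
  def initialize
    @status = {} # dedupe key => "processing" | "processed" | "failed"
  end

  # Prefer event_id; fall back to the (stream, stream_seq) pair.
  def dedupe_key(event, stream:, stream_seq:)
    event_id = event["event_id"]
    return "event:#{event_id}" unless event_id.to_s.empty?

    "seq:#{stream}:#{stream_seq}"
  end

  # Returns :skipped for already-processed messages and :processed on
  # success; handler errors are recorded as "failed" and re-raised.
  def process(event, stream:, stream_seq:)
    key = dedupe_key(event, stream: stream, stream_seq: stream_seq)
    return :skipped if @status[key] == "processed"

    @status[key] = "processing"
    yield event
    @status[key] = "processed"
    :processed
  rescue StandardError
    @status[key] = "failed" if key
    raise
  end
end

inbox = InboxSketch.new
event = { "event_id" => "evt-1", "payload" => { "id" => "1" } }
p inbox.process(event, stream: "dev-stream", stream_seq: 10) { |e| e["payload"] } # :processed
p inbox.process(event, stream: "dev-stream", stream_seq: 11) { |e| e["payload"] } # :skipped
```

The second call is a redelivery of the same event under a new stream sequence; it is skipped because the event_id has already been processed.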
Envelope Format
{
  "event_id": "01H1234567890ABCDEF",
  "schema_version": 1,
  "event_type": "created",
  "producer": "myapp",
  "resource_type": "user",
  "resource_id": "01H1234567890ABCDEF",
  "occurred_at": "2025-08-13T21:00:00Z",
  "trace_id": "abc123",
  "payload": { "id": "01H...", "name": "Ada" }
}
- resource_id is inferred from payload.id when publishing.
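For readers who want to see how such an envelope could be assembled, here is a minimal sketch that reproduces the field layout above. build_envelope is a hypothetical function, not the gem's builder; the UUID and hex defaults for event_id and trace_id are assumptions chosen so the example runs with the standard library alone.

```ruby
require "securerandom"
require "time"

# Hypothetical builder: mirrors the envelope fields shown above.
def build_envelope(resource_type:, event_type:, payload:, producer:,
                   event_id: SecureRandom.uuid,
                   trace_id: SecureRandom.hex(8),
                   occurred_at: Time.now.utc)
  {
    "event_id" => event_id,
    "schema_version" => 1,
    "event_type" => event_type,
    "producer" => producer,
    "resource_type" => resource_type,
    # resource_id is taken from payload[:id] or payload["id"]
    "resource_id" => payload[:id] || payload["id"],
    "occurred_at" => occurred_at.iso8601,
    "trace_id" => trace_id,
    "payload" => payload
  }
end

envelope = build_envelope(
  resource_type: "user",
  event_type: "created",
  producer: "myapp",
  payload: { id: "01H1234567890ABCDEF", name: "Ada" }
)
puts envelope["resource_id"] # 01H1234567890ABCDEF
```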
Dead-Letter Queue (DLQ)
When enabled, the topology ensures the DLQ subject exists:
{env}.data.sync.dlq
You may run a separate process to subscribe and triage messages that exceed max_deliver or are NAK'ed to the DLQ.
Operations Guide
Monitoring
- Consumer lag: nats consumer info <stream> <durable>
- DLQ volume: subscribe/metrics on {env}.data.sync.dlq
- Outbox backlog: alert on jetstream_outbox_events with status != 'sent' and growing count
Scaling
- Run consumers in separate processes/containers
- Scale consumers independently of web
- Tune batch_size, ack_wait, max_deliver, and backoff
Health check
- Force-connect & ensure topology at boot or in a check:
# Returns JetStream context if successful
JetstreamBridge.ensure_topology!
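A health endpoint usually wants a status value rather than an exception, so the call can be wrapped. The sketch below is one possible shape, not something the gem ships: jetstream_health is a hypothetical function, and the probe argument exists only so the check can be exercised with a stub.

```ruby
# Hypothetical wrapper. The probe defaults to the ensure_topology! call
# shown above and can be replaced with a stub in tests.
def jetstream_health(probe = -> { JetstreamBridge.ensure_topology! })
  probe.call
  { status: "ok" }
rescue StandardError => e
  { status: "error", error: "#{e.class}: #{e.message}" }
end

p jetstream_health(-> { :connected })          # {:status=>"ok"} (hash formatting varies by Ruby version)
p jetstream_health(-> { raise "no server" })   # status "error" with the message attached
```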
When to Use
- Inbox: you need idempotent processing and replay safety
- Outbox: you want "DB commit → event published (or recorded for retry)" guarantees
Troubleshooting
- subjects overlap with an existing stream: The library pre-filters overlapping subjects and retries once. If another team owns a broad wildcard (e.g., env.data.sync.>), coordinate subject boundaries.
- Consumer exists with mismatched filter: The library detects and recreates the durable with the desired filter subject.
- Repeated redeliveries: Increase ack_wait, review handler acks/NAKs, or move poison messages to DLQ.
Getting Started
- Add the gem & run bundle install
- bin/rails g jetstream_bridge:install
- bin/rails db:migrate
- Start publishing/consuming!