Jetstream Bridge
Production-safe realtime data bridge between systems using NATS JetStream. Includes durable consumers, backpressure, retries, DLQ, optional Inbox/Outbox, and overlap-safe stream provisioning.
β¨ Features
- π Simple Publisher and Consumer interfaces
- π‘ Outbox (reliable send) & Inbox (idempotent receive), opt-in
- 𧨠DLQ for poison messages
- βοΈ Durable
pull_subscribe
with backoff &max_deliver
- π― Clear source/destination subject conventions
- π§± Overlap-safe stream ensure (prevents βsubjects overlapβ BadRequest)
- π Rails generators for initializer & migrations, plus an install rake task
- β‘οΈ Eager-loaded models via Railtie (production)
- π Configurable logging with sensible defaults
π¦ Install
# Gemfile
gem "jetstream_bridge", "~> 2.10"
bundle install
π§° Rails Generators & Rake Task
From your Rails app:
# Create initializer + migrations
bin/rails g jetstream_bridge:install
# Or run them separately:
bin/rails g jetstream_bridge:initializer
bin/rails g jetstream_bridge:migrations
# Rake task (does both initializer + migrations)
bin/rake jetstream_bridge:install
Then:
bin/rails db:migrate
The generators create:
config/initializers/jetstream_bridge.rb
db/migrate/*_create_jetstream_outbox_events.rb
db/migrate/*_create_jetstream_inbox_events.rb
π§ Configure (Rails)
# config/initializers/jetstream_bridge.rb
JetstreamBridge.configure do |config|
# NATS connection
config.nats_urls = ENV.fetch("NATS_URLS", "nats://localhost:4222")
config.env = ENV.fetch("NATS_ENV", "development")
config.app_name = ENV.fetch("APP_NAME", "app")
config.destination_app = ENV["DESTINATION_APP"] # required
# Consumer tuning
config.max_deliver = 5
config.ack_wait = "30s"
config.backoff = %w[1s 5s 15s 30s 60s]
# Reliability features (opt-in)
config.use_outbox = true
config.use_inbox = true
config.use_dlq = true
# Models (override if you use custom AR classes/table names)
config.outbox_model = "JetstreamBridge::OutboxEvent"
config.inbox_model = "JetstreamBridge::InboxEvent"
# Logging
# config.logger = Rails.logger
end
Defaults:
stream_name
β#{env}-jetstream-bridge-stream
dlq_subject
β#{env}.data.sync.dlq
Logging
JetstreamBridge logs through config.logger
when set, falling back to Rails.logger
or STDOUT. Provide any Logger
-compatible instance in the initializer to integrate with your application's logging setup.
π‘ Subject Conventions
Direction | Subject Pattern |
---|---|
Publish | {env}.{app}.sync.{dest} |
Subscribe | {env}.{dest}.sync.{app} |
DLQ | {env}.sync.dlq |
-
{app}
:app_name
-
{dest}
:destination_app
-
{env}
:env
π§± Stream Topology (auto-ensure and overlap-safe)
On first connection, Jetstream Bridge ensures a single stream exists for your env
and that it covers:
-
source_subject
({env}.{app}.sync.{dest}
) -
destination_subject
({env}.{dest}.sync.{app}
) -
dlq_subject
(if enabled)
Itβs overlap-safe:
- Skips adding subjects already covered by existing wildcards
- Pre-filters subjects owned by other streams to avoid
BadRequest: subjects overlap with an existing stream
- Retries once on concurrent races, then logs and continues safely
π Database Setup (Inbox / Outbox)
Inbox/Outbox are optional. The library detects columns at runtime and only sets what exists, so you can start minimal and evolve later.
Generator-created tables (recommended)
# jetstream_outbox_events
create_table :jetstream_outbox_events do |t|
t.string :event_id, null: false
t.string :subject, null: false
t.jsonb :payload, null: false, default: {}
t.jsonb :headers, null: false, default: {}
t.string :status, null: false, default: "pending" # pending|publishing|sent|failed
t.integer :attempts, null: false, default: 0
t.text :last_error
t.datetime :enqueued_at
t.datetime :sent_at
t.timestamps
end
add_index :jetstream_outbox_events, :event_id, unique: true
add_index :jetstream_outbox_events, :status
# jetstream_inbox_events
create_table :jetstream_inbox_events do |t|
t.string :event_id # preferred dedupe key
t.string :subject, null: false
t.jsonb :payload, null: false, default: {}
t.jsonb :headers, null: false, default: {}
t.string :stream
t.bigint :stream_seq
t.integer :deliveries
t.string :status, null: false, default: "received" # received|processing|processed|failed
t.text :last_error
t.datetime :received_at
t.datetime :processed_at
t.timestamps
end
add_index :jetstream_inbox_events, :event_id, unique: true, where: 'event_id IS NOT NULL'
add_index :jetstream_inbox_events, [:stream, :stream_seq], unique: true, where: 'stream IS NOT NULL AND stream_seq IS NOT NULL'
add_index :jetstream_inbox_events, :status
Already have different table names? Point the config to your AR classes via
config.outbox_model
/config.inbox_model
.
π€ Publish Events
publisher = JetstreamBridge::Publisher.new
publisher.publish(
resource_type: "user",
event_type: "created",
payload: { id: "01H...", name: "Ada" }, # resource_id inferred from payload[:id] / payload["id"]
# optional:
# event_id: "uuid-or-ulid",
# trace_id: "hex",
# occurred_at: Time.now.utc
)
If Outbox is enabled, the publish call:
- Upserts an outbox row by
event_id
- Publishes with
nats-msg-id
(idempotent) - Marks status
sent
or recordsfailed
withlast_error
π₯ Consume Events
JetstreamBridge::Consumer.new do |event, subject, deliveries|
# Your idempotent domain logic here
# `event` is the parsed envelope hash
UserCreatedHandler.call(event["payload"])
end.run!
durable_name
and batch_size
default to the configured values and can be
overridden if needed:
JetstreamBridge::Consumer.new(durable_name: 'my-durable', batch_size: 10) do |event, subject, deliveries|
# ...
end.run!
If Inbox is enabled, the consumer:
- Dedupes by
event_id
(falls back to stream sequence if needed) - Records processing state, errors, and timestamps
- Skips already-processed messages (acks immediately)
π¬ Envelope Format
{
"event_id": "01H1234567890ABCDEF",
"schema_version": 1,
"event_type": "created",
"producer": "myapp",
"resource_type": "user",
"resource_id": "01H1234567890ABCDEF",
"occurred_at": "2025-08-13T21:00:00Z",
"trace_id": "abc123",
"payload": { "id": "01H...", "name": "Ada" }
}
-
resource_id
is inferred frompayload.id
when publishing.
𧨠Dead-Letter Queue (DLQ)
When enabled, the topology ensures the DLQ subject exists:
{env}.data.sync.dlq
You may run a separate process to subscribe and triage messages that exceed max_deliver
or are NAKβed to the DLQ.
π Operations Guide
Monitoring
-
Consumer lag:
nats consumer info <stream> <durable>
-
DLQ volume: subscribe/metrics on
{env}.data.sync.dlq
-
Outbox backlog: alert on
jetstream_outbox_events
withstatus != 'sent'
and growing count
Scaling
- Run consumers in separate processes/containers
- Scale consumers independently of web
- Tune
batch_size
,ack_wait
,max_deliver
, andbackoff
Health check
-
Force-connect & ensure topology at boot or in a check:
# Returns JetStream context if successful JetstreamBridge.ensure_topology!
When to Use
- Inbox: you need idempotent processing and replay safety
- Outbox: you want βDB commit β event published (or recorded for retry)β guarantees
π§© Troubleshooting
-
subjects overlap with an existing stream
The library pre-filters overlapping subjects and retries once. If another team owns a broad wildcard (e.g.,env.data.sync.>
), coordinate subject boundaries. -
Consumer exists with mismatched filter The library detects and recreates the durable with the desired filter subject.
-
Repeated redeliveries Increase
ack_wait
, review handler acks/NACKs, or move poison messages to DLQ.
π Getting Started
- Add the gem & run
bundle install
bin/rails g jetstream_bridge:install
bin/rails db:migrate
- Start publishing/consuming!