NatsPubsub
Declarative Pub/Sub messaging for NATS JetStream
A production-ready pub/sub library with a familiar, declarative API. Features declarative subscribers, middleware support, and battle-tested reliability patterns including Inbox/Outbox, DLQ, and automatic retries with backoff.
Implementations for Ruby and JavaScript with full interoperability
Table of Contents
- Documentation
- Quick Start
- Packages
- Features
- Real-World Examples
- Performance
- Development
- Contributing
- License
Documentation
View Full Documentation Site
Comprehensive documentation is available in the docs/ directory:
Getting Started
- Introduction - What is NatsPubsub and why use it
- Installation - Setup for JavaScript and Ruby
- JavaScript Quick Start - Get running in 5 minutes
- Ruby Quick Start - Get running in 5 minutes
- Core Concepts - Topics, subscribers, reliability patterns
Guides
- Publishing Messages - Complete publishing guide
- Creating Subscribers - Subscriber patterns and best practices
- Middleware System - Add cross-cutting concerns
- Testing Strategies - Unit, integration, and E2E testing
- Deployment Guide - Docker, Kubernetes, production setup
- Performance Tuning - Optimization strategies
Patterns
- Inbox/Outbox Pattern - Guaranteed delivery and exactly-once processing
- Dead Letter Queue - Handle failed messages
- Schema Validation - Validate messages with Zod
- Event Sourcing - Build event-sourced systems
Integrations
- Ruby on Rails - Rails integration guide
- Express.js - Express integration
- NestJS - NestJS integration
- Databases - PostgreSQL, MySQL, SQLite
Reference
- JavaScript API - Complete TypeScript/JavaScript API
- Ruby API - Complete Ruby API
- Configuration - All configuration options
- CLI Reference - Command-line tools
Advanced
- Architecture - System design and components
- Internals - How NatsPubsub works
- Custom Repositories - Implement custom storage
- Security - Security best practices
Troubleshooting
- Common Issues - Solutions to frequent problems
- Debugging Guide - Debug message flow
- FAQ - Frequently asked questions
Examples
- Example Projects - Complete working examples
- Microservices Example - Multi-service architecture
- JavaScript Examples - TypeScript/JavaScript examples
- Ruby Examples - Ruby examples
Start with the Documentation Index
Quick Start
Start the full development environment with Docker Compose:
git clone https://github.com/attaradev/nats_pubsub.git
cd nats_pubsub
docker compose up -d
This starts NATS, PostgreSQL, Prometheus, and Grafana with preconfigured monitoring.
For package-specific setup:
Packages
Ruby
Rails-integrated Pub/Sub library with Web UI, Inbox/Outbox, and ActiveRecord support.
gem "nats_pubsub", "~> 0.1"
JavaScript
Node.js Pub/Sub library with full TypeScript support and enterprise monitoring.
pnpm add nats-pubsub
Features
Core Capabilities
- Topic-Based Messaging - Flexible pub/sub with hierarchical topics (e.g., order.created, notification.email)
- Declarative Subscriber API - Clean, decorator-based subscription patterns
- Hierarchical Topics - Organize messages with dot notation and wildcard subscriptions (* for a single level, > for multiple levels)
- Dead Letter Queue (DLQ) - Automatic handling of failed messages
- Durable Pull Consumers - Reliable message delivery with exponential backoff
- Middleware System - Extensible processing pipeline for cross-cutting concerns
- Auto-Topology Management - Automatic JetStream stream and consumer creation
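The wildcard rules above follow NATS subject semantics: `*` stands for exactly one dot-separated token, and `>` stands for one or more trailing tokens. The sketch below illustrates those rules with a hypothetical helper, `subjectMatches`; it is not part of the NatsPubsub API, and in practice the NATS server performs the matching.

```typescript
// Hypothetical helper (not a NatsPubsub API): checks whether a NATS-style
// subject pattern matches a concrete subject.
// `*` matches exactly one token; `>` matches one or more trailing tokens.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // `>` is only valid as the last token and must consume at least one token.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject ran out of tokens
    if (p[i] !== "*" && p[i] !== s[i]) return false; // literal mismatch
  }
  // Without a trailing `>`, both sides need the same number of tokens.
  return p.length === s.length;
}

console.log(subjectMatches("production.payment-service.payment.*",
                           "production.payment-service.payment.completed"));
console.log(subjectMatches("production.*.>", "production.auth-service.user.login"));
```

Both calls report a match. A pattern such as `order.*` would not match `order.created.v2`, because `*` covers a single token only.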
Ruby-Specific
- Inbox/Outbox reliability patterns
- Web UI for monitoring
- ActiveRecord integration
- Rails generators
JavaScript-Specific
- Prometheus metrics
- Health check endpoints
- Batch publishing API
- Full TypeScript support
Cross-Language
Both implementations use identical event formats, enabling seamless interoperability between Ruby and JavaScript services.
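To make the idea of a shared event format concrete, here is a minimal sketch of a JSON envelope that services in either language could produce and parse. The `EventEnvelope` shape and the `encodeEvent` and `decodeEvent` helpers are assumptions for illustration, loosely based on the `topic`, `trace_id`, and `occurred_at` values that appear in the examples in this README; the library's actual wire format may differ.

```typescript
// Hypothetical envelope; field names are assumptions, not the library's format.
interface EventEnvelope {
  topic: string;
  message: Record<string, unknown>;
  trace_id?: string;
  occurred_at: string; // ISO 8601 timestamp
}

// Serialize an event as JSON with snake_case keys, which both Ruby and
// JavaScript can read without extra mapping.
function encodeEvent(
  topic: string,
  message: Record<string, unknown>,
  traceId?: string,
  now: Date = new Date(),
): string {
  const envelope: EventEnvelope = { topic, message, occurred_at: now.toISOString() };
  if (traceId !== undefined) envelope.trace_id = traceId;
  return JSON.stringify(envelope);
}

// Parse and minimally validate an incoming event.
function decodeEvent(raw: string): EventEnvelope {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("event must be a JSON object");
  }
  const e = parsed as Partial<EventEnvelope>;
  if (
    typeof e.topic !== "string" ||
    typeof e.occurred_at !== "string" ||
    typeof e.message !== "object" ||
    e.message === null
  ) {
    throw new Error("event is missing required fields");
  }
  return e as EventEnvelope;
}

const raw = encodeEvent("order.created", { order_id: 42 }, "req-1");
console.log(decodeEvent(raw).topic);
```

Keeping the envelope to plain JSON types (strings, numbers, objects) is what lets a Ruby hash and a JavaScript object round-trip without loss.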
Learn More:
- Ruby Package Docs - Full Ruby documentation with Rails integration
- JavaScript Package Docs - Full TypeScript/JavaScript documentation
- Performance Benchmarks - Throughput and latency metrics
- Contributing Guide - How to contribute to the project
Real-World Examples
E-Commerce Order Processing
Scenario: Process orders across multiple services (inventory, shipping, notifications)
Ruby Publisher (Order Service):
# app/services/order_service.rb
class OrderService
  def create_order(user_id, items)
    Order.transaction do
      order = Order.create!(user_id: user_id, status: 'pending')
      # Publish order created event
      NatsPubsub.publish(
        topic: 'order.created',
        message: {
          order_id: order.id,
          user_id: user_id,
          item: items,
          total: calculate_total(items)
        }
      )
    end
  end
end
JavaScript Subscriber (Inventory Service):
// subscribers/inventory-subscriber.ts
import { Subscriber, TopicMetadata } from "nats-pubsub";
class InventorySubscriber extends Subscriber {
  constructor() {
    super("production.order-service.order.created");
  }
  async handle(message: Record<string, unknown>, metadata: TopicMetadata) {
    const { order_id, item } = message;
    // Reserve inventory
    await this.reserveInventory(item as Array<any>);
    // Publish inventory reserved event
    await NatsPubsub.publish("inventory.reserved", {
      order_id,
      reserved_at: new Date(),
    });
  }
  private async reserveInventory(item: any[]) {
    // Inventory reservation logic
  }
}
User Notification System
Scenario: Send welcome emails, SMS, and push notifications when users sign up
JavaScript Publisher (Auth Service):
// services/auth.service.ts
async function createUser(userData: UserData) {
  const user = await User.create(userData);
  // Publish to the notification and audit topics
  await NatsPubsub.publish({
    topics: ["notification.email", "notification.sms", "audit.user-created"],
    message: {
      user_id: user.id,
      email: user.email,
      phone: user.phone,
      name: user.name,
    },
  });
  return user;
}
Ruby Subscribers (Notification Service):
# app/subscribers/email_notification_subscriber.rb
class EmailNotificationSubscriber < NatsPubsub::Subscriber
  subscribe_to "notification.email"
  def handle(message, context)
    WelcomeMailer.welcome_email(
      email: message['email'],
      name: message['name']
    ).deliver_later
  end
end
# app/subscribers/sms_notification_subscriber.rb
class SmsNotificationSubscriber < NatsPubsub::Subscriber
  subscribe_to "notification.sms"
  def handle(message, context)
    TwilioService.send_welcome_sms(
      phone: message['phone'],
      name: message['name']
    )
  end
end
Microservices Communication
Scenario: Payment service notifies order and analytics services
// Payment Service (Publisher)
await NatsPubsub.publish("payment.completed", {
  payment_id: payment.id,
  order_id: payment.order_id,
  amount: payment.amount,
  currency: "USD",
});
// Order Service (Subscriber)
class PaymentCompletedSubscriber extends Subscriber {
  constructor() {
    super("production.payment-service.payment.completed");
  }
  async handle(message: Record<string, unknown>) {
    await Order.update(message.order_id, { status: "paid" });
  }
}
// Analytics Service (Subscriber with wildcard)
class PaymentAnalyticsSubscriber extends Subscriber {
  constructor() {
    super("production.payment-service.payment.*"); // Wildcard subscription
  }
  async handle(message: Record<string, unknown>, metadata: TopicMetadata) {
    await Analytics.track({
      event: metadata.topic,
      properties: message,
    });
  }
}
Audit Logging
Scenario: Centralized audit log for all system events
# Any service publishing events
NatsPubsub.publish(
  topic: 'user.login',
  message: { user_id: user.id, ip: request.ip },
  trace_id: request_id
)
# Audit Service (Subscriber) - Wildcard to capture all events
class AuditLogSubscriber < NatsPubsub::Subscriber
  subscribe_to "production.*.>" # Subscribe to ALL topics from all services
  def handle(message, context)
    AuditLog.create!(
      event_type: context.topic,
      user_id: message['user_id'],
      data: message,
      trace_id: context.trace_id,
      occurred_at: context.occurred_at
    )
  end
end
Performance
Benchmarks
NatsPubsub leverages NATS JetStream for high-performance messaging:
| Metric | Performance | Configuration |
|---|---|---|
| Latency (p50) | < 1ms | Single server, local network |
| Latency (p99) | < 5ms | Single server, local network |
| Throughput | 1M+ msg/sec | Single NATS server, in-memory |
| Throughput | 500K+ msg/sec | With JetStream persistence |
| Message Size | Up to 1MB | Default, configurable |
| Concurrent Consumers | 1000+ | Per stream |
Real-World Performance
Typical Microservices Setup:
- Latency: 2-10ms (including network + processing)
- Throughput: 50K-200K msg/sec per service
- Concurrency: 10-50 concurrent message processors
Factors Affecting Performance:
- Network latency between services
- Message size and serialization
- Subscriber processing time
- Database operations in subscribers
- JetStream storage type (memory vs file)
Performance Tuning
1. Optimize Concurrency
// JavaScript
NatsPubsub.configure({
  concurrency: 20, // Adjust based on workload
});
# Ruby
NatsPubsub.configure do |config|
  config.concurrency = 20
end
Guidelines:
- CPU-bound tasks: concurrency = CPU cores
- I/O-bound tasks: concurrency = 2-4x CPU cores
- Database-heavy: concurrency ≤ DB pool size
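As a rough illustration, the guidelines can be turned into a starting value for the `concurrency` setting. The `suggestConcurrency` helper below is hypothetical and not part of NatsPubsub; it picks 3x cores as the midpoint of the 2-4x range for I/O-bound work, and treats database-heavy work as I/O-bound work capped at the pool size. Treat the result as a first guess to refine under load.

```typescript
type Workload = "cpu" | "io" | "database";

// Hypothetical helper: derives a starting concurrency from the guidelines.
// cpuCores can come from os.cpus().length in Node.js.
function suggestConcurrency(
  workload: Workload,
  cpuCores: number,
  dbPoolSize?: number,
): number {
  const ioBound = cpuCores * 3; // assumed midpoint of the 2-4x range
  switch (workload) {
    case "cpu":
      return cpuCores; // one worker per core
    case "io":
      return ioBound;
    case "database":
      // Never run more handlers than there are database connections.
      return dbPoolSize === undefined ? ioBound : Math.min(ioBound, dbPoolSize);
  }
}

console.log(suggestConcurrency("database", 8, 20)); // capped by the pool size
```

With 8 cores and a pool of 20 connections, the I/O estimate of 24 is capped to 20, so handlers are not left waiting on a connection.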
2. Batch Processing
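class BatchProcessor extends BaseSubscriber {
  private batch: Record<string, unknown>[] = [];
  private batchSize = 100;
  private flushInterval = 1000; // 1 second
  private timer?: ReturnType<typeof setTimeout>;

  async call(message: Record<string, unknown>) {
    this.batch.push(message);
    if (this.batch.length >= this.batchSize) {
      await this.flush();
    } else if (!this.timer) {
      // Flush a partial batch once flushInterval elapses
      this.timer = setTimeout(
        () => this.flush().catch(console.error),
        this.flushInterval,
      );
    }
  }

  private async flush() {
    clearTimeout(this.timer);
    this.timer = undefined;
    if (this.batch.length === 0) return;
    // Swap the buffer before awaiting so messages that arrive mid-insert are kept
    const items = this.batch;
    this.batch = [];
    await Database.bulkInsert(items);
  }
}
3. Connection Pooling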
// Reuse database connections
const pool = new Pool({
  max: 20, // Match or exceed concurrency
  min: 5,
});
4. Message Size Optimization
// Keep messages small
await NatsPubsub.publish("user.created", {
  id: user.id, // Reference the record by ID
  // user: fullUserObject // Avoid embedding large objects
});
Monitoring Performance
// Add timing middleware
class PerformanceMiddleware {
  async call(event: any, metadata: any, next: () => Promise<void>) {
    const start = Date.now();
    await next();
    const duration = Date.now() - start;
    if (duration > 1000) {
      console.warn(`Slow processing: ${metadata.subject} took ${duration}ms`);
    }
  }
}
NatsPubsub.use(new PerformanceMiddleware());
Scaling Guidelines
| Messages/Day | Setup | Estimated Cost |
|---|---|---|
| < 1M | Single NATS server | ~$50/month |
| 1M - 10M | Single NATS server + monitoring | ~$100/month |
| 10M - 100M | NATS cluster (3 nodes) | ~$300/month |
| 100M - 1B | NATS cluster (5 nodes) + replicas | ~$1000/month |
| > 1B | Contact for architecture review | Varies |
Note: Based on AWS/GCP pricing for comparable instances.
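Daily volumes are easier to compare with the throughput figures above once they are converted to an average rate. The sketch below does that conversion and looks up the matching row of the table. The `averageRate` and `suggestSetup` helpers are hypothetical, and the choice of which tier owns each boundary value is an assumption, since the table leaves it open.

```typescript
const SECONDS_PER_DAY = 86_400;

// Average messages per second for a given daily volume.
// Real traffic is bursty, so peak rates will be higher than this average.
function averageRate(messagesPerDay: number): number {
  return messagesPerDay / SECONDS_PER_DAY;
}

// Hypothetical lookup mirroring the scaling table above.
function suggestSetup(messagesPerDay: number): string {
  if (messagesPerDay < 1_000_000) return "Single NATS server";
  if (messagesPerDay < 10_000_000) return "Single NATS server + monitoring";
  if (messagesPerDay < 100_000_000) return "NATS cluster (3 nodes)";
  if (messagesPerDay <= 1_000_000_000) return "NATS cluster (5 nodes) + replicas";
  return "Contact for architecture review";
}

const daily = 100_000_000;
console.log(`${Math.round(averageRate(daily))} msg/sec on average`);
console.log(suggestSetup(daily));
```

For example, 100M messages per day averages out to roughly 1,157 messages per second, so sizing should be driven by expected peaks as well as the daily total.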
Development
# Install dependencies (monorepo root)
pnpm install
# Or install per package
cd packages/ruby && bundle install
cd packages/javascript && pnpm install
# Run tests
cd packages/ruby && bundle exec rspec
cd packages/javascript && pnpm test
# Run with coverage
cd packages/javascript && pnpm test -- --coverage
cd packages/ruby && bundle exec rspec --format documentation
# Build all packages
pnpm build
# Lint all packages
pnpm lint
# Documentation website
pnpm start:website # Start dev server
pnpm build:website # Build for production
Git Hooks
This repository uses Husky for Git hooks:
- pre-commit: runs lint-staged
- commit-msg: validates Conventional Commits
- pre-push: runs tests before pushing
Set up hooks after cloning:
pnpm install # Automatically configures hooks
Contributing
We welcome contributions!
See CONTRIBUTING.md for details.
Quick Steps:
- Fork the repo
- Create a branch (git checkout -b feat/awesome-feature)
- Add tests and implement changes
- Commit using Conventional Commits
- Open a Pull Request
License
MIT License - Copyright (c) 2025 Mike Attara