0.0
No release in over 3 years
A production-ready pub/sub library for NATS JetStream with Rails integration. Features declarative subscribers, auto-discovery, middleware support, Web UI for monitoring Inbox/Outbox events, and production-ready patterns including Inbox/Outbox, DLQ, and automatic retries with backoff.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
 Dependencies

Runtime

>= 6.0, < 9
>= 6.0, < 9
~> 2.5
~> 3.16
>= 3, < 5
 Project Readme

NatsPubsub Logo

NatsPubsub

Declarative Pub/Sub messaging for NATS JetStream

A production-ready pub/sub library with a familiar, declarative API. Features declarative subscribers, middleware support, and battle-tested reliability patterns including Inbox/Outbox, DLQ, and automatic retries with backoff.

Ruby CI JavaScript CI License Gem Version npm Version Gem Downloads npm Downloads

Implementations for Ruby and TypeScript with full interoperability


๐Ÿ“‹ Table of Contents

  • Documentation
  • Quick Start
  • Packages
  • Features
  • Real-World Examples
  • Performance
  • Development
  • Contributing
  • License

๐Ÿ“– Documentation

๐Ÿ“š View Full Documentation Site โ†’

Comprehensive documentation is available in the docs/ directory:

Getting Started

Guides

Patterns

Integrations

Reference

Advanced

Troubleshooting

Examples

๐Ÿ“š Start with the Documentation Index โ†’


๐Ÿš€ Quick Start

Start the full development environment with Docker Compose:

git clone https://github.com/attaradev/nats_pubsub.git
cd nats-pubsub
docker compose up -d

This starts NATS, PostgreSQL, Prometheus, and Grafana with preconfigured monitoring.

For package-specific setup:


๐Ÿ“ฆ Packages

Rails-integrated Pub/Sub library with Web UI, Inbox/Outbox, and ActiveRecord support.

gem "nats_pubsub", "~> 0.1"

๐Ÿ“– Full Ruby Docs โ†’


Node.js Pub/Sub library with full TypeScript support and enterprise monitoring.

pnpm add nats-pubsub

๐Ÿ“– Full JavaScript Docs โ†’


โœจ Features

Core Capabilities

  • ๐ŸŽฏ Topic-Based Messaging - Flexible pub/sub with hierarchical topics (e.g., order.created, notification.email)
  • ๐Ÿ”Œ Declarative Subscriber API - Clean, decorator-based subscription patterns
  • ๐ŸŒฒ Hierarchical Topics - Organize messages with dot notation and wildcard subscriptions (* for single level, > for multiple levels)
  • ๐Ÿงจ Dead Letter Queue (DLQ) - Automatic handling of failed messages
  • โš™๏ธ Durable Pull Consumers - Reliable message delivery with exponential backoff
  • ๐ŸŽญ Middleware System - Extensible processing pipeline for cross-cutting concerns
  • ๐Ÿ”„ Auto-Topology Management - Automatic JetStream stream and consumer creation

Ruby-Specific

  • ๐Ÿ›ก๏ธ Inbox/Outbox reliability patterns
  • ๐Ÿ“Š Web UI for monitoring
  • ๐Ÿ”— ActiveRecord integration
  • ๐Ÿš‚ Rails generators

JavaScript-Specific

  • ๐Ÿ“Š Prometheus metrics
  • โค๏ธ Health check endpoints
  • ๐Ÿ“ฆ Batch publishing API
  • ๐Ÿš€ Full TypeScript support

Cross-Language

Both implementations use identical event formats, enabling seamless interoperability between Ruby and JavaScript services.

๐Ÿ“– Learn More:


๐Ÿ“š Real-World Examples

E-Commerce Order Processing

Scenario: Process orders across multiple services (inventory, shipping, notifications)

Ruby Publisher (Order Service):

# app/services/order_service.rb
class OrderService
  def create_order(user_id, items)
    Order.transaction do
      order = Order.create!(user_id: user_id, status: 'pending')

      # Publish order created event
      NatsPubsub.publish(
        topic: 'order.created',
        message: {
          order_id: order.id,
          user_id: user_id,
          item: items,
          total: calculate_total(items)
        }
      )
    end
  end
end

JavaScript Subscriber (Inventory Service):

// subscribers/inventory-subscriber.ts
import { Subscriber, TopicMetadata } from "nats-pubsub";

class InventorySubscriber extends Subscriber {
  constructor() {
    super("production.order-service.order.created");
  }

  async handle(message: Record<string, unknown>, metadata: TopicMetadata) {
    const { order_id, item } = message;

    // Reserve inventory
    await this.reserveInventory(item as Array<any>);

    // Publish inventory reserved event
    await NatsPubsub.publish("inventory.reserved", {
      order_id,
      reserved_at: new Date(),
    });
  }

  private async reserveInventory(item: any[]) {
    // Inventory reservation logic
  }
}

User Notification System

Scenario: Send welcome emails, SMS, and push notifications when users sign up

JavaScript Publisher (Auth Service):

// services/auth.service.ts
async function createUser(userData: UserData) {
  const user = await User.create(userData);

  // Publish to multiple notification topics
  await NatsPubsub.publish({
    topics: ["notification.email", "notification.sms", "audit.user-created"],
    message: {
      user_id: user.id,
      email: user.email,
      phone: user.phone,
      name: user.name,
    },
  });

  return user;
}

Ruby Subscribers (Notification Service):

# app/subscribers/email_notification_subscriber.rb
class EmailNotificationSubscriber < NatsPubsub::Subscriber
  subscribe_to "notification.email"

  def handle(message, context)
    WelcomeMailer.welcome_email(
      email: message['email'],
      name: message['name']
    ).deliver_later
  end
end

# app/subscribers/sms_notification_subscriber.rb
class SmsNotificationSubscriber < NatsPubsub::Subscriber
  subscribe_to "notification.sms"

  def handle(message, context)
    TwilioService.send_welcome_sms(
      phone: message['phone'],
      name: message['name']
    )
  end
end

Microservices Communication

Scenario: Payment service notifies order and analytics services

// Payment Service (Publisher)
await NatsPubsub.publish("payment.completed", {
  payment_id: payment.id,
  order_id: payment.order_id,
  amount: payment.amount,
  currency: "USD",
});

// Order Service (Subscriber)
class PaymentCompletedSubscriber extends Subscriber {
  constructor() {
    super("production.payment-service.payment.completed");
  }

  async handle(message: Record<string, unknown>) {
    await Order.update(message.order_id, { status: "paid" });
  }
}

// Analytics Service (Subscriber with wildcard)
class PaymentAnalyticsSubscriber extends Subscriber {
  constructor() {
    super("production.payment-service.payment.*"); // Wildcard subscription
  }

  async handle(message: Record<string, unknown>, metadata: TopicMetadata) {
    await Analytics.track({
      event: metadata.topic,
      properties: message,
    });
  }
}

Audit Logging

Scenario: Centralized audit log for all system events

# Any service publishing events
NatsPubsub.publish(
  topic: 'user.login',
  message: { user_id: user.id, ip: request.ip },
  trace_id: request_id
)

# Audit Service (Subscriber) - Wildcard to capture all events
class AuditLogSubscriber < NatsPubsub::Subscriber
  subscribe_to "production.*.>" # Subscribe to ALL topics from all services

  def handle(message, context)
    AuditLog.create!(
      event_type: context.topic,
      user_id: message['user_id'],
      data: message,
      trace_id: context.trace_id,
      occurred_at: context.occurred_at
    )
  end
end

๐Ÿ“Š Performance

Benchmarks

NatsPubsub leverages NATS JetStream for high-performance messaging:

Metric Performance Configuration
Latency (p50) < 1ms Single server, local network
Latency (p99) < 5ms Single server, local network
Throughput 1M+ msg/sec Single NATS server, in-memory
Throughput 500K+ msg/sec With JetStream persistence
Message Size Up to 1MB Default, configurable
Concurrent Consumers 1000+ Per stream

Real-World Performance

Typical Microservices Setup:

  • Latency: 2-10ms (including network + processing)
  • Throughput: 50K-200K msg/sec per service
  • Concurrency: 10-50 concurrent message processors

Factors Affecting Performance:

  • ๐Ÿ”ธ Network latency between services
  • ๐Ÿ”ธ Message size and serialization
  • ๐Ÿ”ธ Subscriber processing time
  • ๐Ÿ”ธ Database operations in subscribers
  • ๐Ÿ”ธ JetStream storage type (memory vs file)

Performance Tuning

1. Optimize Concurrency

// JavaScript
NatsPubsub.configure({
  concurrency: 20, // Adjust based on workload
});
# Ruby
NatsPubsub.configure do |config|
  config.concurrency = 20
end

Guidelines:

  • CPU-bound tasks: concurrency = CPU cores
  • I/O-bound tasks: concurrency = 2-4x CPU cores
  • Database-heavy: concurrency โ‰ค DB pool size

2. Batch Processing

class BatchProcessor extends BaseSubscriber {
  private batch: any[] = [];
  private batchSize = 100;
  private flushInterval = 1000; // 1 second

  async call(message: Record<string, unknown>) {
    this.batch.push(message);

    if (this.batch.length >= this.batchSize) {
      await this.flush();
    }
  }

  private async flush() {
    if (this.batch.length === 0) return;

    // Process batch
    await Database.bulkInsert(this.batch);
    this.batch = [];
  }
}

3. Connection Pooling

// Reuse database connections
const pool = new Pool({
  max: 20, // Match or exceed concurrency
  min: 5,
});

4. Message Size Optimization

// Keep messages small
await NatsPubsub.publish("user.created", {
  id: user.id, // โœ… Reference
  // user: fullUserObject  // โŒ Avoid embedding large objects
});

Monitoring Performance

// Add timing middleware
class PerformanceMiddleware {
  async call(event: any, metadata: any, next: () => Promise<void>) {
    const start = Date.now();

    await next();

    const duration = Date.now() - start;
    if (duration > 1000) {
      console.warn(`Slow processing: ${metadata.subject} took ${duration}ms`);
    }
  }
}

NatsPubsub.use(new PerformanceMiddleware());

Scaling Guidelines

Messages/Day Setup Estimated Cost
< 1M Single NATS server ~$50/month
1M - 10M Single NATS server + monitoring ~$100/month
10M - 100M NATS cluster (3 nodes) ~$300/month
100M - 1B NATS cluster (5 nodes) + replicas ~$1000/month
> 1B Contact for architecture review Varies

Note: Based on AWS/GCP pricing for comparable instances.


๐Ÿ› ๏ธ Development

# Install dependencies (monorepo root)
pnpm install

# Or install per package
cd packages/ruby && bundle install
cd packages/javascript && pnpm install

# Run tests
cd packages/ruby && bundle exec rspec
cd packages/javascript && pnpm test

# Run with coverage
cd packages/javascript && pnpm test -- --coverage
cd packages/ruby && bundle exec rspec --format documentation

# Build all packages
pnpm build

# Lint all packages
pnpm lint

# Documentation website
pnpm start:website    # Start dev server
pnpm build:website    # Build for production

Git Hooks

This repository uses Husky for Git hooks:

  • pre-commit โ†’ runs lint-staged
  • commit-msg โ†’ validates Conventional Commits
  • pre-push โ†’ runs tests before pushing

Set up hooks after cloning:

pnpm install  # Automatically configures hooks

More guides:


๐Ÿค Contributing

We welcome contributions!

See CONTRIBUTING.md for details.

Quick Steps:

  1. Fork the repo
  2. Create a branch (git checkout -b feat/awesome-feature)
  3. Add tests and implement changes
  4. Commit using Conventional Commits
  5. Open a Pull Request

๐Ÿ“„ License

MIT License - Copyright (c) 2025 Mike Attara