
SmartMessage

SmartMessage is a message abstraction framework that decouples business logic from message transports and serialization formats. Much like ActiveRecord abstracts models from database implementations, SmartMessage abstracts messages from their backend transport and serialization mechanisms.

Features

  • Transport Abstraction: Plugin architecture supporting multiple message transports (Redis, RabbitMQ, Kafka, etc.)
  • Serialization Flexibility: Pluggable serialization formats (JSON, MessagePack, etc.)
  • Entity-to-Entity Addressing: Built-in FROM/TO/REPLY_TO addressing for point-to-point and broadcast messaging patterns
  • Advanced Message Filtering: Filter subscriptions using exact strings, regular expressions, or mixed arrays for precise message routing
  • Schema Versioning: Built-in version management with automatic compatibility validation
  • Comprehensive Validation: Property validation with custom error messages and automatic validation before publishing
  • Message Documentation: Built-in documentation support for message classes and properties with automatic defaults
  • Flexible Message Handlers: Multiple subscription patterns - default methods, custom methods, blocks, procs, and lambdas
  • Dual-Level Configuration: Class and instance-level plugin overrides for gateway patterns
  • Concurrent Processing: Thread-safe message routing using Concurrent::CachedThreadPool
  • Advanced Logging System: Comprehensive logging with colorized console output, JSON structured logging, and file rolling
  • Built-in Statistics: Message processing metrics and monitoring
  • Message Deduplication: Handler-scoped deduplication queues (DDQ) with memory or Redis storage for preventing duplicate message processing
  • Development Tools: STDOUT and in-memory transports for testing
  • Production Ready: Redis transport with automatic reconnection and error handling
  • Dead Letter Queue: File-based DLQ with JSON Lines format for failed message capture and replay
  • Circuit Breaker Integration: Production-grade reliability with BreakerMachines for automatic fallback and recovery

Installation

Add this line to your application's Gemfile:

gem 'smart_message'

And then execute:

bundle install

Or install it yourself as:

gem install smart_message

Quick Start

1. Define a Message Class

require 'smart_message'

class OrderMessage < SmartMessage::Base
  # Declare schema version for compatibility tracking
  version 2

  # Add a description for the message class
  description "Represents customer order data for processing and fulfillment"

  # Configure entity addressing (Method 1: Direct methods)
  from 'order-service'
  to 'fulfillment-service'  # Point-to-point message
  reply_to 'order-service'  # Responses come back here

  # Alternative Method 2: Using header block
  # header do
  #   from 'order-service'
  #   to 'fulfillment-service'
  #   reply_to 'order-service'
  # end

  # Required properties with validation
  property :order_id,
    required: true,
    message: "Order ID is required",
    validate: ->(v) { v.is_a?(String) && v.length > 0 },
    validation_message: "Order ID must be a non-empty string",
    description: "Unique order identifier"

  property :customer_id,
    required: true,
    message: "Customer ID is required",
    description: "Customer's unique ID"

  property :amount,
    required: true,
    message: "Amount is required",
    validate: ->(v) { v.is_a?(Numeric) && v > 0 },
    validation_message: "Amount must be a positive number",
    description: "Total order amount in dollars"

  property :items,
    default: [],
    description: "Array of ordered items"

  # Configure transport and serializer at class level
  config do
    transport SmartMessage::Transport.create(:stdout, loopback: true)
    serializer SmartMessage::Serializer::JSON.new
  end

  # Business logic for processing received messages
  def self.process(message_instance)
    # Message instance is already decoded and validated
    puts "Processing order #{message_instance.order_id} for customer #{message_instance.customer_id}"
    puts "Amount: $#{message_instance.amount}"

    # Your business logic here
    process_order(message_instance)
  end

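  def self.process_order(order)
    # Implementation specific to your domain
  end

  # A bare `private` does not apply to `def self.` methods,
  # so the class method is hidden explicitly.
  private_class_method :process_order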
end

2. Publish Messages

# Create and publish a message (automatically validated before publishing)
order = OrderMessage.new(
  order_id: "ORD-123",
  customer_id: "CUST-456",
  amount: 99.99,
  items: ["Widget A", "Widget B"]
)

# Publishing validates all properties, the header, and version compatibility first
order.publish

# Or validate manually
if order.valid?
  order.publish
else
  errors = order.validation_errors
  errors.each { |err| puts "#{err[:property]}: #{err[:message]}" }
end

3. Subscribe to Messages

SmartMessage supports multiple ways to handle incoming messages:

# 1. Default handler (uses self.process method)
OrderMessage.subscribe

# 2. Custom method handler
OrderMessage.subscribe("PaymentService.process_order")

# 3. Block handler (NEW!)
OrderMessage.subscribe do |wrapper|
  header, payload = wrapper.split
  order_data = JSON.parse(payload)
  puts "Quick processing: Order #{order_data['order_id']}"
end

# 4. Proc handler (NEW!)
order_processor = proc do |wrapper|
  header, payload = wrapper.split
  order_data = JSON.parse(payload)
  EmailService.send_confirmation(order_data['customer_id'])
end
OrderMessage.subscribe(order_processor)

# 5. Lambda handler (NEW!)
audit_handler = lambda do |wrapper|
  header, payload = wrapper.split
  AuditLog.record("Order processed at #{header.published_at}")
end
OrderMessage.subscribe(audit_handler)

4. Message Filtering (NEW!)

SmartMessage supports powerful message filtering using exact strings, regular expressions, or arrays:

# Filter by exact sender
OrderMessage.subscribe(from: 'payment-service')

# Filter by sender pattern (all payment services)
OrderMessage.subscribe(from: /^payment-.*/)

# Filter by multiple senders
OrderMessage.subscribe(from: ['admin', 'system', 'monitoring'])

# Mixed exact and pattern matching
OrderMessage.subscribe(from: ['admin', /^system-.*/, 'legacy-service'])

# Filter by recipient patterns
OrderMessage.subscribe(to: /^(dev|staging)-.*/)

# Combined filtering
OrderMessage.subscribe(
  from: /^admin-.*/,
  to: ['order-service', /^fulfillment-.*/]
)

# Environment-based routing
DevService.subscribe(to: /^(dev|staging)-.*/)
ProdService.subscribe(to: /^prod-.*/)
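
The matching rules above can be pictured with a small stand-alone sketch. It does not call SmartMessage; filter_match? and deliver? are hypothetical helper names, and the gem's internal matching may differ in detail.

```ruby
# Hypothetical helpers illustrating string / regexp / array filters.
# Not part of the SmartMessage API.

# Returns true when +value+ satisfies +filter+.
# A nil filter means "no constraint".
def filter_match?(filter, value)
  case filter
  when nil    then true
  when String then filter == value
  when Regexp then !value.nil? && filter.match?(value)
  when Array  then filter.any? { |f| filter_match?(f, value) }
  else false
  end
end

# Combined filtering: both the from and the to constraint must hold.
def deliver?(header, from: nil, to: nil)
  filter_match?(from, header[:from]) && filter_match?(to, header[:to])
end

header = { from: 'admin-console', to: 'fulfillment-east' }
puts deliver?(header, from: /^admin-.*/, to: ['order-service', /^fulfillment-.*/])
puts deliver?(header, from: 'payment-service')
```

In this sketch an array filter passes as soon as any one of its entries matches, which is how mixed exact and pattern entries can sit in the same list.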

5. Message Deduplication

SmartMessage provides handler-scoped message deduplication to prevent duplicate processing of messages with the same UUID. Each handler gets its own Deduplication Queue (DDQ) that tracks recently processed message UUIDs.

Basic Deduplication Setup

class OrderMessage < SmartMessage::Base
  version 1
  property :order_id, required: true
  property :amount, required: true

  from "order-service"

  # Configure deduplication
  ddq_size 100              # Track last 100 message UUIDs
  ddq_storage :memory       # Use memory storage (or :redis for distributed)
  enable_deduplication!     # Enable deduplication for this message class

  def self.process(message_instance)
    puts "Processing order: #{message_instance.order_id}"
    # Business logic here
  end
end

Handler-Scoped Isolation

Each handler gets its own DDQ scope, preventing cross-contamination between different subscribers:

# Each handler gets separate deduplication tracking
OrderMessage.subscribe('PaymentService.process')     # DDQ: "OrderMessage:PaymentService.process"
OrderMessage.subscribe('FulfillmentService.handle')  # DDQ: "OrderMessage:FulfillmentService.handle"
OrderMessage.subscribe('AuditService.log_order')     # DDQ: "OrderMessage:AuditService.log_order"

# Same handler across message classes = separate DDQs
PaymentMessage.subscribe('PaymentService.process')   # DDQ: "PaymentMessage:PaymentService.process"
InvoiceMessage.subscribe('PaymentService.process')   # DDQ: "InvoiceMessage:PaymentService.process"

Storage Options

# Memory-based DDQ (single process)
class LocalMessage < SmartMessage::Base
  ddq_size 50
  ddq_storage :memory
  enable_deduplication!
end

# Redis-based DDQ (distributed/multi-process)
class DistributedMessage < SmartMessage::Base
  ddq_size 1000
  ddq_storage :redis, redis_url: 'redis://localhost:6379', redis_db: 1
  enable_deduplication!
end

DDQ Statistics and Management

# Check deduplication configuration
config = OrderMessage.ddq_config
puts "Enabled: #{config[:enabled]}"
puts "Size: #{config[:size]}"
puts "Storage: #{config[:storage]}"

# Get DDQ statistics
stats = OrderMessage.ddq_stats
puts "Current count: #{stats[:current_count]}"
puts "Utilization: #{stats[:utilization]}%"

# Clear DDQ if needed
OrderMessage.clear_ddq!

# Check if specific UUID is duplicate
OrderMessage.duplicate_uuid?("some-uuid-123")

How Deduplication Works

  1. Message Receipt: When a message arrives, the dispatcher checks the handler's DDQ for the message UUID
  2. Duplicate Detection: If the UUID is already in the DDQ, the message is logged and skipped without being processed
  3. Processing: If the UUID is new, the handler processes the message
  4. UUID Storage: After successful processing, the UUID is added to the handler's DDQ
  5. Circular Buffer: When the DDQ reaches capacity, the oldest UUIDs are evicted to make room for new ones

Benefits

  • Handler Isolation: Each handler maintains independent deduplication state
  • Cross-Process Support: Redis DDQ enables deduplication across multiple processes
  • Memory Efficient: Circular buffer with configurable size limits memory usage
  • High Performance: O(1) UUID lookup using hybrid array + set data structure
  • Automatic Integration: Seamlessly works with existing subscription patterns
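
The "hybrid array + set" idea can be sketched in plain Ruby. This is an illustration under assumptions, not the gem's DDQ class: SketchDDQ and handle_once are hypothetical names, and only memory storage is modelled.

```ruby
require 'set'

# Hypothetical fixed-size deduplication queue: an Array used as a ring
# buffer for eviction order plus a Set for O(1) membership checks.
class SketchDDQ
  def initialize(size)
    @size   = size
    @ring   = Array.new(size)  # slots hold UUIDs in insertion order
    @lookup = Set.new
    @index  = 0                # next slot to overwrite
  end

  def contains?(uuid)
    @lookup.include?(uuid)
  end

  # Records a UUID, evicting the oldest entry once the buffer is full.
  def add(uuid)
    return if contains?(uuid)

    evicted = @ring[@index]
    @lookup.delete(evicted) if evicted
    @ring[@index] = uuid
    @lookup.add(uuid)
    @index = (@index + 1) % @size
  end

  def count
    @lookup.size
  end
end

# Check, skip duplicates, process, then record the UUID.
def handle_once(ddq, uuid)
  return :duplicate if ddq.contains?(uuid)

  yield
  ddq.add(uuid)
  :processed
end

ddq = SketchDDQ.new(2)
first  = handle_once(ddq, 'a') { puts 'processing a' }
second = handle_once(ddq, 'a') { puts 'never runs' }
p [first, second]
```

Because the UUID is recorded only after the block returns, a handler that raises leaves the UUID out of the queue, so a redelivery would be processed again.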

6. Entity Addressing

SmartMessage supports entity-to-entity addressing with FROM/TO/REPLY_TO fields for advanced message routing. You can configure addressing using three different approaches:

Method 1: Direct Class Methods

class PaymentMessage < SmartMessage::Base
  version 1
  from 'payment-service'     # Required: sender identity
  to 'bank-gateway'          # Optional: specific recipient
  reply_to 'payment-service' # Optional: where responses go

  property :amount, required: true
  property :account_id, required: true
end

Method 2: Header Block DSL

class PaymentMessage < SmartMessage::Base
  version 1

  # Configure all addressing in a single block
  header do
    from 'payment-service'
    to 'bank-gateway'
    reply_to 'payment-service'
  end

  property :amount, required: true
  property :account_id, required: true
end

Method 3: Instance-Level Configuration

# Create payment instance
payment = PaymentMessage.new(amount: 100.00, account_id: "ACCT-123")

# Override addressing at instance level
payment.to('backup-gateway')  # Method chaining supported
payment.from('urgent-processor')

# Alternative setter syntax
payment.from = 'urgent-processor'
payment.to = 'backup-gateway'

# Access addressing (shortcut methods)
puts payment.from      # => 'urgent-processor'
puts payment.to        # => 'backup-gateway'
puts payment.reply_to  # => 'payment-service'

# Access via header (full path)
puts payment._sm_header.from      # => 'urgent-processor'
puts payment._sm_header.to        # => 'backup-gateway'
puts payment._sm_header.reply_to  # => 'payment-service'

# Publish with updated addressing
payment.publish

Broadcast Messaging Example

class SystemAnnouncementMessage < SmartMessage::Base
  version 1

  # Using header block for broadcast configuration
  header do
    from 'admin-service'  # Required: sender identity
    # No 'to' field = broadcast to all subscribers
  end

  property :message, required: true
  property :priority, default: 'normal'
end

Messaging Patterns Supported

  • Point-to-Point: Set the 'to' field to target a specific entity
  • Broadcast: Omit the 'to' field (leave it nil) to broadcast to all subscribers
  • Request-Reply: Use the 'reply_to' field to specify where responses are routed
  • Gateway Patterns: Override addressing at instance level for message forwarding
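
A minimal sketch of how these patterns follow from the three addressing fields. SketchHeader, pattern_for, and reply_header are hypothetical names used only for illustration; the fallback from reply_to to from is an assumption, not documented gem behavior.

```ruby
# Hypothetical header struct; field names mirror FROM/TO/REPLY_TO.
SketchHeader = Struct.new(:from, :to, :reply_to, keyword_init: true)

# Broadcast when no recipient is set, point-to-point otherwise.
def pattern_for(header)
  header.to.nil? ? :broadcast : :point_to_point
end

# Build the addressing for a response: it goes to reply_to when present,
# falling back to the original sender (assumed behavior).
def reply_header(request, responder)
  SketchHeader.new(from: responder, to: request.reply_to || request.from)
end

request = SketchHeader.new(from: 'order-service',
                           to: 'fulfillment-service',
                           reply_to: 'order-replies')
announcement = SketchHeader.new(from: 'admin-service')

puts pattern_for(request)
puts pattern_for(announcement)
puts reply_header(request, 'fulfillment-service').to
```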

Logging Configuration

SmartMessage includes a comprehensive logging system with support for multiple output formats, colorization, and file rolling capabilities.

Basic Logging Configuration

# Configure SmartMessage logging
SmartMessage.configure do |config|
  config.logger = STDOUT              # Output destination (file path, STDOUT, STDERR)
  config.log_level = :info           # Log level (:debug, :info, :warn, :error, :fatal)
  config.log_format = :text          # Output format (:text, :json)
  config.log_colorize = true         # Enable colorized console output
  config.log_include_source = false  # Include source file/line information
  config.log_structured_data = false # Enable structured data logging
end

# Access the configured logger in your application
logger = SmartMessage.configuration.default_logger
logger.info("Application started", component: "main", pid: Process.pid)

Advanced Logging Features

Colorized Console Output

SmartMessage.configure do |config|
  config.logger = STDOUT
  config.log_colorize = true
  config.log_format = :text
end

logger = SmartMessage.configuration.default_logger
logger.debug("Debug message")    # Green background, white text
logger.info("Info message")      # White text
logger.warn("Warning message")   # Yellow background, white bold text
logger.error("Error message")    # Light red background, white bold text
logger.fatal("Fatal message")    # Light red background, yellow bold text

JSON Structured Logging

SmartMessage.configure do |config|
  config.logger = "log/application.log"
  config.log_format = :json
  config.log_structured_data = true
  config.log_include_source = true
end

logger = SmartMessage.configuration.default_logger
logger.info("User action",
            user_id: 12345,
            action: "login",
            ip_address: "192.168.1.1")
# Output: {"timestamp":"2025-01-15T10:30:45.123Z","level":"INFO","message":"User action","user_id":12345,"action":"login","ip_address":"192.168.1.1","source":"app.rb:42:in `authenticate`"}
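
For readers who want to see what one-JSON-object-per-line logging involves, here is a stand-in built only on Ruby's standard Logger. It is not SmartMessage's logger; build_json_logger is a hypothetical helper, and passing a hash as the log message is a convention of this sketch.

```ruby
require 'json'
require 'logger'
require 'stringio'
require 'time'

# Stand-in using only the standard library Logger.
def build_json_logger(io)
  logger = Logger.new(io)
  logger.formatter = proc do |severity, time, _progname, msg|
    entry = { timestamp: time.utc.iso8601(3), level: severity }
    # Accept either a plain string or a hash carrying extra fields.
    entry.merge!(msg.is_a?(Hash) ? msg : { message: msg.to_s })
    JSON.generate(entry) + "\n"
  end
  logger
end

io = StringIO.new
logger = build_json_logger(io)
logger.info(message: 'User action', user_id: 12345, action: 'login')
puts io.string
```

Each call produces a single line, which keeps the file easy to process with line-oriented tools.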

File Rolling Configuration

SmartMessage.configure do |config|
  config.logger = "log/application.log"
  config.log_options = {
    # Size-based rolling
    roll_by_size: true,
    max_file_size: 10 * 1024 * 1024,  # 10 MB
    keep_files: 5,                     # Keep 5 old files

    # Date-based rolling (alternative to size-based)
    roll_by_date: false,               # Set to true for date-based
    date_pattern: '%Y-%m-%d'           # Daily rolling pattern
  }
end

SmartMessage Integration

SmartMessage classes automatically use the configured logger:

class OrderMessage < SmartMessage::Base
  property :order_id, required: true
  property :amount, required: true

  def process
    # Logger is automatically available
    logger.info("Processing order",
                order_id: order_id,
                amount: amount,
                header: _sm_header.to_h,
                payload: _sm_payload)
  end
end

# Messages inherit the global logger configuration
message = OrderMessage.new(order_id: "123", amount: 99.99)
message.publish  # Uses configured logger for any internal logging

Architecture

Core Components

SmartMessage::Base

The foundation class that all messages inherit from. Built on Hashie::Dash with extensions for:

  • Property management and coercion
  • Multi-level plugin configuration
  • Message lifecycle management
  • Automatic header generation (UUID, timestamps, process tracking)
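
The automatic header fields can be produced with the standard library alone. The sketch below is an assumption about shape, not the gem's header class: SketchAutoHeader and build_header are hypothetical names.

```ruby
require 'securerandom'

# Hypothetical header carrying the automatically generated fields.
SketchAutoHeader = Struct.new(:uuid, :message_class, :published_at,
                              :publisher_pid, keyword_init: true)

def build_header(message_class)
  SketchAutoHeader.new(
    uuid: SecureRandom.uuid,          # unique per message
    message_class: message_class.to_s,
    published_at: Time.now.utc,       # timestamp
    publisher_pid: Process.pid        # process tracking
  )
end

header = build_header('OrderMessage')
puts header.uuid
puts header.publisher_pid
```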

Transport Layer

Pluggable message delivery system with built-in implementations:

  • StdoutTransport: Development and testing transport
  • MemoryTransport: In-memory queuing for testing
  • RedisTransport: Redis pub/sub transport for production messaging
  • Custom Transports: Implement SmartMessage::Transport::Base

Serializer System

Pluggable message encoding/decoding:

  • JSON Serializer: Built-in JSON support
  • Custom Serializers: Implement SmartMessage::Serializer::Base

Dispatcher

Concurrent message routing engine that:

  • Uses thread pools for async processing
  • Routes messages to subscribed handlers with handler-scoped deduplication
  • Provides processing statistics and DDQ management
  • Handles graceful shutdown
  • Maintains separate DDQ instances per handler for isolated deduplication tracking
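
The routing idea can be sketched with plain Ruby threads. The gem is described as using Concurrent::CachedThreadPool; this stand-in avoids that dependency so it runs with the standard library only, and SketchDispatcher is a hypothetical name. Deduplication and statistics are left out.

```ruby
# Stand-in dispatcher: one thread per handler invocation.
class SketchDispatcher
  def initialize
    @subscribers = Hash.new { |h, k| h[k] = [] }  # message class name => handlers
    @threads = []
    @lock = Mutex.new
  end

  def subscribe(message_class, &handler)
    @lock.synchronize { @subscribers[message_class] << handler }
  end

  # Hands each matching handler its own thread so a slow handler
  # does not block routing to the others.
  def route(message_class, payload)
    handlers = @lock.synchronize { @subscribers[message_class].dup }
    handlers.each do |handler|
      thread = Thread.new { handler.call(payload) }
      @lock.synchronize { @threads << thread }
    end
  end

  # Graceful shutdown: wait for in-flight handlers to finish.
  def shutdown
    pending = @lock.synchronize { @threads.dup }
    pending.each(&:join)
  end
end

results = Queue.new
dispatcher = SketchDispatcher.new
dispatcher.subscribe('OrderMessage') { |payload| results << "audit:#{payload}" }
dispatcher.subscribe('OrderMessage') { |payload| results << "email:#{payload}" }
dispatcher.route('OrderMessage', 'ORD-123')
dispatcher.shutdown
puts results.size
```

A real thread pool reuses threads instead of creating one per message, which matters under load; the sketch trades that for brevity.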

Plugin Architecture

SmartMessage supports two levels of plugin configuration:

# Class-level configuration (default for all instances)
class MyMessage < SmartMessage::Base
  config do
    transport MyTransport.new
    serializer MySerializer.new
    logger MyLogger.new
  end
end

# Instance-level configuration (overrides class defaults)
message = MyMessage.new
message.config do
  transport DifferentTransport.new  # Override for this instance
end

This enables gateway patterns where messages can be received from one transport/serializer and republished to another.
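
The fallback from instance-level to class-level settings can be pictured as follows. SketchMessage is a hypothetical class and the symbols stand in for transport objects; the gem's config block DSL is not reproduced.

```ruby
# Hypothetical illustration of class-level defaults with
# instance-level overrides.
class SketchMessage
  class << self
    attr_accessor :transport   # class-level default
  end

  attr_writer :transport       # instance-level override

  # The instance setting wins; otherwise fall back to the class default.
  def transport
    @transport || self.class.transport
  end
end

SketchMessage.transport = :redis

inbound  = SketchMessage.new
outbound = SketchMessage.new
outbound.transport = :stdout   # gateway: republish elsewhere

puts inbound.transport
puts outbound.transport
```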

Transport Implementations

STDOUT Transport (Development)

# Basic STDOUT output
transport = SmartMessage::Transport.create(:stdout)

# With loopback for testing subscriptions
transport = SmartMessage::Transport.create(:stdout, loopback: true)

# Output to file
transport = SmartMessage::Transport.create(:stdout, output: "messages.log")

Memory Transport (Testing)

# Auto-process messages as they're published
transport = SmartMessage::Transport.create(:memory, auto_process: true)

# Store messages without processing
transport = SmartMessage::Transport.create(:memory, auto_process: false)

# Check stored messages
puts transport.message_count
puts transport.all_messages
transport.process_all  # Process all pending messages

Redis Transport (Production)

# Basic Redis configuration
transport = SmartMessage::Transport.create(:redis,
  url: 'redis://localhost:6379',
  db: 0
)

# Production configuration with custom options
transport = SmartMessage::Transport.create(:redis,
  url: 'redis://prod-redis:6379',
  db: 1,
  auto_subscribe: true,
  reconnect_attempts: 5,
  reconnect_delay: 2
)

# Configure message class to use Redis
MyMessage.config do
  transport SmartMessage::Transport.create(:redis, url: 'redis://localhost:6379')
  serializer SmartMessage::Serializer::JSON.new
end

# Subscribe to messages (uses message class name as Redis channel)
MyMessage.subscribe

# Publish messages (automatically publishes to Redis channel named "MyMessage")
message = MyMessage.new(data: "Hello Redis!")
message.publish

The Redis transport uses the message class name as the Redis channel name, enabling automatic routing of messages to their appropriate handlers.

Custom Transport

class WebhookTransport < SmartMessage::Transport::Base
  def default_options
    {
      webhook_url: "https://api.example.com/webhooks",
      timeout: 30,
      retries: 3
    }
  end

  def configure
    require 'net/http'
    @uri = URI(@options[:webhook_url])
  end

  def publish(message_header, message_payload)
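    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = @uri.scheme == 'https'
    # Apply the configured timeout (seconds) to connect and read
    http.open_timeout = @options[:timeout]
    http.read_timeout = @options[:timeout]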

    request = Net::HTTP::Post.new(@uri)
    request['Content-Type'] = 'application/json'
    request['X-Message-Class'] = message_header.message_class
    request.body = message_payload

    response = http.request(request)
    raise "Webhook failed: #{response.code}" unless response.code.to_i < 400
  end

  def subscribe(message_class, process_method)
    super
    # For webhooks, subscription would typically be configured
    # externally on the webhook provider's side
  end
end

# Register the transport
SmartMessage::Transport.register(:webhook, WebhookTransport)

# Use the transport
MyMessage.config do
  transport SmartMessage::Transport.create(:webhook,
    webhook_url: "https://api.myservice.com/messages"
  )
end

Message Lifecycle

  1. Definition: Create message class inheriting from SmartMessage::Base
  2. Configuration: Set transport, serializer, logger plugins, and entity addressing (from/to/reply_to)
  3. Validation: Messages are automatically validated before publishing (properties, header, addressing, version compatibility)
  4. Publishing: Message instance is encoded with addressing metadata and sent through transport
  5. Subscription: Message classes register handlers with dispatcher for processing
    • Default handlers (self.process method)
    • Custom method handlers ("ClassName.method_name")
    • Block handlers (subscribe do |wrapper|...end)
    • Proc/Lambda handlers (subscribe(proc {...}))
  6. Routing: Dispatcher uses addressing metadata to route messages (point-to-point vs broadcast)
  7. Processing: Received messages are decoded and routed to registered handlers

Schema Versioning and Validation

SmartMessage includes comprehensive validation and versioning capabilities to ensure message integrity and schema evolution support.

Version Declaration

Declare your message schema version using the version class method:

class OrderMessage < SmartMessage::Base
  version 2  # Schema version 2

  property :order_id, required: true
  property :customer_email  # Added in version 2
end

Property Validation

Properties support multiple validation types with custom error messages:

class UserMessage < SmartMessage::Base
  version 1

  # Required field validation (Hashie built-in)
  property :user_id,
    required: true,
    message: "User ID is required and cannot be blank"

  # Custom validation with lambda
  property :age,
    validate: ->(v) { v.is_a?(Integer) && v.between?(1, 120) },
    validation_message: "Age must be an integer between 1 and 120"

  # Email validation with regex
  property :email,
    validate: /\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i,
    validation_message: "Must be a valid email address"

  # Inclusion validation with array
  property :status,
    validate: ['active', 'inactive', 'pending'],
    validation_message: "Status must be active, inactive, or pending"
end

Validation Methods

All message instances include validation methods:

user = UserMessage.new(user_id: "123", age: 25)

# Validate entire message (properties + header + version)
user.validate!           # Raises SmartMessage::Errors::ValidationError on failure
user.valid?              # Returns true/false

# Get detailed validation errors
errors = user.validation_errors
errors.each do |error|
  puts "#{error[:source]}.#{error[:property]}: #{error[:message]}"
  # Example output:
  # message.age: Age must be an integer between 1 and 120
  # header.version: Header version must be a positive integer
  # version_mismatch.version: Expected version 1, got: 2
end

Automatic Validation

Messages are automatically validated during publishing:

# This will raise ValidationError if invalid
message = UserMessage.new(user_id: "", age: 150)
message.publish  # Automatically validates before publishing

Version Compatibility

The framework automatically validates version compatibility:

class V2Message < SmartMessage::Base
  version 2
  property :data
end

message = V2Message.new(data: "test")
# Header automatically gets version: 2

# Simulate version mismatch (e.g., from older message)
message._sm_header.version = 1
message.validate!  # Raises: "V2Message expects version 2, but header has version 1"

Supported Validation Types

  • Proc/Lambda: validate: ->(v) { v.length > 5 }
  • Regexp: validate: /\A[a-z]+\z/
  • Class: validate: String (type checking)
  • Array: validate: ['red', 'green', 'blue'] (inclusion)
  • Range: validate: (1..100) (range checking)
  • Symbol: validate: :custom_validator_method
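
One way to picture how a single validate: option can accept all of these kinds is a case dispatch on the validator's class. This is a sketch, not the gem's implementation: passes? is a hypothetical helper, and the context argument (the object that owns a Symbol-named validator method) is an assumption.

```ruby
# Hypothetical dispatcher over validator kinds.
def passes?(validator, value, context = nil)
  case validator
  when Proc   then validator.call(value) ? true : false
  when Regexp then value.is_a?(String) && validator.match?(value)
  when Class  then value.is_a?(validator)          # type checking
  when Array  then validator.include?(value)       # inclusion
  when Range  then validator.cover?(value)         # range checking
  when Symbol then context.public_send(validator, value) ? true : false
  else true  # no validator configured
  end
end

puts passes?(->(v) { v.length > 5 }, 'abcdef')
puts passes?(/\A[a-z]+\z/, 'abc1')
puts passes?(String, 42)
puts passes?(%w[red green blue], 'green')
puts passes?((1..100), 150)
```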

Message Documentation

SmartMessage provides built-in documentation capabilities for both message classes and their properties.

Class-Level Descriptions

Use the description DSL method to document what your message class represents:

class OrderMessage < SmartMessage::Base
  description "Represents customer order data for processing and fulfillment"

  property :order_id, required: true
  property :amount, required: true
end

class UserMessage < SmartMessage::Base
  description "Handles user management operations including registration and updates"

  property :user_id, required: true
  property :email, required: true
end

# Access descriptions
puts OrderMessage.description
# => "Represents customer order data for processing and fulfillment"

puts UserMessage.description
# => "Handles user management operations including registration and updates"

# Instance access to class description
order = OrderMessage.new(order_id: "123", amount: 99.99)
puts order.description
# => "Represents customer order data for processing and fulfillment"

Default Descriptions

Classes without explicit descriptions automatically get a default description:

class MyMessage < SmartMessage::Base
  property :data
end

puts MyMessage.description
# => "MyMessage is a SmartMessage"

Property Documentation

Combine class descriptions with property descriptions for comprehensive documentation:

class FullyDocumented < SmartMessage::Base
  description "A fully documented message class for demonstration purposes"

  property :id,
    description: "Unique identifier for the record"
  property :name,
    description: "Display name for the entity"
  property :status,
    description: "Current processing status",
    validate: ['active', 'inactive', 'pending']
end

# Access all documentation
puts FullyDocumented.description
# => "A fully documented message class for demonstration purposes"

puts FullyDocumented.property_description(:id)
# => "Unique identifier for the record"

puts FullyDocumented.property_descriptions
# => {:id=>"Unique identifier for the record", :name=>"Display name for the entity", ...}

Documentation in Config Blocks

You can also set descriptions within configuration blocks:

class ConfiguredMessage < SmartMessage::Base
  config do
    description "Set within config block"
    transport SmartMessage::Transport.create(:stdout)
    serializer SmartMessage::Serializer::JSON.new
  end
end

Advanced Usage

Statistics and Monitoring

SmartMessage includes built-in statistics collection:

# Access global statistics
puts SS.stat  # Shows all collected statistics

# Get specific counts
publish_count = SS.get("MyMessage", "publish")
process_count = SS.get("MyMessage", "MyMessage.process", "routed")

# Reset statistics
SS.reset  # Clear all stats
SS.reset("MyMessage", "publish")  # Reset specific stat

Dispatcher Status

dispatcher = SmartMessage::Dispatcher.new

# Check thread pool status
status = dispatcher.status
puts "Running: #{status[:running]}"
puts "Queue length: #{status[:queue_length]}"
puts "Completed tasks: #{status[:completed_task_count]}"

# Check subscriptions
puts dispatcher.subscribers

Message Properties and Headers

class MyMessage < SmartMessage::Base
  property :user_id, description: "User's unique identifier"
  property :action, description: "Action performed by the user"
  property :timestamp, default: -> { Time.now }, description: "When the action occurred"
end

message = MyMessage.new(user_id: 123, action: "login")

# Access message properties
puts message.user_id
puts message.fields  # Returns Set of property names (excluding internal _sm_ properties)

# Access property descriptions
puts MyMessage.property_description(:user_id)  # => "User's unique identifier"
puts MyMessage.property_descriptions            # => Hash of all descriptions

# Access message header
puts message._sm_header.uuid
puts message._sm_header.message_class
puts message._sm_header.published_at
puts message._sm_header.publisher_pid
puts message._sm_header.from
puts message._sm_header.to
puts message._sm_header.reply_to

Dead Letter Queue

SmartMessage includes a comprehensive file-based Dead Letter Queue system for handling failed messages:

# Configure global DLQ (optional - defaults to 'dead_letters.jsonl')
SmartMessage::DeadLetterQueue.configure_default('/var/log/app/dlq.jsonl')

# Or use environment-based configuration
SmartMessage::DeadLetterQueue.configure_default(
  ENV.fetch('SMART_MESSAGE_DLQ_PATH', 'dead_letters.jsonl')
)

# Access the default DLQ instance
dlq = SmartMessage::DeadLetterQueue.default

# Create a custom DLQ instance for specific needs
custom_dlq = SmartMessage::DeadLetterQueue.new('/tmp/critical_failures.jsonl')
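
To make the JSON Lines storage format concrete, here is a small stand-alone queue that appends one JSON object per line. SketchDLQ and its entry fields are hypothetical; it does no file locking and is not the gem's DeadLetterQueue class.

```ruby
require 'json'
require 'time'
require 'tmpdir'

# Hypothetical JSON Lines queue: one JSON object per line, oldest first.
class SketchDLQ
  def initialize(path)
    @path = path
  end

  def enqueue(header, payload, error:)
    entry = { timestamp: Time.now.utc.iso8601, header: header,
              payload: payload, error: error }
    File.open(@path, 'a') { |f| f.puts(JSON.generate(entry)) }
  end

  def size
    File.exist?(@path) ? File.foreach(@path).count : 0
  end

  # Look at the oldest entry without removing it.
  def peek
    return nil unless File.exist?(@path)

    line = File.foreach(@path).first
    line && JSON.parse(line)
  end

  # Remove and return the oldest entry by rewriting the remaining lines.
  def dequeue
    return nil unless File.exist?(@path)

    lines = File.readlines(@path)
    return nil if lines.empty?

    first = lines.shift
    File.write(@path, lines.join)
    JSON.parse(first)
  end
end

Dir.mktmpdir do |dir|
  dlq = SketchDLQ.new(File.join(dir, 'dead_letters.jsonl'))
  dlq.enqueue({ 'message_class' => 'OrderMessage' },
              '{"order_id":"ORD-123"}', error: 'Connection timeout')
  puts dlq.size
  puts dlq.peek['error']
end
```

Appending is cheap, while removing from the front rewrites the file, so a format like this suits queues that are mostly written and only occasionally replayed.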

DLQ Operations

# Messages are automatically captured when circuit breakers trip
# But you can also manually enqueue failed messages:
dlq.enqueue(
  message._sm_header,
  message_payload,
  error: "Connection timeout",
  transport: "Redis",
  retry_count: 3
)

# Inspect queue status
puts "Queue size: #{dlq.size}"
puts "Next message: #{dlq.peek}"  # Look without removing

# Get statistics
stats = dlq.statistics
puts "Total messages: #{stats[:total]}"
puts "By error type: #{stats[:by_error]}"
puts "By message class: #{stats[:by_class]}"

Message Replay

# Replay messages back through their original transport
dlq.replay_one           # Replay oldest message
dlq.replay_batch(10)      # Replay next 10 messages
dlq.replay_all            # Replay entire queue

# Replay with a different transport
redis_transport = SmartMessage::Transport.create(:redis)
dlq.replay_one(redis_transport)  # Override original transport

Administrative Functions

# Filter messages for analysis
failed_orders = dlq.filter_by_class('OrderMessage')
timeout_errors = dlq.filter_by_error_pattern(/timeout/i)

# Export messages within a time range
yesterday = Time.now - 86400
today = Time.now
recent_failures = dlq.export_range(yesterday, today)

# Clear the queue when needed
dlq.clear  # Remove all messages

Integration with Circuit Breakers

Dead Letter Queue is automatically integrated with circuit breakers:

class PaymentMessage < SmartMessage::Base
  config do
    transport SmartMessage::Transport.create(:redis)
    # Messages automatically go to DLQ when circuit breaker trips
  end
end

# Monitor circuit breaker status
transport = PaymentMessage.transport
stats = transport.transport_circuit_stats
if stats[:transport_publish][:open]
  puts "Circuit open - messages going to DLQ"
end
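
The hand-off from a tripped circuit to a dead letter queue can be sketched without any gem. SketchBreaker is a hypothetical, deliberately minimal breaker: it has no timed recovery or half-open state and does not reproduce BreakerMachines.

```ruby
# Hypothetical minimal circuit breaker with a fallback.
class SketchBreaker
  def initialize(threshold:, fallback:)
    @threshold = threshold   # consecutive failures before opening
    @fallback  = fallback    # callable taking (item, reason)
    @failures  = 0
  end

  def open?
    @failures >= @threshold
  end

  # Runs the block unless the circuit is open. Failed calls and calls made
  # while the circuit is open are handed to the fallback.
  def call(item)
    return @fallback.call(item, 'circuit open') if open?

    begin
      result = yield(item)
      @failures = 0
      result
    rescue StandardError => e
      @failures += 1
      @fallback.call(item, e.message)
    end
  end
end

dead_letters = []
to_dlq = lambda do |message, reason|
  dead_letters << [message, reason]
  :dead_lettered
end

breaker = SketchBreaker.new(threshold: 2, fallback: to_dlq)
3.times do |i|
  breaker.call("msg-#{i}") { raise IOError, 'Connection timeout' }
end
p dead_letters.map(&:last)
```

After two consecutive failures the third message never reaches the transport block; it goes straight to the fallback with the reason 'circuit open'.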

Development

After checking out the repo, run:

bin/setup      # Install dependencies
bin/console    # Start interactive console
rake test      # Run test suite

Testing

SmartMessage uses Minitest with Shoulda for testing:

rake test                           # Run all tests
ruby -Ilib:test test/base_test.rb  # Run specific test file

Test output and debug information are logged to test.log.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/MadBomber/smart_message.

License

The gem is available as open source under the terms of the MIT License.