
PGMQ-Ruby


Ruby client for PGMQ - PostgreSQL Message Queue

What is PGMQ-Ruby?

PGMQ-Ruby is a Ruby client for PGMQ (PostgreSQL Message Queue). It provides direct access to all PGMQ operations with a clean, minimal API - similar to how rdkafka-ruby relates to Kafka.

Think of it as:

  • Like AWS SQS - but running entirely in PostgreSQL with no external dependencies
  • Like Sidekiq/Resque - but without Redis, using PostgreSQL for both data and queues
  • Like rdkafka-ruby - a thin, efficient wrapper around the underlying system (PGMQ SQL functions)

Architecture Note: This library follows the rdkafka-ruby/Karafka pattern - pgmq-ruby is the low-level foundation, while higher-level features (job processing, Rails integration, retry strategies) will live in pgmq-framework (similar to how Karafka builds on rdkafka-ruby).

Table of Contents

  • PGMQ Feature Support
  • Requirements
  • Installation
  • Quick Start
  • Configuration
  • API Reference
    • Queue Management
    • Sending Messages
    • Reading Messages
    • Grouped Round-Robin Reading
    • Conditional Message Filtering
    • Message Lifecycle
    • Monitoring
    • Transaction Support
    • Topic Routing
  • Message Object
  • Working with JSON
  • Development
  • Author

PGMQ Feature Support

This gem provides complete support for all core PGMQ SQL functions, based on the official PGMQ API:

Producing

  • produce - Send a single message with optional delay and headers
  • produce_batch - Send multiple messages atomically with headers

Reading

  • read - Read a single message with visibility timeout
  • read_batch - Read multiple messages with visibility timeout
  • read_with_poll - Long-polling for efficient message consumption
  • read_grouped_rr - Round-robin reading across message groups
  • read_grouped_rr_with_poll - Round-robin with long-polling
  • pop - Atomic read + delete operation
  • pop_batch - Atomic batch read + delete operation

Deleting/Archiving

  • delete - Delete a single message
  • delete_batch - Delete multiple messages
  • archive - Archive a single message for long-term storage
  • archive_batch - Archive multiple messages
  • purge_queue - Remove all messages from a queue

Queue Management

  • create - Create a standard queue
  • create_partitioned - Create a partitioned queue (requires pg_partman)
  • create_unlogged - Create an unlogged queue (faster, no crash recovery)
  • drop_queue - Delete a queue and all its messages

Topic Routing

  • bind_topic - Bind a topic pattern to a queue (AMQP-like)
  • unbind_topic - Remove a topic binding
  • produce_topic - Send a message via routing key
  • produce_batch_topic - Batch send via routing key
  • list_topic_bindings - List all topic bindings
  • test_routing - Test which queues match a routing key

Utilities

  • set_vt - Update visibility timeout (integer or Time)
  • set_vt_batch - Batch update visibility timeouts
  • set_vt_multi - Update visibility timeouts across multiple queues
  • list_queues - List all queues with metadata
  • metrics - Get queue metrics (length, age, total messages)
  • metrics_all - Get metrics for all queues
  • enable_notify_insert - Enable PostgreSQL NOTIFY on insert
  • disable_notify_insert - Disable notifications

Ruby Enhancements

  • Transaction support - Atomic operations via client.transaction do |txn|
  • Conditional filtering - Server-side JSONB filtering with conditional:
  • Multi-queue ops - Read/pop/delete/archive across multiple queues
  • Queue validation - Name format checks and length limit (see Queue Naming Rules)
  • Connection pooling - Thread-safe connection pool for concurrency
  • Pluggable serializers - JSON by default, with custom serializer support

Requirements

  • Ruby 3.2+
  • PostgreSQL 14-18 with PGMQ extension installed

Installing PGMQ Extension

PGMQ can be installed on your PostgreSQL instance in several ways:

Standard Installation (Self-hosted PostgreSQL)

For self-hosted PostgreSQL instances with filesystem access, install via PGXN:

pgxn install pgmq

Or build from source:

git clone https://github.com/pgmq/pgmq.git
cd pgmq/pgmq-extension
make && make install

Then enable the extension:

CREATE EXTENSION pgmq;

Managed PostgreSQL Services (AWS RDS, Aurora, etc.)

For managed PostgreSQL services that don't allow native extension installation, PGMQ provides a SQL-only installation that works without filesystem access:

git clone https://github.com/pgmq/pgmq.git
cd pgmq
psql -f pgmq-extension/sql/pgmq.sql postgres://user:pass@your-rds-host:5432/database

This creates a pgmq schema with all required functions. See PGMQ Installation Guide for details.

Comparison:

Feature                    Extension    SQL-only
Version tracking           Yes          No
Upgrade path               Yes          Manual
Filesystem access          Required     Not needed
Managed cloud services     Limited      Full support

Using pg_tle (Trusted Language Extensions)

If your managed PostgreSQL service supports pg_tle (available on AWS RDS PostgreSQL 14.5+ and Aurora), you can potentially install PGMQ as a Trusted Language Extension since PGMQ is written in PL/pgSQL and SQL (both supported by pg_tle).

To use pg_tle:

  1. Enable pg_tle on your instance (add to shared_preload_libraries)
  2. Create the pg_tle extension: CREATE EXTENSION pg_tle;
  3. Use pgtle.install_extension() to install PGMQ's SQL functions (see the sketch below)

See AWS pg_tle documentation for setup instructions.
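
A minimal sketch of step 3 using the pg gem directly - the version string is illustrative, and the SQL file path matches the repository layout shown earlier:

require 'pg'

conn = PG.connect(host: 'your-rds-host', dbname: 'database', user: 'user', password: 'pass')
sql = File.read('pgmq-extension/sql/pgmq.sql')

# Register PGMQ's SQL functions as a pg_tle extension, then enable it
conn.exec_params(
  'SELECT pgtle.install_extension($1, $2, $3, $4)',
  ['pgmq', '1.0.0', 'PostgreSQL Message Queue', sql]
)
conn.exec('CREATE EXTENSION pgmq;')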

Note: The SQL-only installation is simpler and recommended for most managed service use cases. pg_tle provides additional version management and extension lifecycle features if needed.

Installation

Add to your Gemfile:

gem 'pgmq-ruby'

Or install directly:

gem install pgmq-ruby

Quick Start

Basic Usage

require 'json'
require 'pgmq'

# Connect to database
client = PGMQ::Client.new(
  host: 'localhost',
  port: 5432,
  dbname: 'mydb',
  user: 'postgres',
  password: 'secret'
)

# Create a queue
client.create('orders')

# Send a message (must be JSON string)
msg_id = client.produce('orders', '{"order_id":123,"total":99.99}')

# Read a message (30 second visibility timeout)
msg = client.read('orders', vt: 30)
puts msg.message  # => "{\"order_id\":123,\"total\":99.99}" (raw JSON string)

# Parse and process (you handle deserialization)
data = JSON.parse(msg.message)
process_order(data)
client.delete('orders', msg.msg_id)

# Or archive for long-term storage
client.archive('orders', msg.msg_id)

# Clean up
client.drop_queue('orders')
client.close

Rails Integration (Reusing ActiveRecord Connection)

# config/initializers/pgmq.rb or in your model
class OrderProcessor
  def initialize
    # Reuse Rails' connection pool - no separate connection needed!
    @client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })
  end

  def process_orders
    loop do
      msg = @client.read('orders', vt: 30)
      break unless msg

      # Parse JSON yourself
      data = JSON.parse(msg.message)
      process_order(data)
      @client.delete('orders', msg.msg_id)
    end
  end
end

Connection Options

PGMQ-Ruby supports multiple ways to connect:

Connection Hash

client = PGMQ::Client.new(
  host: 'localhost',
  port: 5432,
  dbname: 'mydb',
  user: 'postgres',
  password: 'secret',
  pool_size: 5,        # Default: 5
  pool_timeout: 5      # Default: 5 seconds
)

Connection String

client = PGMQ::Client.new('postgres://user:pass@localhost:5432/dbname')

Rails ActiveRecord (Recommended for Rails apps)

# Reuses Rails connection pool - no additional connections needed
client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })

Custom Connection Pool

# Bring your own connection management
connection = PGMQ::Connection.new('postgres://localhost/mydb', pool_size: 10)
client = PGMQ::Client.new(connection)

Connection Pool Features

PGMQ-Ruby includes connection pooling with resilience:

# Configure pool size and timeouts
client = PGMQ::Client.new(
  'postgres://localhost/mydb',
  pool_size: 10,           # Number of connections (default: 5)
  pool_timeout: 5,         # Timeout in seconds (default: 5)
  auto_reconnect: true     # Auto-reconnect on connection loss (default: true)
)

# Monitor connection pool health
stats = client.stats
puts "Pool size: #{stats[:size]}"           # => 10
puts "Available: #{stats[:available]}"      # => 8 (2 in use)

# Disable auto-reconnect if you prefer explicit error handling
client = PGMQ::Client.new(
  'postgres://localhost/mydb',
  auto_reconnect: false
)

Connection Pool Benefits:

  • Thread-safe - Multiple threads can safely share a single client (see the sketch after this list)
  • Fiber-aware - Works with Ruby 3.0+ Fiber Scheduler for non-blocking I/O (tested with the async gem)
  • Auto-reconnect - Recovers from lost connections (configurable)
  • Health checks - Verifies connections before use to prevent stale connection errors
  • Monitoring - Track pool utilization with client.stats
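
A minimal sketch of that thread-safety guarantee - several workers draining one queue through a single shared client (the queue name and handle_order are illustrative):

require 'json'

workers = 4.times.map do
  Thread.new do
    # Each call checks a connection out of the pool, so threads never share one
    while (msg = client.read('orders', vt: 30))
      handle_order(JSON.parse(msg.message))  # handle_order is your code
      client.delete('orders', msg.msg_id)
    end
  end
end
workers.each(&:join)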

API Reference

Queue Management

# Create a queue (returns true if created, false if already exists)
client.create("queue_name")      # => true
client.create("queue_name")      # => false (idempotent)

# Create partitioned queue (requires pg_partman)
client.create_partitioned("queue_name",
  partition_interval: "daily",
  retention_interval: "7 days"
)  # => true/false

# Create unlogged queue (faster, no crash recovery)
client.create_unlogged("queue_name")  # => true/false

# Drop queue (returns true if dropped, false if didn't exist)
client.drop_queue("queue_name")  # => true/false

# List all queues
queues = client.list_queues
# => [#<PGMQ::QueueMetadata queue_name="orders" created_at=...>, ...]

Queue Naming Rules

Queue names must follow PostgreSQL identifier rules with PGMQ-specific constraints:

  • Maximum 47 characters (PGMQ rejects longer names to leave room for its table prefixes)
  • Must start with a letter or underscore
  • Can contain only letters, digits, and underscores
  • Case-sensitive

Valid Queue Names:

client.create("orders")           # ✓ Simple name
client.create("high_priority")    # ✓ With underscore
client.create("Queue123")         # ✓ With numbers
client.create("_internal")        # ✓ Starts with underscore
client.create("a" * 47)          # ✓ Maximum length (47 chars)

Invalid Queue Names:

client.create("123orders")        # ✗ Starts with number
client.create("my-queue")         # ✗ Contains hyphen
client.create("my.queue")         # ✗ Contains period
client.create("a" * 48)          # ✗ Too long (48+ chars)
# Raises PGMQ::Errors::InvalidQueueNameError

Sending Messages

# Send single message (must be JSON string)
msg_id = client.produce("queue_name", '{"data":"value"}')

# Send with delay (seconds)
msg_id = client.produce("queue_name", '{"data":"value"}', delay: 60)

# Send with headers (for routing, tracing, correlation)
msg_id = client.produce("queue_name", '{"data":"value"}',
  headers: '{"trace_id":"abc123","priority":"high"}')

# Send with headers and delay
msg_id = client.produce("queue_name", '{"data":"value"}',
  headers: '{"correlation_id":"req-456"}',
  delay: 60)

# Send batch (array of JSON strings)
msg_ids = client.produce_batch("queue_name", [
  '{"order":1}',
  '{"order":2}',
  '{"order":3}'
])
# => ["101", "102", "103"]

# Send batch with headers (one per message)
msg_ids = client.produce_batch("queue_name",
  ['{"order":1}', '{"order":2}'],
  headers: ['{"priority":"high"}', '{"priority":"low"}'])

Reading Messages

# Read single message
msg = client.read("queue_name", vt: 30)
# => #<PGMQ::Message msg_id="1" message="{...}">

# Read batch
messages = client.read_batch("queue_name", vt: 30, qty: 10)

# Read with long-polling
msg = client.read_with_poll("queue_name",
  vt: 30,
  qty: 1,
  max_poll_seconds: 5,
  poll_interval_ms: 100
)

# Pop (atomic read + delete)
msg = client.pop("queue_name")

# Pop batch (atomic read + delete for multiple messages)
messages = client.pop_batch("queue_name", 10)

# Grouped round-robin reading (fair processing across entities)
# Messages are grouped by the first key in their JSON payload
messages = client.read_grouped_rr("queue_name", vt: 30, qty: 10)

# Grouped round-robin with long-polling
messages = client.read_grouped_rr_with_poll("queue_name",
  vt: 30,
  qty: 10,
  max_poll_seconds: 5,
  poll_interval_ms: 100
)
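
In practice, read_with_poll usually runs inside a consumer loop. A minimal sketch - the queue name and handle_order are illustrative:

loop do
  # Blocks for up to 5 seconds waiting for work, so an empty queue doesn't busy-spin
  msg = client.read_with_poll('orders', vt: 30, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100)
  next unless msg

  handle_order(JSON.parse(msg.message))  # handle_order is your code
  client.delete('orders', msg.msg_id)
end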

Grouped Round-Robin Reading

When processing messages from multiple entities (users, orders, tenants), regular FIFO ordering can cause starvation - one entity with many messages can monopolize workers.

Grouped round-robin ensures fair processing by interleaving messages from different groups:

# Queue contains messages for different users:
# user_a: 5 messages, user_b: 2 messages, user_c: 1 message

# Regular read would process all user_a messages first (unfair)
messages = client.read_batch("tasks", vt: 30, qty: 8)
# => [user_a_1, user_a_2, user_a_3, user_a_4, user_a_5, user_b_1, user_b_2, user_c_1]

# Grouped round-robin ensures fair distribution
messages = client.read_grouped_rr("tasks", vt: 30, qty: 8)
# => [user_a_1, user_b_1, user_c_1, user_a_2, user_b_2, user_a_3, user_a_4, user_a_5]

How it works:

  • Messages are grouped by the first key in their JSON payload
  • The first key should be your grouping identifier (e.g., user_id, tenant_id, order_id)
  • PGMQ rotates through groups, taking one message from each before repeating

Message format for grouping:

# Good - user_id is first key, used for grouping
client.produce("tasks", '{"user_id":"user_a","task":"process"}')

# The grouping key should come first in your JSON

Conditional Message Filtering

Filter messages by JSON payload content using server-side JSONB queries:

# Filter by single condition
msg = client.read("orders", vt: 30, conditional: { status: "pending" })

# Filter by multiple conditions (AND logic)
msg = client.read("orders", vt: 30, conditional: {
  status: "pending",
  priority: "high"
})

# Filter by nested properties
msg = client.read("orders", vt: 30, conditional: {
  user: { role: "admin" }
})

# Works with read_batch
messages = client.read_batch("orders",
  vt: 30,
  qty: 10,
  conditional: { type: "priority" }
)

# Works with long-polling
messages = client.read_with_poll("orders",
  vt: 30,
  max_poll_seconds: 5,
  conditional: { status: "ready" }
)

How Filtering Works:

  • Filtering happens in PostgreSQL using JSONB containment operator (@>)
  • Only messages matching ALL conditions are returned (AND logic)
  • The qty parameter applies after filtering
  • Empty conditions {} means no filtering (same as omitting parameter)

Performance Tip: Conditional reads filter with the @> containment operator against the whole message column, so the index PostgreSQL can use is a GIN index on that column (not on a single key):

CREATE INDEX idx_orders_message
  ON pgmq.q_orders USING gin (message jsonb_path_ops);

(jsonb_path_ops is the compact GIN operator class that supports only the @> operator.)

Message Lifecycle

# Delete message
client.delete("queue_name", msg_id)

# Delete batch
deleted_ids = client.delete_batch("queue_name", [101, 102, 103])

# Archive message
client.archive("queue_name", msg_id)

# Archive batch
archived_ids = client.archive_batch("queue_name", [101, 102, 103])

# Update visibility timeout with integer offset (seconds from now)
msg = client.set_vt("queue_name", msg_id, vt: 60)

# Update visibility timeout with absolute Time (PGMQ v1.11.0+)
future_time = Time.now + 300  # 5 minutes from now
msg = client.set_vt("queue_name", msg_id, vt: future_time)

# Batch update visibility timeout
updated_msgs = client.set_vt_batch("queue_name", [101, 102, 103], vt: 60)

# Batch update with absolute Time
updated_msgs = client.set_vt_batch("queue_name", [101, 102, 103], vt: Time.now + 120)

# Update visibility timeout across multiple queues
client.set_vt_multi({
  "orders" => [1, 2, 3],
  "notifications" => [5, 6]
}, vt: 120)

# Purge all messages
count = client.purge_queue("queue_name")

# Enable PostgreSQL NOTIFY for a queue (for LISTEN-based consumers)
client.enable_notify_insert("queue_name", throttle_interval_ms: 250)

# Disable notifications
client.disable_notify_insert("queue_name")

Monitoring

# Get queue metrics
metrics = client.metrics("queue_name")
puts metrics.queue_length        # => 42
puts metrics.oldest_msg_age_sec  # => 120
puts metrics.newest_msg_age_sec  # => 5
puts metrics.total_messages      # => 1000

# Get all queue metrics
all_metrics = client.metrics_all
all_metrics.each do |m|
  puts "#{m.queue_name}: #{m.queue_length} messages"
end
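
A minimal sketch of a periodic backlog check built on these metrics - the threshold and interval are arbitrary:

loop do
  m = client.metrics('orders')
  warn "orders backlog: #{m.queue_length}" if m.queue_length.to_i > 1_000
  sleep 60
end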

Transaction Support

Low-level PostgreSQL transaction support for atomic operations. Transactions are a database primitive provided by PostgreSQL - this is a thin wrapper for convenience.

Execute atomic operations across multiple queues or combine queue operations with application data updates:

# Atomic operations across multiple queues
client.transaction do |txn|
  # Send to multiple queues atomically
  txn.produce("orders", '{"order_id":123}')
  txn.produce("notifications", '{"user_id":456,"type":"order_created"}')
  txn.produce("analytics", '{"event":"order_placed"}')
end

# Process message and update application state atomically
client.transaction do |txn|
  # Read and process message
  msg = txn.read("orders", vt: 30)

  if msg
    # Parse and update your database
    data = JSON.parse(msg.message)
    Order.create!(external_id: data["order_id"])

    # Delete message only if database update succeeds
    txn.delete("orders", msg.msg_id)
  end
end

# Automatic rollback on errors
client.transaction do |txn|
  txn.produce("queue1", '{"data":"message1"}')
  txn.produce("queue2", '{"data":"message2"}')

  raise "Something went wrong!"
  # Both messages are rolled back - neither queue receives anything
end

# Move messages between queues atomically
client.transaction do |txn|
  msg = txn.read("pending_orders", vt: 30)

  if msg
    data = JSON.parse(msg.message)
    if data["priority"] == "high"
      # Move to high-priority queue
      txn.produce("priority_orders", msg.message)
      txn.delete("pending_orders", msg.msg_id)
    end
  end
end

How Transactions Work:

  • Wraps PostgreSQL's native transaction support (similar to rdkafka-ruby providing Kafka transactions)
  • All operations within the block execute in a single PostgreSQL transaction
  • If any operation fails, the entire transaction is rolled back automatically
  • The transactional client delegates all PGMQ::Client methods for convenience

Use Cases:

  • Multi-queue coordination: Send related messages to multiple queues atomically
  • Exactly-once processing: Combine message deletion with application state updates
  • Message routing: Move messages between queues without losing data
  • Batch operations: Ensure all-or-nothing semantics for bulk operations

Important Notes:

  • Transactions hold database locks - keep them short to avoid blocking
  • Long transactions can impact queue throughput
  • Read operations with long visibility timeouts may cause lock contention
  • Consider using pop() for atomic read+delete in simple cases

Topic Routing (AMQP-like Patterns)

PGMQ v1.11.0+ supports AMQP-style topic routing, allowing messages to be delivered to multiple queues based on pattern matching.

Topic Patterns

Topic patterns support wildcards:

  • * matches exactly one word (e.g., orders.* matches orders.new but not orders.new.priority)
  • # matches zero or more words (e.g., orders.# matches orders, orders.new, and orders.new.priority)

# Create queues for different purposes
client.create("new_orders")
client.create("order_updates")
client.create("all_orders")
client.create("audit_log")

# Bind topic patterns to queues
client.bind_topic("orders.new", "new_orders")        # Exact match
client.bind_topic("orders.update", "order_updates")  # Exact match
client.bind_topic("orders.*", "all_orders")          # Single-word wildcard
client.bind_topic("#", "audit_log")                  # Catch-all

# Send messages via routing key
# Message is delivered to ALL queues with matching patterns
count = client.produce_topic("orders.new", '{"order_id":123}')
# => 3 (delivered to: new_orders, all_orders, audit_log)

count = client.produce_topic("orders.update", '{"order_id":123,"status":"shipped"}')
# => 3 (delivered to: order_updates, all_orders, audit_log)

# Send with headers and delay
count = client.produce_topic("orders.new.priority",
  '{"order_id":456}',
  headers: '{"trace_id":"abc123"}',
  delay: 0
)

# Batch send via topic routing
results = client.produce_batch_topic("orders.new", [
  '{"order_id":1}',
  '{"order_id":2}',
  '{"order_id":3}'
])
# => [{ queue_name: "new_orders", msg_id: "1" }, ...]

# List all topic bindings
bindings = client.list_topic_bindings
bindings.each do |b|
  puts "#{b[:pattern]} -> #{b[:queue_name]}"
end

# List bindings for specific queue
bindings = client.list_topic_bindings(queue_name: "all_orders")

# Test which queues a routing key would match (for debugging)
matches = client.test_routing("orders.new.priority")
# => [{ pattern: "orders.#", queue_name: "all_orders" }, ...]

# Validate routing keys and patterns
client.validate_routing_key("orders.new.priority")  # => true
client.validate_routing_key("orders.*")             # => false (wildcards not allowed in keys)
client.validate_topic_pattern("orders.*")           # => true
client.validate_topic_pattern("orders.#")           # => true

# Remove bindings when done
client.unbind_topic("orders.new", "new_orders")
client.unbind_topic("orders.*", "all_orders")

Use Cases:

  • Event broadcasting: Send events to multiple consumers based on event type
  • Multi-tenant routing: Route messages to tenant-specific queues
  • Log aggregation: Capture all messages in an audit queue while routing to specific handlers
  • Fan-out patterns: Deliver one message to multiple processing pipelines

Message Object

PGMQ-Ruby is a low-level transport library - it returns raw values from PostgreSQL without any transformation. You are responsible for parsing JSON and type conversion.

msg = client.read("queue", vt: 30)

# All values are strings as returned by PostgreSQL
msg.msg_id          # => "123" (String, not Integer)
msg.id              # => "123" (alias for msg_id)
msg.read_ct         # => "1" (String, not Integer)
msg.enqueued_at     # => "2025-01-15 10:30:00+00" (String, not Time)
msg.last_read_at    # => "2025-01-15 10:30:15+00" (String, or nil if never read)
msg.vt              # => "2025-01-15 10:30:30+00" (String, not Time)
msg.message         # => "{\"data\":\"value\"}" (Raw JSONB as JSON string)
msg.headers         # => "{\"trace_id\":\"abc123\"}" (Raw JSONB as JSON string, optional)
msg.queue_name      # => "my_queue" (only present for multi-queue operations, otherwise nil)

# You handle JSON parsing
data = JSON.parse(msg.message)  # => { "data" => "value" }
metadata = JSON.parse(msg.headers) if msg.headers  # => { "trace_id" => "abc123" }

# You handle type conversion if needed
id = msg.msg_id.to_i           # => 123
read_count = msg.read_ct.to_i  # => 1
enqueued = Time.parse(msg.enqueued_at)  # => 2025-01-15 10:30:00 UTC
last_read = Time.parse(msg.last_read_at) if msg.last_read_at  # => Time or nil

Message Headers

PGMQ supports optional message headers via the headers JSONB column. Headers are useful for metadata like routing information, correlation IDs, and distributed tracing:

# Sending a message with headers
message = '{"order_id":123}'
headers = '{"trace_id":"abc123","priority":"high","correlation_id":"req-456"}'

msg_id = client.produce("orders", message, headers: headers)

# Sending with headers and delay
msg_id = client.produce("orders", message, headers: headers, delay: 60)

# Batch produce with headers (one header object per message)
messages = ['{"id":1}', '{"id":2}', '{"id":3}']
headers = [
  '{"priority":"high"}',
  '{"priority":"medium"}',
  '{"priority":"low"}'
]
msg_ids = client.produce_batch("orders", messages, headers: headers)

# Reading messages with headers
msg = client.read("orders", vt: 30)
if msg.headers
  metadata = JSON.parse(msg.headers)
  trace_id = metadata["trace_id"]
  priority = metadata["priority"]
  correlation_id = metadata["correlation_id"]
end

Common header use cases:

  • Distributed tracing: trace_id, span_id, parent_span_id (see the sketch after this list)
  • Request correlation: correlation_id, causation_id
  • Routing: priority, region, tenant_id
  • Content metadata: content_type, encoding, version
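
A minimal sketch of the distributed-tracing case - carrying trace_id across hops while minting a fresh span (the field names follow common tracing conventions, not a PGMQ requirement):

require 'json'
require 'securerandom'

incoming = client.read('orders', vt: 30)
trace = incoming.headers ? JSON.parse(incoming.headers) : {}

# Reuse the incoming trace_id (or start a new trace) and link the spans
client.produce('notifications', '{"type":"order_created"}',
  headers: JSON.generate(
    trace_id: trace['trace_id'] || SecureRandom.hex(16),
    parent_span_id: trace['span_id'],
    span_id: SecureRandom.hex(8)
  ))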

Why Raw Values?

This library follows the rdkafka-ruby philosophy - provide a thin, performant wrapper around the underlying system:

  1. No assumptions - Your application decides how to parse timestamps, convert types, etc.
  2. Framework-agnostic - Works equally well with Rails, Sinatra, or plain Ruby
  3. Zero overhead - No hidden type conversion or object allocation
  4. Explicit control - You see exactly what PostgreSQL returns

Higher-level features (automatic deserialization, type conversion, instrumentation) belong in framework layers built on top of this library.

Working with JSON

PGMQ stores messages as JSONB in PostgreSQL. You must handle JSON serialization yourself:

Sending Messages

# Simple hash
msg = { order_id: 123, status: "pending" }
client.produce("orders", msg.to_json)

# Using JSON.generate for explicit control
client.produce("orders", JSON.generate(order_id: 123, status: "pending"))

# Pre-serialized JSON string
json_str = '{"order_id":123,"status":"pending"}'
client.produce("orders", json_str)

Reading Messages

msg = client.read("orders", vt: 30)

# Parse JSON yourself
data = JSON.parse(msg.message)
puts data["order_id"]  # => 123
puts data["status"]    # => "pending"

# Handle parsing errors
begin
  data = JSON.parse(msg.message)
rescue JSON::ParserError => e
  logger.error "Invalid JSON in message #{msg.msg_id}: #{e.message}"
  client.delete("orders", msg.msg_id)  # Remove invalid message
end

Helper Pattern (Optional)

For convenience, you can wrap the client in your own helper:

require 'json'
require 'ostruct'

class QueueHelper
  def initialize(client)
    @client = client
  end

  def produce(queue, data)
    @client.produce(queue, data.to_json)
  end

  def read(queue, vt:)
    msg = @client.read(queue, vt: vt)
    return nil unless msg

    OpenStruct.new(
      id: msg.msg_id.to_i,
      data: JSON.parse(msg.message),
      read_count: msg.read_ct.to_i,
      raw: msg
    )
  end
end

helper = QueueHelper.new(client)
helper.produce("orders", { order_id: 123 })
msg = helper.read("orders", vt: 30)
puts msg.data["order_id"]  # => 123

Development

# Clone repository
git clone https://github.com/mensfeld/pgmq-ruby.git
cd pgmq-ruby

# Install dependencies
bundle install

# Start PostgreSQL with PGMQ
docker compose up -d

# Run tests
bundle exec rake test

# Run console
bundle exec bin/console

Author

Maintained by Maciej Mensfeld

Also check out Karafka - High-performance Apache Kafka framework for Ruby.