0.0
The project is in a healthy, maintained state
Transport-only PostgreSQL logical replication client for receiving pgoutput CopyData payloads.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
 Dependencies

Runtime

~> 1.6
 Project Readme

pgoutput-client

Gem Version CI Coverage Status Ruby Version License: MIT

A transport-only PostgreSQL logical replication client for receiving raw pgoutput payloads in Ruby.

pgoutput-client connects to PostgreSQL using logical replication, starts a pgoutput replication stream, receives CopyData messages, handles keepalives, sends standby feedback, and yields raw pgoutput payload bytes to downstream gems such as pgoutput-parser and pgoutput-decoder.

It intentionally does not parse row-change messages or decode PostgreSQL values.


Requirements

  • Ruby 3.4+
  • PostgreSQL 10+
  • pg gem
  • PostgreSQL publication and logical replication slot

Ecosystem Position

PostgreSQL logical replication
        │
        ▼
pgoutput-client
        │
        ▼
CopyData / pgoutput payloads
        │
        ▼
pgoutput-parser
        │
        ▼
Protocol messages
        │
        ▼
pgoutput-decoder
        │
        ▼
Decoded row events

pgoutput-client is the transport layer only.


Features

  • Opens PostgreSQL logical replication connections
  • Builds replication commands
  • Supports CREATE_REPLICATION_SLOT
  • Supports DROP_REPLICATION_SLOT
  • Supports START_REPLICATION SLOT ... LOGICAL ...
  • Parses XLogData envelopes
  • Parses primary keepalive messages
  • Builds standby feedback messages
  • Provides LSN parse/format helpers
  • Yields raw pgoutput payload bytes
  • Includes RBS signatures
  • Includes Minitest coverage
  • No audit, parser, or decoder concerns

Installation

gem "pgoutput-client"

Then:

bundle install

Require:

require "pgoutput-client"

Quick Start

require "pgoutput-client"

client =
  Pgoutput::Client::Runner.new(
    database_url: ENV.fetch("DATABASE_URL"),
    slot_name: "my_slot",
    publication_names: ["my_publication"],
    auto_create_slot: true
  )

client.start do |payload, metadata|
  puts "WAL end: #{metadata.wal_end_lsn}"
  puts "Raw pgoutput payload bytes: #{payload.bytesize}"
end

Using With pgoutput-parser

require "pgoutput-client"
require "pgoutput"

client = Pgoutput::Client::Runner.new(
  database_url: ENV.fetch("DATABASE_URL"),
  slot_name: "my_slot",
  publication_names: ["my_publication"]
)

tracker = Pgoutput::RelationTracker.new

client.start do |payload, metadata|
  message = tracker.process(payload)
  p [metadata.wal_end_lsn, message]
end

Using With pgoutput-decoder

require "pgoutput-client"
require "pgoutput"
require "pgoutput/decoder"

tracker = Pgoutput::RelationTracker.new
decoder = Pgoutput::Decoder.new

client.start do |payload, metadata|
  protocol_message = tracker.process(payload)
  event = decoder.decode(protocol_message)
  p [metadata.wal_end_lsn, event]
end

What This Gem Does

PostgreSQL replication connection
        │
        ▼
CopyData stream
        │
        ▼
XLogData / Keepalive handling
        │
        ▼
Raw pgoutput payloads

It owns:

  • Replication connection setup
  • Replication command generation
  • CopyData reading
  • XLogData envelope parsing
  • Keepalive handling
  • Standby status feedback
  • LSN conversion

What This Gem Does Not Do

It does not:

  • Parse pgoutput row messages
  • Decode PostgreSQL OIDs
  • Build application events
  • Group transactions
  • Run processor pipelines
  • Manage Ractor worker pools
  • Store audit records

Those responsibilities belong to higher layers.


Logical Replication Setup

Example PostgreSQL setup:

ALTER SYSTEM SET wal_level = logical;

CREATE PUBLICATION my_publication FOR TABLE users, posts;

Create a slot automatically:

Pgoutput::Client::Runner.new(
  database_url: ENV.fetch("DATABASE_URL"),
  slot_name: "my_slot",
  publication_names: ["my_publication"],
  auto_create_slot: true
)

Or create the slot yourself:

SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');

Public API

Pgoutput::Client::Runner

High-level facade.

client = Pgoutput::Client::Runner.new(...)
client.start { |payload, metadata| ... }

Pgoutput::Client::Configuration

Immutable configuration object.

Pgoutput::Client::Connection

Thin wrapper around PG::Connection for replication commands.

Pgoutput::Client::Stream

Consumes CopyData messages and yields pgoutput payloads.

Pgoutput::Client::LSN

Pgoutput::Client::LSN.parse("0/16B6C50")
Pgoutput::Client::LSN.format(23_817_296)

Pgoutput::Client::XLogData

Represents a WAL data envelope.

Pgoutput::Client::Keepalive

Represents a primary keepalive message.

Pgoutput::Client::Feedback

Builds standby status update payloads.


Ractor Position

The replication connection itself is stateful and ordered. It should normally run as a single reader.

Downstream parsing, decoding, and processing can be parallelized with Ractors:

pgoutput-client reader
        │
        ▼
Ractor-safe queue
        │
        ▼
parser / decoder / processor pools

Rake Tasks

Default

Run them all

bundle exec rake

Code Linting and Formatting

bundle exec rake rubocop

Testing

bundle exec rake test

With coverage:

COVERAGE=true bundle exec rake test

Type Checking

bundle exec rbs:validate

Documentation

bundle exec rake yard

License

MIT.