The project is in a healthy, maintained state
Postgres replication protocol
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
 Dependencies

Runtime

~> 1.0
 Project Readme

PG::Replication

Adds support to pg for listening to replication slots

Usage

Add to your Gemfile:

gem "pg-replication-protocol", require: "pg/replication"

Demo

require "pg"
require "pg/replication"

# It is important to create a connection with the `replication: "database"` option
connection = PG.connect(..., replication: "database")

# Create a publication and a slot (in a real use case the slot will not be temporary)
connection.query("CREATE PUBLICATION some_publication FOR ALL TABLES")
connection.query('CREATE_REPLICATION_SLOT some_slot TEMPORARY LOGICAL "pgoutput"')

# Just a storage for our table relation data
tables = {}

# Start a pgoutput plugin replication slot message stream
# The `messages: true` option is required to be able to decode `PG::Replication::PGOutput::Message`
connection.start_pgoutput_replication_slot(slot, publications, messages: true).each do |msg|
  case msg
  in PG::Replication::Protocol::XLogData(data: PG::Replication::PGOutput::Relation(oid:, name:, columns:))
    # This message is received on the first row of each table, or when there are schema changes
    tables[oid] = { name:, columns: }

  in PG::Replication::Protocol::XLogData(data: PG::Replication::PGOutput::Begin)
    puts "Transaction start"

  in PG::Replication::Protocol::XLogData(data: PG::Replication::PGOutput::Commit)
    puts "Transaction end"

  in PG::Replication::Protocol::XLogData(data: PG::Replication::PGOutput::Insert(oid:, new:))
    puts "Insert #{tables[oid][:name]}"
    new.zip(tables[oid][:columns]).each do |tuple, col|
      puts "#{col.name}: #{tuple.data || "NULL"}"
    end

  in PG::Replication::Protocol::XLogData(data: PG::Replication::PGOutput::Update(oid:, new:, old:))
    puts "Update #{tables[oid][:name]}"
    if !old.empty? && new != old
      new.zip(old, tables[oid][:columns]).each do |new, old, col|
        if new != old
          puts "Changed #{col.name}: #{old.data || "NULL"} > #{new.data || "NULL"}"
        end
      end
    end

  in PG::Replication::Protocol::XLogData(data: PG::Replication::PGOutput::Delete(oid:))
    puts "Delete #{tables[oid][:name]}"

  in PG::Replication::Protocol::XLogData(data: PG::Replication::PGOutput::Message(prefix:, content:))
    puts "Message #{prefix}: #{content}"

  else
    nil
  end
end