The project is in a healthy, maintained state
Postgres replication protocol
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies

Runtime

~> 1.0
 Project Readme

PG::Replication

Usage

Add to your Gemfile:

gem "pg-replication-protocol", require: "pg/replication"

Demo

require "pg"
require "pg/replication"

# Important to create a connection with the `replication: "database"` flag
connection = PG.connect(..., replication: "database")

# Create a publication and a slot (in a real use case the slot will not be temporary)
connection.query("CREATE PUBLICATION some_publication FOR ALL TABLES")
connection.query('CREATE_REPLICATION_SLOT some_slot TEMPORARY LOGICAL "pgoutput"')

# Just a storage for our table relation data
tables = {}

# Start a pgoutput plugin replication slot message stream
connection.start_pgoutput_replication_slot(slot, publications).each do |msg|
  case msg
  in PG::Replication::PGOutput::Relation(oid:, name:, columns:)
    # We receive this message on the first row of each table
    tables[oid] = { name:, columns: }

  in PG::Replication::PGOutput::Begin
    puts "Transaction start"

  in PG::Replication::PGOutput::Commit
    puts "Transaction end"

  in PG::Replication::PGOutput::Insert(oid:, new:)
    puts "Insert #{tables[oid][:name]}"
    new.zip(tables[oid][:columns]).each do |tuple, col|
      puts "#{col.name}: #{tuple.data || "NULL"}"
    end

  in PG::Replication::PGOutput::Update(oid:, new:, old:)
    puts "Update #{tables[oid][:name]}"
    if !old.empty? && new != old
      new.zip(old, tables[oid][:columns]).each do |new, old, col|
        if new != old
          puts "Changed #{col.name}: #{old.data || "NULL"} > #{new.data || "NULL"}"
        end
      end
    end

  in PG::Replication::PGOutput::Delete(oid:)
    puts "Delete #{tables[oid][:name]}"

  else
    nil
  end
end