pgoutput-source-adapter provides source adapters that normalize decoded PostgreSQL pgoutput events into downstream event models. The gem currently includes a CDC::Core adapter that transforms pgoutput decoder events into ChangeEvent and TransactionEnvelope primitives while preserving transaction and metadata context. This package forms the normalization boundary between the pgoutput family of gems and downstream change-event platforms.

pgoutput-source-adapter

pgoutput-source-adapter adapts decoded pgoutput events into downstream change-event platform primitives.

The first supported target is the CDC Ecosystem:

Pgoutput::SourceAdapter::Cdc

It normalizes Pgoutput::Decoder::Events into CDC::Core::ChangeEvent and CDC::Core::TransactionEnvelope objects.

Boundary

The pgoutput family remains standalone:

pgoutput-client   -> PostgreSQL logical replication transport
pgoutput-parser   -> pgoutput protocol messages
pgoutput-decoder  -> typed Ruby row-change events

This gem is the adapter layer:

Pgoutput::Decoder::Events
        |
        v
Pgoutput::SourceAdapter::Cdc
        |
        v
CDC::Core::ChangeEvent / TransactionEnvelope

That keeps the lower-level pgoutput gems usable outside the CDC Ecosystem while still providing a clean bridge into cdc-core for users building CDC platforms.

Installation

gem "pgoutput-source-adapter"
require "pgoutput/source_adapter"

The require path that bundle gem generates for this gem name also works:

require "pgoutput/source/adapter"

Usage

Normalize a decoded insert event:

adapter = Pgoutput::SourceAdapter::Cdc.new
change_event = adapter.normalize(decoded_insert)

change_event.operation
# => :insert

change_event.schema
change_event.table
change_event.new_values

Normalize a transaction-shaped batch:

results = adapter.normalize_many([
  decoded_begin,
  decoded_insert,
  decoded_update,
  decoded_commit
])

envelope = results.first
# => CDC::Core::TransactionEnvelope
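
The grouping step behind a transaction-shaped batch can be pictured with a small standalone sketch. It is not the gem's implementation: Begin, Commit, Change, Envelope and group_transactions are hypothetical stand-ins invented here so the example runs without pgoutput-decoder or cdc-core, and the pass-through of changes outside a transaction is an assumption.

```ruby
# Hypothetical stand-ins; the real decoder and cdc-core classes are not used here.
Begin    = Struct.new(:xid)
Commit   = Struct.new(:xid)
Change   = Struct.new(:operation, :table)
Envelope = Struct.new(:xid, :events)

# Fold a flat event list into envelopes, one per begin..commit span.
# Changes seen outside a transaction are passed through unchanged (assumption).
# A transaction that never commits is dropped in this sketch.
def group_transactions(events)
  results = []
  open_envelope = nil

  events.each do |event|
    case event
    when Begin
      open_envelope = Envelope.new(event.xid, [])
    when Commit
      results << open_envelope if open_envelope
      open_envelope = nil
    else
      if open_envelope
        open_envelope.events << event
      else
        results << event
      end
    end
  end

  results
end

grouped = group_transactions([
  Begin.new(42),
  Change.new(:insert, "users"),
  Change.new(:update, "users"),
  Commit.new(42)
])
```

Here grouped holds a single Envelope whose events are the insert and the update, in order.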

Primary keys

For update and delete events, pgoutput may provide an old-key tuple. When it does, that tuple is used as the CDC::Core::ChangeEvent#primary_key.

For insert events, or for sources that send no old-key tuple, the adapter falls back to the id / "id" value when one is present.

You can provide your own resolver:

adapter = Pgoutput::SourceAdapter::Cdc.new(
  primary_key_resolver: ->(_event, values) { { "uuid" => values.fetch("uuid") } }
)
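
The lookup order described above can be sketched in a few lines. This is an illustration only: DecodedEvent, DEFAULT_RESOLVER and resolve_primary_key are hypothetical names, and the choice to check the old-key tuple before calling the resolver is an assumption, since the README does not say how the two interact.

```ruby
# Hypothetical event shape: old_key is nil unless an old-key tuple was sent.
DecodedEvent = Struct.new(:operation, :old_key, :values)

# Default fallback: look for an id under a symbol key, then a string key.
DEFAULT_RESOLVER = lambda do |_event, values|
  if values.key?(:id)
    { id: values[:id] }
  elsif values.key?("id")
    { "id" => values["id"] }
  end
end

# Assumed precedence: old-key tuple first, then the resolver.
def resolve_primary_key(event, resolver: DEFAULT_RESOLVER)
  return event.old_key if event.old_key

  resolver.call(event, event.values)
end

insert  = DecodedEvent.new(:insert, nil, { "id" => 7, "name" => "a" })
delete  = DecodedEvent.new(:delete, { "id" => 7 }, {})
by_uuid = ->(_event, values) { { "uuid" => values.fetch("uuid") } }

insert_key = resolve_primary_key(insert)
delete_key = resolve_primary_key(delete)
uuid_key   = resolve_primary_key(
  DecodedEvent.new(:insert, nil, { "uuid" => "abc" }),
  resolver: by_uuid
)
```

With the default resolver a row without any id key resolves to nil, which is one reason to supply a resolver for tables keyed on something else.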

Metadata

Each normalized event includes pgoutput metadata:

{
  "source" => "pgoutput",
  "relation_id" => 123,
  "pgoutput_event" => "Insert"
}

Additional metadata can be injected:

adapter = Pgoutput::SourceAdapter::Cdc.new(
  metadata_builder: ->(_event) { { pipeline: "default" } }
)
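
One way to picture how injected metadata combines with the built-in keys is a plain Hash#merge. The sketch below is not taken from the gem: FakeEvent and build_metadata are hypothetical, and both the collision rule (the builder's value wins) and the untouched symbol keys are assumptions, since the README does not state how the gem handles either.

```ruby
# Hypothetical stand-in for a decoded pgoutput event.
FakeEvent = Struct.new(:relation_id, :kind)

# Build the base pgoutput metadata, then merge in whatever the builder returns.
# Hash#merge lets the builder's value win on a key collision (assumption).
def build_metadata(event, metadata_builder: ->(_event) { {} })
  base = {
    "source" => "pgoutput",
    "relation_id" => event.relation_id,
    "pgoutput_event" => event.kind
  }
  base.merge(metadata_builder.call(event))
end

event    = FakeEvent.new(123, "Insert")
plain    = build_metadata(event)
injected = build_metadata(event, metadata_builder: ->(_event) { { pipeline: "default" } })
```

In this sketch injected carries the three string-keyed base entries plus a symbol-keyed :pipeline entry; a real pipeline would likely want to settle on one key style.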

Public namespace

Pgoutput::SourceAdapter
Pgoutput::SourceAdapter::Cdc

A compatibility alias is also provided for the generated gem path:

Pgoutput::Source::Adapter
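
A compatibility alias of this kind is usually just a second constant pointing at the same module. The sketch below uses a hypothetical Demo namespace so it runs without the gem; how the gem itself defines the alias is not shown in this README.

```ruby
module Demo
  module SourceAdapter
    class Cdc; end
  end

  # Second constant bound to the same module object, not a copy.
  module Source
    Adapter = SourceAdapter
  end
end

same_module = Demo::Source::Adapter.equal?(Demo::SourceAdapter)
same_class  = Demo::Source::Adapter::Cdc.equal?(Demo::SourceAdapter::Cdc)
```

Because both names refer to one module, classes reached through either path are identical objects.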

Non-goals

This gem does not:

  • connect to PostgreSQL
  • parse pgoutput protocol messages
  • decode PostgreSQL values
  • run processors
  • manage replication slots
  • persist sink data

Those responsibilities belong to pgoutput-client, pgoutput-parser, pgoutput-decoder, runtime gems, or application code.

Development

bundle exec rake

License

MIT.