cdc-core

CDC Core provides immutable, Ractor-safe Change Data Capture contracts and domain primitives, including source adapters, change events, processors, routing, ordering policies, and pipelines.

Shared Change Data Capture vocabulary for Ruby.

cdc-core provides immutable, Ractor-safe event objects and processor contracts for building CDC systems. It intentionally does not connect to databases, parse wire protocols, decode PostgreSQL OIDs, run schedulers, or integrate with Rails.
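To make "immutable, Ractor-safe" concrete, here is a minimal sketch using only Ruby's built-in Ractor helpers. The `row` hash is a hypothetical stand-in for event data, not a real cdc-core object, and the sketch does not claim to show how the gem freezes its own objects.

```ruby
# A plain Hash standing in for event data; NOT a real CDC::Core::ChangeEvent.
row = { "id" => 7, "tags" => ["vip", "beta"] }

# A shallow freeze is not enough: the nested Array is still mutable.
row.freeze
shallow = Ractor.shareable?(row)

# make_shareable freezes the object and everything reachable from it.
Ractor.make_shareable(row)
deep = Ractor.shareable?(row)
nested_frozen = row["tags"].frozen? && row["tags"].all?(&:frozen?)

puts "shallow freeze shareable: #{shallow}"
puts "deep freeze shareable:    #{deep}"
```

Only deeply frozen objects can be passed between Ractors without copying, which is why shallow immutability alone would not be sufficient for a shared event vocabulary.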

Requirements

  • Ruby 3.4+

Features

  • SourceAdapter normalization contract
  • Immutable ChangeEvent objects
  • Transaction grouping via TransactionEnvelope
  • Column-level change objects
  • Ordering vocabulary
  • Processor, composite processor, processor chain, and pipeline contracts
  • Event filters
  • Small pipeline orchestration object
  • Router for supported work item shapes
  • Observer hooks and canonical metric names
  • Ractor-safe event and transaction objects
  • RBS signatures
  • YARD-compatible documentation
  • No runtime dependencies

Ecosystem Position

upstream source
      |
      v
source adapter
      |
      v
cdc-core
      |
      +--> cdc-parallel       CPU-bound processing
      |
      +--> cdc-concurrent     I/O-bound processing
      |
      +--> application sinks / processors

cdc-core is the shared vocabulary layer. It defines what a change event, transaction, processor, ordering policy, observer notification, and processor result mean without caring where the event came from or how it will be executed.

Boundary Summary

cdc-core is for vocabulary.

Runtime gems are for execution.

Sinks are for persistence or side effects.

source adapter -> cdc-core vocabulary -> runtime gem -> sink

Source Adapters

CDC::Core::SourceAdapter defines the normalization contract that translates source-specific payloads into cdc-core vocabulary objects:

  • CDC::Core::ChangeEvent
  • CDC::Core::TransactionEnvelope
  • batches of core work items

The current PostgreSQL-oriented path is:

pgoutput-client -> pgoutput-parser -> pgoutput-decoder -> source adapter -> cdc-core

The pgoutput* family handles PostgreSQL transport, protocol parsing, and type decoding. The source-adapter boundary is where those source-specific details become generic cdc-core objects.

Other adapters can normalize logs, API payloads, application events, or other database streams into the same vocabulary.
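The normalization idea can be sketched without the gem. This README does not show the actual CDC::Core::SourceAdapter interface, so everything below (`SketchEvent`, `SketchWebhookAdapter`, the `normalize` method, and the payload keys) is hypothetical and only illustrates mapping a source-specific payload onto one shared, frozen shape.

```ruby
# Stand-in for an immutable event; NOT the real CDC::Core::ChangeEvent.
SketchEvent = Struct.new(:operation, :schema, :table, :old_values, :new_values,
                         keyword_init: true)

# Hypothetical adapter for an API payload that uses one-letter operation codes.
class SketchWebhookAdapter
  OPERATIONS = { "I" => :insert, "U" => :update, "D" => :delete }.freeze

  # Translates the source-specific payload into the shared event shape.
  def normalize(payload)
    SketchEvent.new(
      operation: OPERATIONS.fetch(payload.fetch("op")),
      schema: payload.fetch("schema", "public"),
      table: payload.fetch("table"),
      old_values: payload.fetch("before", {}),
      new_values: payload.fetch("after", {})
    ).freeze
  end
end

payload = {
  "op" => "U",
  "table" => "users",
  "before" => { "email" => "old@example.com" },
  "after" => { "email" => "new@example.com" }
}

adapter_event = SketchWebhookAdapter.new.normalize(payload)
puts adapter_event.operation.inspect
puts "#{adapter_event.schema}.#{adapter_event.table}"
```

Downstream code then depends only on the normalized shape, never on the one-letter codes or key names of the original payload.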

Downstream Runtime Gems

cdc-parallel and cdc-concurrent are downstream consumers of cdc-core events.

cdc-parallel

Use cdc-parallel for heavy CPU-bound processing.

Examples:

  • transformations
  • enrichment
  • encoding
  • compression
  • scoring
  • in-memory calculations

It is the Ractor-oriented runtime path.

cdc-concurrent

Use cdc-concurrent for I/O-heavy processing.

Examples:

  • HTTP calls
  • webhook delivery
  • Redis writes
  • search indexing
  • object storage writes
  • database sink writes

It is the fiber-friendly runtime path.

Installation

# Gemfile
gem 'cdc-core'

# Application code
require 'cdc/core'

Change Events

event = CDC::Core::ChangeEvent.new(
  operation: :update,
  schema: "public",
  table: "users",
  old_values: { "email" => "old@example.com" },
  new_values: { "email" => "new@example.com" },
  primary_key: { "id" => 7 },
  transaction_id: 789,
  commit_lsn: "0/16B6C50"
)

event.update?
# => true

event.qualified_table_name
# => "public.users"

event.changes.map(&:name)
# => ["email"]
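One plausible way to derive column-level changes is to compare the old and new value hashes key by key. The sketch below is an assumption about the general technique, not the gem's implementation; `SketchColumnChange` and `sketch_changes` are hypothetical names, and the real library may treat missing keys differently.

```ruby
# Stand-in for a column-level change object; NOT the gem's own class.
SketchColumnChange = Struct.new(:name, :old_value, :new_value)

# Returns one frozen change object per column whose value differs.
def sketch_changes(old_values, new_values)
  (old_values.keys | new_values.keys).filter_map do |name|
    before = old_values[name]
    after = new_values[name]
    SketchColumnChange.new(name, before, after).freeze unless before == after
  end
end

column_changes = sketch_changes(
  { "email" => "old@example.com", "name" => "Ada" },
  { "email" => "new@example.com", "name" => "Ada" }
)

changed_names = column_changes.map(&:name)
puts changed_names.inspect
# => ["email"]
```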

Transactions

transaction = CDC::Core::TransactionEnvelope.new(
  transaction_id: 789,
  events: [event],
  commit_lsn: "0/16B6C50",
  committed_at: Time.now.utc
)

A transaction envelope preserves database transaction boundaries. Runtime gems may use that boundary when they need ordering, batching, or parallel execution decisions.
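As a rough illustration of why the boundary matters, the sketch below regroups a flat stream by consecutive transaction id. `SketchTxEvent` is a hypothetical stand-in, and how a real runtime gem consumes TransactionEnvelope is not specified here.

```ruby
# Stand-in event carrying only what the grouping needs.
SketchTxEvent = Struct.new(:transaction_id, :table, keyword_init: true)

stream = [
  SketchTxEvent.new(transaction_id: 789, table: "users"),
  SketchTxEvent.new(transaction_id: 789, table: "profiles"),
  SketchTxEvent.new(transaction_id: 790, table: "orders")
]

# chunk_while keeps consecutive events of one transaction together
# and preserves their original order.
batches = stream.chunk_while { |a, b| a.transaction_id == b.transaction_id }.to_a

summary = batches.map { |events| [events.first.transaction_id, events.map(&:table)] }
puts summary.inspect
# => [[789, ["users", "profiles"]], [790, ["orders"]]]
```

A runtime could keep the events inside each batch strictly ordered while deciding separately whether whole batches may run in parallel.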

Processors

class AuditProcessor < CDC::Core::Processor
  def process(event)
    puts event.to_h
    CDC::Core::ProcessorResult.success(event)
  end
end

Ractor-safe processor intent

class AnalyticsProcessor < CDC::Core::Processor
  ractor_safe!

  def process(event)
    CDC::Core::ProcessorResult.success(event)
  end
end

AnalyticsProcessor.new.ractor_safe?
# => true

This declares intent only. cdc-core does not execute processors in Ractors. cdc-parallel can use this signal before moving processor work across Ractors.
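A declaration like this can be implemented as a class-level flag. The following is a sketch of that general pattern under stated assumptions, not the gem's source; `SketchProcessor` and its subclasses are hypothetical.

```ruby
# Hypothetical base class showing an opt-in, class-level intent flag.
class SketchProcessor
  # Called in a subclass body to declare intent.
  def self.ractor_safe!
    @ractor_safe = true
  end

  # Unset flags read as false, so processors are unsafe by default.
  def self.ractor_safe?
    @ractor_safe == true
  end

  def ractor_safe?
    self.class.ractor_safe?
  end
end

class SketchAnalytics < SketchProcessor
  ractor_safe!
end

class SketchAudit < SketchProcessor
end

analytics_safe = SketchAnalytics.new.ractor_safe?
audit_safe = SketchAudit.new.ractor_safe?
puts analytics_safe # => true
puts audit_safe     # => false
```

Because the flag lives in a class-level instance variable, it is not inherited in this sketch: each subclass must opt in explicitly.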

Downstream Workflow Primitives

cdc-core defines three small workflow primitives. Runtime gems and application-specific integrations can execute these primitives without inventing their own composition vocabulary.

CompositeProcessor

Use CompositeProcessor when many independent processors should receive the same input.

event
  ├─ AuditProcessor
  ├─ AnalyticsProcessor
  └─ WebhookProcessor

processor = CDC::Core::CompositeProcessor.new([
  AuditProcessor.new,
  AnalyticsProcessor.new
])

results = processor.process(event)
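The fan-out semantics can be sketched in a few lines. The result shape returned by the real CompositeProcessor is not documented in this README, so the sketch assumes one result per processor, in declaration order; `SketchResult` and `SketchComposite` are hypothetical.

```ruby
# Stand-in result object; NOT CDC::Core::ProcessorResult.
SketchResult = Struct.new(:ok, :input, :value, keyword_init: true)

class SketchComposite
  def initialize(processors)
    @processors = processors.dup.freeze
  end

  # Every processor receives the same input; results keep processor order.
  def process(input)
    @processors.map { |processor| processor.call(input) }
  end
end

upcase = ->(input) { SketchResult.new(ok: true, input: input, value: input.upcase) }
length = ->(input) { SketchResult.new(ok: true, input: input, value: input.length) }

fan_out = SketchComposite.new([upcase, length])
fan_out_values = fan_out.process("users").map(&:value)
puts fan_out_values.inspect
# => ["USERS", 5]
```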

Pipeline

Use Pipeline when one processor should run only after filters match.

event
  ↓
filters
  ↓
processor

pipeline = CDC::Core::Pipeline.new(
  processor: AuditProcessor.new,
  filters: [
    CDC::Core::Filter.schema("public"),
    CDC::Core::Filter.table("users")
  ]
)

result = pipeline.process(event)
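The filter-then-process flow can be sketched as follows. What the real Pipeline returns for a filtered-out event is not stated in this README; returning nil is an assumption, and `SketchRow` and `SketchPipeline` are hypothetical stand-ins.

```ruby
# Stand-in event with only the fields the filters inspect.
SketchRow = Struct.new(:schema, :table, keyword_init: true)

class SketchPipeline
  def initialize(processor:, filters: [])
    @processor = processor
    @filters = filters
  end

  # Runs the processor only when every filter accepts the event.
  # Returning nil for rejected events is an assumption of this sketch.
  def process(event)
    return nil unless @filters.all? { |filter| filter.call(event) }

    @processor.call(event)
  end
end

sketch_pipeline = SketchPipeline.new(
  processor: ->(event) { "audited #{event.schema}.#{event.table}" },
  filters: [
    ->(event) { event.schema == "public" },
    ->(event) { event.table == "users" }
  ]
)

matched = sketch_pipeline.process(SketchRow.new(schema: "public", table: "users"))
skipped = sketch_pipeline.process(SketchRow.new(schema: "public", table: "orders"))
puts matched.inspect # => "audited public.users"
puts skipped.inspect # => nil
```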

ProcessorChain

Use ProcessorChain when each processor depends on the previous processor's successful value.

user_ids
  ↓
LoadUsersProcessor
  ↓
users
  ↓
SendNotificationsProcessor

class LoadUsersProcessor < CDC::Core::Processor
  def process(user_ids)
    users = User.where(id: user_ids).to_a
    CDC::Core::ProcessorResult.success(user_ids, value: users)
  end
end

class SendNotificationsProcessor < CDC::Core::Processor
  def process(users)
    users.each { |user| NotificationMailer.notice(user).deliver_later }
    CDC::Core::ProcessorResult.success(users, value: users.size)
  end
end

chain = CDC::Core::ProcessorChain.new([
  LoadUsersProcessor.new,
  SendNotificationsProcessor.new
])

result = chain.process([1, 2, 3])
result.value
# => 3
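The chaining behaviour can be approximated with a fold over the steps. Stopping at the first failed step is an assumption drawn from the phrase "successful value"; `SketchStep` and `SketchChain` are hypothetical and do not reflect the gem's internals.

```ruby
# Stand-in result object; NOT CDC::Core::ProcessorResult.
SketchStep = Struct.new(:ok, :value, :error, keyword_init: true)

class SketchChain
  def initialize(steps)
    @steps = steps
  end

  # Feeds each step the previous step's value; stops at the first failure.
  def process(input)
    @steps.reduce(SketchStep.new(ok: true, value: input)) do |previous, step|
      break previous unless previous.ok

      step.call(previous.value)
    end
  end
end

double = ->(ids) { SketchStep.new(ok: true, value: ids.map { |id| id * 2 }) }
reject = ->(_ids) { SketchStep.new(ok: false, error: "lookup failed") }
count = ->(items) { SketchStep.new(ok: true, value: items.size) }

chain_ok = SketchChain.new([double, count]).process([1, 2, 3])
chain_failed = SketchChain.new([reject, count]).process([1, 2, 3])
puts chain_ok.value.inspect     # => 3
puts chain_failed.error.inspect # => "lookup failed"
```

In the failing case the `count` step never runs, so later processors are not handed a value that an earlier step could not produce.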

Non-goals

cdc-core does not:

  • Connect to PostgreSQL
  • Parse pgoutput
  • Decode PostgreSQL values
  • Manage replication slots
  • Implement concrete source adapters
  • Run Ractor pools
  • Run fiber schedulers
  • Persist audit records
  • Integrate with ActiveRecord
  • Publish to Kafka, Redis, HTTP, or other sinks

Documentation

The YARD documentation uses docs/index.md as its readme and includes the Markdown files under docs/.

--title "cdc-core API Documentation"
--readme docs/index.md
--markup markdown
--output-dir doc
lib/**/*.rb
-
docs/**/*.md

Development

bundle exec rake
bundle exec rake rbs:validate
bundle exec yard doc

License

MIT