Low commit activity in last 3 years
Thread-safe in-memory event store for CQRS patterns. Supports named streams, event appending, stream reading, subscriber notifications, and state projections.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
 Dependencies
 Project Readme

philiprehberger-event_store

Tests Gem Version Last updated

In-memory event store with streams, projections, subscriptions, snapshots, and replay

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-event_store"

Or install directly:

gem install philiprehberger-event_store

Usage

require "philiprehberger/event_store"

store = Philiprehberger::EventStore.new

store.append(:orders, { type: 'OrderPlaced', id: 1, total: 99.99 })
store.append(:orders, { type: 'OrderShipped', id: 1 })

store.read(:orders)
# => [{ type: 'OrderPlaced', ... }, { type: 'OrderShipped', ... }]

Subscriptions

store.subscribe(:orders) do |event|
  puts "New order event: #{event[:type]}"
end

store.append(:orders, { type: 'OrderCancelled', id: 2 })
# prints: New order event: OrderCancelled

Projections

total = store.project(:orders, initial: 0) do |sum, event|
  event[:type] == 'OrderPlaced' ? sum + event[:total] : sum
end
# => 99.99

Event Querying

# Filter by stream
store.query(stream: :orders)

# Filter by event type
store.query(type: 'OrderPlaced')

# Combine filters with time range and limit
store.query(stream: :orders, after: 1.hour.ago, limit: 10)

Snapshots

Save aggregate state at a point in time, then rebuild from the snapshot plus newer events:

# Build state from events
state = store.project(:orders, initial: { count: 0, total: 0 }) do |s, e|
  { count: s[:count] + 1, total: s[:total] + (e[:total] || 0) }
end

# Save a snapshot
store.snapshot(:orders, state)

# Later, after more events have been appended:
store.append(:orders, { type: 'OrderPlaced', total: 50 })

# Rebuild from snapshot + new events (avoids replaying entire history)
rebuilt = store.load_from_snapshot(:orders) do |s, e|
  { count: s[:count] + 1, total: s[:total] + (e[:total] || 0) }
end

Replay

Re-emit past events to current subscribers:

# Replay all events in a stream
store.replay(:orders)

# Replay from a specific version (0-based index)
store.replay(:orders, from_version: 5)

# Replay all events across all streams
store.replay_all

# Replay from a global position
store.replay_all(from_position: 100)

Clearing streams

Remove events (and snapshots) while keeping subscribers registered:

received = []
store.subscribe(:orders) { |e| received << e }
store.append(:orders, { type: 'OrderPlaced' })

# Clear a single stream — subscribers stay attached
store.clear(:orders)
store.append(:orders, { type: 'OrderPlaced' })
# subscriber still fires for the new event

# Clear everything — streams and snapshots wiped, subscribers retained,
# global position reset to zero
store.clear

Reading All Events

store.read_all        # => all events across all streams, ordered by position
store.streams         # => ['orders', ...]
store.version(:orders) # => 5 (event count in stream)
store.total_events    # => 8 (sum of events across all streams)
store.last(:orders)   # => last appended event, or nil if empty/missing

API

Method Description
.new Create a new event store
#append(stream, event) Append an event to a stream
#read(stream) Read all events from a stream
#last(stream) Return the most recently appended event, or nil for an empty/missing stream
#read_all Read all events across all streams
#query(stream:, type:, after:, before:, limit:) Query events with filters
#subscribe(stream) { |e| } Subscribe to new events on a stream
#project(stream, initial:) { |state, e| } Project events into accumulated state
#snapshot(stream, state) Save aggregate state at current stream version
#load_from_snapshot(stream, initial:) { |state, e| } Rebuild state from snapshot + newer events
#replay(stream, from_version:) Replay stream events to subscribers
#replay_all(from_position:) Replay all events across streams to subscribers
#version(stream) Return event count for a stream
#total_events Total number of events across all streams
#streams List all stream names
#clear(stream = nil) Remove events and snapshot for a stream, or everything when no stream is passed (subscribers retained)

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT