0.0
No release in over 3 years
Read API for the Yes event sourcing framework
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
 Dependencies

Runtime

~> 6.0
>= 7.1
~> 2.6
 Project Readme

Yes

Yes is a framework for building event-sourced systems, originally developed to power Switzerland's leading apprenticeship platform yousty.ch and its younger sibling professional.ch. It is designed to be used within Rails applications and relies on PgEventstore for event storage, which provides a robust PostgreSQL-based event store implementation.

Table of Contents

  • Quick Start
  • Naming Conventions
  • Aggregate DSL
    • attribute
    • command
    • Attribute Details
    • Command Details
    • Guards
    • Read Models
    • Parent Aggregates
    • Primary Context
    • Removable
    • Draftable
  • Authorization
    • Auth Adapter
    • Aggregate Authorization
    • Command Authorization
    • Cerbos Authorization
  • Command API
    • Command API Installation
    • Request Format
    • Command Class Resolution
    • Processing Pipeline
    • Using Commands Without the DSL
    • Real-Time Command Notifications
  • Read API
    • Read API Installation
    • Basic Queries
    • Advanced Queries
    • Filters
    • Read API Authorization
    • Serializers
  • Event Processing
    • Subscriptions
    • Process Managers
  • Configuration Reference
  • Development
    • Example Usage
    • Testing the APIs
    • Running Specs
    • Gem Installation and Release
  • Contributing

Quick Start

Installation

Add this line to your application's Gemfile:

gem 'yes-core'

Then execute:

bundle install

Basic Usage

At the core of Yes is the Yes::Core::Aggregate class, which provides a DSL for defining event-sourced aggregates:

module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      # Define attributes with types
      attribute :name, :string, command: true # this will generate a change_name command
      attribute :email, :email, command: true # this will generate a change_email command

      attribute :company_id, :uuid # this will not generate a command, just an accessor

      # Define custom commands
      command :assign_to_company do
        payload company_id: :uuid # Payload keys need to match attributes

        guard :company_exists do
          CompanyReadModel.exists?(payload.company_id)
        end
      end
    end
  end
end

# Usage
user = Users::User::Aggregate.new
user.change_name("John Doe")
user.assign_to_company(company_id: "123e4567-e89b-12d3-a456-426614174000")

Naming Conventions

When defining an aggregate, use the following namespacing pattern: <Context>::<AggregateName>::Aggregate

For example: Users::User::Aggregate or Companies::Company::Aggregate

Aggregate DSL

attribute

The attribute method defines properties of your aggregate:

module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      # Basic attributes without commands
      attribute :name, :string
      attribute :email, :email

      # Attributes with change commands
      attribute :age, :integer, command: true
      attribute :bio, :string, command: true
    end
  end
end

Important Options

  • command: true - Generates change commands for the attribute (not generated by default). It is not allowed for :aggregate attribute type.

When command: true is specified, Yes automatically creates:

  • Accessor methods (user.name, user.email)
  • Change commands (user.change_age(30))
  • Validation methods (user.can_change_bio?("New bio"))
  • Event classes for recording changes

Without command: true, only the accessor is created.

command

The command method defines custom operations on your aggregate:

module Companies
  module Company
    class Aggregate < Yes::Core::Aggregate
      # Define attributes that will be updated by the command
      attribute :user_ids, :uuids

      command :assign_user do
        # Define payload attributes
        payload user_id: :uuid

        guard :user_not_already_assigned do
          !user_ids.include?(payload.user_id)
        end

        # Custom state update logic
        update_state do
          user_ids { (user_ids || []) + [payload.user_id] }
        end
      end
    end
  end
end

Attribute Details

Attributes are the core properties of your aggregates.

Available Types

The attribute system supports various types:

  • :string - Text values
  • :email - Email addresses with validation
  • :uuid - UUID values
  • :integer - Numeric values
  • :boolean - True/false values
  • :date - Date values
  • :uuids - Arrays of UUIDs

For the complete list, see yes-core/lib/yes/core/type_lookup.rb

Custom Types

You can register application-specific types using the type registry:

# config/initializers/yes_types.rb
Yes::Core::Types.register(:subscription_type, Yes::Core::Types::String.enum('premium', 'basic'))
Yes::Core::Types.register(:team_role, Yes::Core::Types::String.enum('lead', 'member'))
Yes::Core::Types.register(:training_year, Yes::Core::Types::Coercible::Integer.constrained(gteq: 1, lteq: 4))

Registered types can then be used in aggregate definitions:

attribute :role, :team_role, command: true

Attribute Commands

If you specify command: true when defining an attribute, Yes generates:

change_<attribute> Method

Changes the attribute's value through an event:

user.change_age(30)
user.change_bio("Software developer")

You can also pass parameters as a hash:

user.change_age(age: 30)
can_change_<attribute>? Method

Validates a potential change without applying it:

# Valid change
if user.can_change_email?("user@example.com")
  user.change_email("user@example.com")
end

# Invalid change
user.can_change_email?("invalid-email") # => false
user.email_change_error # Contains the error message

Command Details

Commands define operations that can be performed on your aggregate.

Command Configuration Options

Payload

Define the input data for your command:

command :register_apprenticeship do
  payload title: :string,
          start_date: :date,
          location_id: :uuid
end

Make sure the payload keys are all defined as attributes on the aggregate if you don't supply an update_state block.

Optional and Nullable Attributes

You can mark payload attributes as optional (key can be omitted) or nullable (value can be nil) using hash syntax:

command :update_profile do
  # Optional key - attribute can be omitted from payload
  payload phone: { type: :string, optional: true },
          # Nullable value - attribute must be present but can be nil
          max_travel_time: { type: :integer, nullable: true },
          # Both optional key and nullable value
          email: { type: :email, optional: true, nullable: true }
end
  • optional: true - The key can be omitted from the command payload (for commands) or event data (for events)
  • nullable: true - The value can be nil (wraps the type with .maybe for commands, uses .maybe() for events)

Note: For commands, nullable attributes are automatically unwrapped from Dry::Monads::Maybe::Some/None when accessing command.payload to ensure compatibility with event creation.

Guards

Add validation rules with guards:

command :publish do
  guard :all_required_fields_present do
    title.present? && description.present?
  end

  guard :not_already_published do
    !published
  end
end
Custom Event Names

Customize the generated event name:

command :publish do
  event :apprenticeship_published
end

When no custom event name is provided, Yes automatically generates an event name based on the command name. Currently, only standard command prefixes are supported. If you use a command that doesn't start with a supported prefix, you must specify the event name explicitly. For a list of supported prefixes, see lib/yes/core/utils/event_name_resolver.rb.

Encrypting Event Payload Attributes

Yes supports encrypting sensitive data in events. You can mark payload attributes for encryption using three approaches:

1. Inline Encryption Declaration (Recommended for mixed payloads)

command :update_contact_info do
  payload email: { type: :email, encrypt: true },
          phone: { type: :phone, encrypt: true },
          address: :string  # not encrypted
end

2. Separate encrypt Method (Recommended for multiple encrypted fields)

command :update_sensitive_data do
  payload ssn: :string, email: :email, phone: :phone
  encrypt :ssn, :email, :phone
end

3. Command Shortcut with encrypt Option

# For simple attribute commands
command :change, :ssn, :string, encrypt: true

Important Notes:

  • Encryption applies to the event payload stored in the event store, not to the aggregate state or read models
  • Encrypted attributes are tracked in the generated event class via an encryption_schema class method
  • You can combine inline and separate encryption declarations in the same command
  • The encryption key is automatically derived from the aggregate ID
Custom State Updates

Define exactly how state should change:

command :add_tag do
  payload tag: :string

  update_state do
    tags { (tags || []) + [payload.tag] }
  end
end

You can also use the update_state method to update multiple attributes at once:

update_state do
  name { payload.name }
  email { payload.email }
end

Make sure the attributes updated in the update_state block are all defined on the aggregate.

State Update Behavior

Commands update the aggregate state in one of two ways:

1. Automatic State Updates (Without update_state Block)

If you don't define an update_state block, the command will automatically update the aggregate's attributes based on the payload:

module Companies
  module Company
    class Aggregate < Yes::Core::Aggregate
      # Define attributes that match the payload keys
      attribute :name, :string
      attribute :description, :string

      command :update_details do
        # Payload keys must match attribute names
        payload name: :string,
                description: :string
        # No update_state block needed - automatic update
      end
    end
  end
end

company = Companies::Company::Aggregate.new
company.update_details(name: "Acme Inc", description: "Manufacturing company")
# Both name and description attributes will be updated automatically

Important: When not using an update_state block:

  • All payload keys must be defined as attributes on the aggregate
  • The system will validate this and raise an error if there's a mismatch
  • The attribute values will be updated directly from the payload values
2. Custom State Updates (With update_state Block)

When you define an update_state block, you have complete control over how attributes are updated:

module Articles
  module Article
    class Aggregate < Yes::Core::Aggregate
      attribute :title, :string
      attribute :tags, :array
      attribute :status, :string

      command :publish do
        payload title: :string

        update_state do
          # You can reference payload values
          title { payload.title }
          # Or set static values
          status { "published" }
          # Or combine existing data with payload
          tags { (tags || []) + ["published"] }
        end
      end
    end
  end
end

Important: When using an update_state block:

  • Payload keys don't need to match attribute names
  • However, all attributes updated in the block must be defined on the aggregate
  • The system will validate this and raise an error if an undefined attribute is updated
  • You have full control over transformation logic

Generated Command Methods

For each command, Yes generates:

Command Method

Executes the command:

company.assign_user(user_id: "123e4567-e89b-12d3-a456-426614174000")
Can Command Method

Validates if the command would succeed:

if company.can_assign_user?(user_id: "123e4567-e89b-12d3-a456-426614174000")
  company.assign_user(user_id: "123e4567-e89b-12d3-a456-426614174000")
else
  puts company.assign_user_error
end

Command shortcuts

For the most frequently used cases Yes DSL allows to use shortcuts in command definitions.

Change command with attribute
command :change, :age, :integer, localized: true

is expanded to

  attribute :age, :integer, localized: true
  command :change_age do
    payload age: :integer, locale: :locale
    guard(:no_change) { value_changed?(send(attribute_name), payload.send(attribute_name)) }
  end

You can overwrite the default no change guard by providing a custom one:

command :change, :age, :integer do
  payload fantastic_new_age: :integer
  guard(:no_change) { age != payload.fantastic_new_age }
end
Boolean attribute command

:enable and :activate command names are triggering this shortcut.

command :activate, :dropout, attribute: :dropout_enabled

is expanded to

  attribute :dropout_enabled, :boolean
  command :activate_dropout do
    guard(:no_change) { !dropout_enabled }
    update_state { dropout_enabled { true } }
  end
Toggle commands
command [:enable, :disable], :dropout

is expanded to

  attribute :dropout, :boolean
  command :enable_dropout do
    guard(:no_change) { !dropout }
    update_state { dropout { true } }
  end

  command :disable_dropout do
    guard(:no_change) { dropout }
    update_state { dropout { false } }
  end
Publish command
command :publish

is expanded to

  attribute :published, :boolean
  command :publish do
    guard(:no_change) { !published }
    update_state { published { true } }
  end

Guards

Guards are powerful validation mechanisms that enforce business rules by controlling when commands and attribute changes are permitted to execute. They act as gatekeepers that ensure all operations maintain the integrity of your domain logic.

Default Guards

Both commands and attributes automatically include a :no_change guard that ensures the aggregate's state would actually change when applying the command. For commands, this default guard is only active when there is no update_state block present in the command definition.

Adding Guards to Attributes

When defining an attribute with a command, you can add guards to implement validation:

attribute :email, :email, command: true do
  guard :check_email_domain do
    payload.email.end_with?('@example.com')
  end
end

Adding Guards to Commands

Similarly, you can add guards to commands to control when they can execute:

command :publish do
  guard :all_required_fields_present do
    title.present? && description.present?
  end

  guard :not_already_published do
    !published
  end
end

Inside any guard block you can access:

  • payload - The command payload with access to both data and metadata
  • Any aggregate attribute directly by name
Accessing Metadata in Guards

The payload object in guards provides access to command metadata alongside the regular payload data. This metadata can contain useful contextual information like user information, or tracking data.

You can access metadata in two ways:

command :update_status do
  payload status: :string

  guard :valid_response do
    # Method-style access
    payload.metadata.response_id.present?

    # Hash-style access
    payload.metadata[:response_id].present?
  end

  guard :authorized_user do
    # If a metadata key doesn't exist, nil is returned
    payload.metadata.user_role == 'admin' # returns nil if user_role is not in metadata
  end
end

This allows guards to make decisions based on both the command's data payload and any additional contextual metadata that was provided when the command was issued.

Guard Error Types

Guards have two distinct behaviors based on their name:

  • Guards named :no_change trigger a no-change transition error when they fail. This indicates that the operation would not modify the aggregate's state.
  • All other guard names trigger an invalid transition error when they fail. This indicates that the operation is not allowed in the current state.
command :update_profile do
  payload bio: :string

  # Will trigger a no-change transition error if bio hasn't changed
  guard :no_change do
    payload.bio != bio
  end

  # Will trigger an invalid transition error if bio contains prohibited words
  guard :appropriate_content do
    !payload.bio.include?("prohibited content")
  end
end

Custom Error Messages

You can provide custom localized error messages for guards using I18n translation files:

# config/locales/en.yml
en:
  aggregates:
    test: # context
      apprenticeship: # aggregate
        commands:
          change_location: # command
            guards:
              location_published: # guard
                error: "Location is not published"
              company_matches:
                error: "Location company does not match apprenticeship company"

This allows you to define human-readable error messages that can be easily translated to different languages. These messages will be used instead of the default error messages when a guard fails.

Read Models

Each aggregate automatically gets a corresponding read model (ActiveRecord model) that persists its current state. This is how you access attribute values from an aggregate.

user = Users::User::Aggregate.new
user.change_name("Jane Doe")
user.name # => "Jane Doe" (reads from the read model)

Default Naming

By default, the read model's name is derived from the aggregate's context and name:

# For Users::User::Aggregate
# The read model class will be UsersUser
# And the database table will be users_users

Customizing Read Models

You can customize the read model name and visibility using the read_model method:

module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      # Use a custom read model name
      read_model 'custom_user', public: false

      attribute :email, :email, command: true
      attribute :name, :string
    end
  end
end

In this example:

  • The read model class will be CustomUser instead of UsersUser
  • The database table will be custom_users
  • public: false means this read model won't be accessible via the read API

Read Model Schema Generator

When you add or remove aggregates or attributes, you need to update your database schema. Yes provides a Rails generator for this:

rails generate yes:core:read_models:update

This will:

  1. Find all aggregates in your application
  2. Create migration files that update read model tables to match your aggregate definitions
  3. Add, modify, or remove columns as needed

Example generated migration:

class UpdateReadModels < ActiveRecord::Migration[7.1]
  def change
    create_table :users do |t|
      t.string :name
      t.string :email
      t.integer :age
      t.integer :revision, null: false, default: -1
      t.timestamps
    end

    add_column :companies, :name, :string
    remove_column :companies, :old_field
  end
end
Type Mapping

Attribute types are mapped to database column types as follows:

  • :string, :email, :url:string
  • :integer:integer
  • :uuid:uuid
  • :boolean:boolean
  • :hash:jsonb
  • :aggregate:uuid (stored as <attribute_name>_id)

Pending Update Tracking Generator

To ensure read model consistency and enable recovery from failures during event processing, Yes provides a generator that adds pending update tracking to your read models:

rails generate yes:core:read_models:add_pending_update_tracking

This generator creates a migration that:

  1. Adds a pending_update_since column to all read model tables
  2. Creates indexes to efficiently track and recover stale pending updates
  3. Automatically handles PostgreSQL's 63-character index name limit by truncating long names
What It Does

The pending update tracking system helps prevent read models from getting stuck in an inconsistent state by:

  • Marking read models as "pending" before event publication
  • Clearing the pending state after successful updates
  • Allowing automatic recovery of stale pending states (default timeout: 5 minutes)
Generated Migration Example
class AddPendingUpdateTrackingToReadModels < ActiveRecord::Migration[7.1]
  def up
    read_model_tables = Yes::Core.configuration.all_read_model_table_names

    read_model_tables.each do |table_name|
      next unless ActiveRecord::Base.connection.table_exists?(table_name)

      add_column table_name, :pending_update_since, :datetime

      # Unique index to prevent concurrent updates to same aggregate
      add_index table_name, :id,
                unique: true,
                where: 'pending_update_since IS NOT NULL',
                name: truncate_index_name("idx_#{table_name}_one_pending_per_aggregate")

      # Index for efficient recovery queries
      add_index table_name, :pending_update_since,
                where: 'pending_update_since IS NOT NULL',
                name: truncate_index_name("idx_#{table_name}_pending_recovery")
    end
  end
end
Recovery Job

You can schedule a background job to automatically recover stale pending updates:

# app/jobs/read_model_recovery_job.rb
class ReadModelRecoveryJob < ApplicationJob
  def perform
    Yes::Core::Jobs::ReadModelRecoveryJob.new.perform
  end
end

# Schedule it to run periodically (e.g., every 5 minutes)
# In your scheduler (whenever, sidekiq-cron, etc.):
ReadModelRecoveryJob.perform_later
Manual Recovery

You can also manually trigger recovery for specific read models:

# Recover a specific read model instance
read_model = UserReadModel.find(id)
Yes::Core::CommandHandling::ReadModelRecoveryService.recover(read_model)

# Recover all stale pending updates (older than 5 minutes by default)
Yes::Core::CommandHandling::ReadModelRecoveryService.recover_all_stale

Parent Aggregates

Link aggregates in a hierarchy:

module Companies
  module Location
    class Aggregate < Yes::Core::Aggregate
      parent :company

      attribute :name, :string, command: true
      attribute :address, :string, command: true
    end
  end
end

The parent method defines an assign command with its attribute by default. For the above example it will be assign_company with company_id attribute.

command option

Set parent command option to false to skip defining assign command:

parent :company, command: false

Primary Context

Specify the main context:

module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      primary_context :users

      attribute :name, :string, command: true
    end
  end
end

Removable

Define a default removal behavior for an aggregate:

module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      removable
    end
  end
end

It defines a remove command which works with the removed_at attribute by default and applies a default removal behavior.

The removable method accepts a custom name for an attribute which will also be used for the removal behavior. You can see an example below.

module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      removable(attr_name: :deleted_at)
    end
  end
end

You can also define additional guards or custom behavior:

module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      removable do
        guard(:published) { published? }
      end
    end
  end
end

Draftable

The draftable feature allows aggregates to be created and modified in a draft state before being published. This is useful when you want to prepare changes without immediately making them live.

module Articles
  module Article
    class Aggregate < Yes::Core::Aggregate
      # Makes aggregate draftable by connecting it to a draft aggregate for managing the draft state.
      # The draft aggregate has to exist already. The default draft aggregate is <CurrentAggregateContext>::<CurrentAggregateName>Draft.
      # Also configures a changes read model (defaults to "<read_model>_change")
      draftable

      # Draftable with custom parameters
      # draftable draft_aggregate: { context: 'ArticleDrafts', aggregate: 'ArticleDraft' }, changes_read_model: :article_change

      attribute :title, :string, command: true
      attribute :content, :string, command: true
    end
  end
end

Method Parameters

The draftable method accepts two optional parameters:

  • draft_aggregate: A hash containing the draft aggregate configuration
    • context: The context name for the draft version (defaults to the same context as the main aggregate)
    • aggregate: The aggregate name for the draft version (defaults to the main aggregate name with "Draft" suffix)
  • changes_read_model: The name for the changes read model (defaults to the main read model name with "_change" appended)

Example Usage

# Use all defaults
draftable

# Custom context only
draftable draft_aggregate: { context: 'DraftContext' }

# Custom aggregate name only
draftable draft_aggregate: { aggregate: 'MyDraft' }

# Both context and aggregate
draftable draft_aggregate: { context: 'DraftContext', aggregate: 'MyDraft' }

# Custom changes read model only
draftable changes_read_model: :custom_changes

# All custom parameters
draftable draft_aggregate: { context: 'DraftContext', aggregate: 'MyDraft' }, changes_read_model: :my_changes

When changes_read_model is not specified, it defaults to using the main read model name with "_change" appended (e.g., if the read model is "article", the changes read model becomes "article_change").

Authorization

Auth Adapter

Both the Command API and Read API delegate authentication to a configurable adapter. Configure it in an initializer:

# config/initializers/yes.rb
Yes::Core.configure do |config|
  config.auth_adapter = MyAuthAdapter.new
end

The adapter must implement three methods:

Method Purpose Called by
authenticate(request) Verify the JWT token and return an auth data hash. Raise a Yes::Core::AuthenticationError subclass on failure. Both API controllers (before every request)
verify_token(token) Decode a raw JWT token string. Return an object responding to .token that returns [decoded_payload_hash]. MessageBus user identification
error_classes Return an array of exception classes that represent authentication failures. Command API controller (to rescue and render 401)

How It Works

  1. On every request, the controller calls adapter.authenticate(request).
  2. The returned hash is stored as auth_data and passed to command authorizers, read request authorizers, and read model authorizers throughout the request lifecycle.
  3. The hash must include at minimum an :identity_id key, which is used for command metadata, MessageBus channel defaults, and authorization.

Example Implementation

class MyAuthAdapter
  AuthError = Class.new(Yes::Core::AuthenticationError)

  # @param request [ActionDispatch::Request]
  # @raise [AuthError] if the token is missing or invalid
  # @return [Hash] auth data passed to authorizers as auth_data
  def authenticate(request)
    token = request.headers['Authorization']&.delete_prefix('Bearer ')
    raise AuthError, 'Token missing' unless token

    payload = JWT.decode(token, public_key, true, algorithm: 'RS256').first
    { identity_id: payload['sub'], host: request.host }.merge(payload.symbolize_keys)
  end

  # @param token [String] raw JWT token (extracted from Authorization header)
  # @return [OpenStruct] object with .token returning [decoded_payload_hash]
  def verify_token(token)
    decoded = JWT.decode(token, public_key, true, algorithm: 'RS256')
    OpenStruct.new(token: decoded)
  end

  # @return [Array<Class>] exception classes the controller rescues as 401
  def error_classes
    [AuthError, JWT::DecodeError]
  end

  private

  def public_key
    OpenSSL::PKey::RSA.new(ENV.fetch('JWT_PUBLIC_KEY'))
  end
end

Aggregate Authorization

To make aggregates available via the command API, you must define an authorization scheme at the aggregate level. This controls who can execute commands on the aggregate.

Simple Authorization

The simplest authorization simply allows all commands to be executed:

module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      # Allow all commands
      authorize do
        true
      end

      attribute :name, :string, command: true
    end
  end
end

Inside the authorize block, you can access:

  • command - The command being executed
  • auth_data - The decoded data from the JWT authentication token

This allows for custom authorization logic:

authorize do
  # Only allow commands if the authenticated identity matches the user
  command.user_id == auth_data[:identity_id]
end

Command Authorization

Commands can define per-command authorization that extends or overrides the aggregate-level authorizer.

# First define an aggregate level authorizer
class Aggregate < Yes::Core::Aggregate
  authorize do
    # Base level authorization logic
    auth_data[:identity_id].present?
  end

  # Then add command-specific refinements
  command :publish do
    payload user_id: :uuid

    # Command-specific authorization logic
    authorize do
      # Has access to the command and auth_data
      command.user_id == auth_data[:user_id]
    end
  end
end

When an aggregate has declared authorize at the class level, commands can define their own authorization logic that inherits from the aggregate-level authorizer. Each command with an authorize block automatically receives its own Authorizer subclass that inherits from the aggregate-level authorizer.

Command authorizers are registered in the configuration and can be retrieved with:

Yes::Core.configuration.aggregate_class('Context', 'Aggregate', :publish, :authorizer)

Cerbos Authorization

For more complex authorization needs, Yes integrates with Cerbos, a powerful authorization engine:

module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      authorize cerbos: true

      attribute :name, :string, command: true
    end
  end
end

When using Cerbos, you can specify additional parameters:

  • read_model_class - The class used to load the read model for authorization checks (defaults to the aggregate's read model)
  • resource_name - The resource name used in Cerbos policies (defaults to the underscored aggregate name)
module Companies
  module CompanySettings
    class Aggregate < Yes::Core::Aggregate
      # Custom read model and resource name
      authorize cerbos: true,
                read_model_class: CustomCompanySettings,
                resource_name: 'company_settings'

      attribute :name, :string, command: true
    end
  end
end

When using custom read models with Cerbos, the model must implement an auth_attributes method that returns a hash of attributes for authorization:

class CustomCompanySettings < ApplicationRecord
  def auth_attributes
    { company_id: company_id || '' }
  end
end

These attributes are passed to Cerbos for making authorization decisions based on your policies.

Customizing Cerbos Integration

For advanced use cases, you can customize how Yes interacts with Cerbos by overriding the resource_attributes and cerbos_payload methods in your authorization block. Currently, this customization is only available within command-level authorization blocks, not at the aggregate level:

module Universe
  module Star
    class Aggregate < Yes::Core::Aggregate
      # Base aggregate-level Cerbos authorization
      authorize cerbos: true

      attribute :name, :string, command: true

      # Command with customized Cerbos integration
      command :update_details do
        payload details: :string

        # Command-level authorization with custom Cerbos integration
        authorize do
          # Override resource attributes sent to Cerbos
          resource_attributes { { owner_id: 'test-user-id' } }

          # Override the entire Cerbos payload
          cerbos_payload { { principal: auth_data, resource_id: 'test-id' } }
        end
      end
    end
  end
end

Inside the resource_attributes block, you can access:

  • command - The command being executed
  • resource - The read model instance for the aggregate

Inside the cerbos_payload block, you can access:

  • command - The command being executed
  • resource - The read model instance for the aggregate
  • auth_data - The decoded data from the JWT authentication token

These blocks allow you to precisely control what data is sent to Cerbos for authorization decisions on a per-command basis.

Command API

The Command API (yes-command-api) provides an HTTP endpoint for executing commands as JSON batches. It is a standalone Rails engine that does not depend on the aggregate DSL — it works with any command class that follows one of the supported naming conventions.

Command API Installation

Add the gem and mount the engine:

# Gemfile
gem 'yes-command-api'
# config/routes.rb
mount Yes::Command::Api::Engine => '/v1/commands'

Request Format

Send a POST request with a JSON body containing a commands array:

{
  "commands": [
    {
      "context": "Users",
      "subject": "User",
      "command": "ChangeName",
      "data": {
        "user_id": "47330036-7246-40b4-a3c7-7038df508774",
        "name": "Jane Doe"
      },
      "metadata": {}
    }
  ],
  "channel": "my-notifications"
}

Each command requires context, subject, command, and data. The optional channel parameter controls which MessageBus channel receives notifications (defaults to the authenticated user's identity_id).

Set async=true or async=false as a query parameter to override the default processing mode (Yes::Core.configuration.process_commands_inline).

Command Class Resolution

The deserializer resolves command classes by trying three naming conventions in order:

Priority Convention Class pattern Typical use
1 Command Group CommandGroups::<Command>::Command Composed commands
2 V2 <Context>::<Subject>::Commands::<Command>::Command DSL-generated commands
3 V1 <Context>::Commands::<Subject>::<Command> Manually created commands

The first matching constant wins. This means you can use the API with DSL-generated commands, manually created commands, or both.

Processing Pipeline

When a request arrives, it passes through these stages:

  1. Authentication — the auth adapter verifies the JWT token
  2. Params validation — checks that each command hash contains context, subject, command, and data
  3. Deserialization — resolves class names and instantiates command objects
  4. Expansion — flattens command groups into individual commands
  5. Authorization — each command's authorizer is looked up and called with auth_data
  6. Validation — optional per-command validators are called
  7. Command bus — commands are dispatched (inline or via ActiveJob)

Using Commands Without the DSL

You can create command classes manually and use them with the Command API. A complete command requires four parts: a command, a handler, an event, and an authorizer. The file structure follows a convention:

app/contexts/
  billing/
    invoice/
      commands/
        authorizer.rb            # shared base authorizer (optional)
        create/
          command.rb             # command definition
          handler.rb             # command handler
          authorizer.rb          # per-command authorizer
      events/
        created.rb               # event definition

Command

Defines the payload attributes and identifies the aggregate:

# app/contexts/billing/invoice/commands/create/command.rb
module Billing
  module Invoice
    module Commands
      module Create
        class Command < Yes::Core::Command
          attribute :invoice_id, Yes::Core::Types::UUID
          attribute :amount, Yes::Core::Types::Integer
          attribute :currency, Yes::Core::Types::String

          alias aggregate_id invoice_id
        end
      end
    end
  end
end

Handler

Processes the command and publishes the event. The handler inherits from Yes::Core::Commands::Stateless::Handler and declares which event to emit:

# app/contexts/billing/invoice/commands/create/handler.rb
module Billing
  module Invoice
    module Commands
      module Create
        class Handler < Yes::Core::Commands::Stateless::Handler
          self.event_name = 'Created'

          def call
            # Add guard logic here, e.g.:
            # no_change_transition('Already exists') if already_exists?

            super # publishes the event
          end
        end
      end
    end
  end
end

Event

Defines the event schema for validation when writing to the event store:

# app/contexts/billing/invoice/events/created.rb
module Billing
  module Invoice
    module Events
      class Created < Yes::Core::Event
        def schema
          Dry::Schema.Params do
            required(:invoice_id).value(Yes::Core::Types::UUID)
            required(:amount).value(:integer)
            required(:currency).value(:string)
          end
        end
      end
    end
  end
end

Authorizer

Controls who can execute the command. You can define a shared base authorizer for the aggregate and inherit from it:

# app/contexts/billing/invoice/commands/authorizer.rb
module Billing
  module Invoice
    module Commands
      class Authorizer < Yes::Core::Authorization::CommandAuthorizer
        def self.call(_command, auth_data)
          raise CommandNotAuthorized, 'Not allowed' unless auth_data[:identity_id].present?
        end
      end
    end
  end
end

# app/contexts/billing/invoice/commands/create/authorizer.rb
module Billing
  module Invoice
    module Commands
      module Create
        class Authorizer < Billing::Invoice::Commands::Authorizer
          # Inherits base authorization; add command-specific checks here
        end
      end
    end
  end
end

This command can then be executed via the API:

{
  "context": "Billing",
  "subject": "Invoice",
  "command": "Create",
  "data": {
    "invoice_id": "550e8400-e29b-41d4-a716-446655440000",
    "amount": 10000,
    "currency": "CHF"
  }
}

Real-Time Command Notifications

For performance and reliability, WebSocket-based notifications are the preferred way to inform frontends about command execution status. The Command API ships with two built-in notifiers and supports custom implementations.

Notifiers are configured globally and broadcast three event types per command batch:

Event When Payload includes
batch_started Before processing begins batch_id, commands list
Per-command response After each command completes Command result or error
batch_finished After all commands complete batch_id, failed commands (if any)

Configuration

Register one or more notifier classes in the initializer:

Yes::Core.configure do |config|
  config.command_notifier_classes = [
    Yes::Command::Api::Commands::Notifiers::ActionCable,
    Yes::Command::Api::Commands::Notifiers::MessageBus
  ]
end

The channel parameter from the API request (or the authenticated user's identity_id as fallback) is passed to each notifier, so clients only receive notifications for their own commands.

ActionCable Notifier

Broadcasts notifications via ActionCable.server.broadcast. This is well suited for use with a dedicated WebSocket gateway service that connects to the same Redis backend:

config.command_notifier_classes = [Yes::Command::Api::Commands::Notifiers::ActionCable]

The frontend subscribes to the channel and receives JSON messages:

{ "type": "batch_started", "batch_id": "abc-123", "published_at": 1711540800, "commands": [...] }
{ "type": "batch_finished", "batch_id": "abc-123", "published_at": 1711540801, "failed_commands": [] }

MessageBus Notifier

Uses the MessageBus gem for long-polling or WebSocket delivery. Messages are scoped to the authenticated user via user_ids:

config.command_notifier_classes = [Yes::Command::Api::Commands::Notifiers::MessageBus]

The auth adapter's verify_token method is used by MessageBus to identify subscribers by their identity_id.

Custom Notifiers

You can implement your own notifier by subclassing Yes::Core::Commands::Notifier:

class SlackNotifier < Yes::Core::Commands::Notifier
  def notify_batch_started(batch_id, transaction = nil, commands = nil)
    # ...
  end

  def notify_batch_finished(batch_id, transaction = nil, responses = nil)
    # ...
  end

  def notify_command_response(cmd_response)
    # ...
  end
end

Read API

The Read API (yes-read-api) provides an HTTP endpoint for querying read models with filtering, pagination, and authorization. Like the Command API, it is a standalone Rails engine that does not depend on the aggregate DSL — it works with any ActiveRecord model that has a matching serializer.

Read API Installation

Add the gem and mount the engine:

# Gemfile
gem 'yes-read-api'
# config/routes.rb
mount Yes::Read::Api::Engine => '/queries'

Basic Queries

Send a GET request with the read model name as the path and optional query parameters:

GET /queries/users?filters[ids]=1,2,3&order[name]=asc&page[number]=1&page[size]=20&include=company
  • filters[<key>] — filter by attribute (handled by the model's filter class)
  • order[<key>] — sort direction (asc or desc)
  • page[number] and page[size] — pagination
  • include — comma-separated list of associations to include in the response

Advanced Queries

Send a POST request for complex filtering with AND/OR logic:

{
  "model": "users",
  "filter_definition": {
    "type": "filter_set",
    "logical_operator": "and",
    "filters": [
      {
        "type": "filter",
        "attribute": "name",
        "operator": "is",
        "value": "Jane"
      },
      {
        "type": "filter",
        "attribute": "status",
        "operator": "is_not",
        "value": "archived"
      }
    ]
  },
  "order": { "name": "asc" },
  "page": { "number": 1, "size": 20 }
}

Filters

Filters are optional per-model classes that define available filter scopes. If no custom filter exists, the base Yes::Core::ReadModel::Filter is used.

module ReadModels
  module User
    class Filter < Yes::Core::ReadModel::Filter
      has_scope :name do |_controller, scope, value|
        scope.where(name: value)
      end

      has_scope :ids do |_controller, scope, value|
        scope.where(id: value.split(','))
      end

      private

      def read_model_class
        ::UserReadModel
      end
    end
  end
end

Read API Authorization

The Read API enforces two levels of authorization:

  1. Request authorizer — controls whether a user can query a given model at all. Looked up as ReadModels::<Model>::RequestAuthorizer.
module ReadModels
  module User
    class RequestAuthorizer
      def self.call(filter_options, auth_data)
        unless auth_data[:identity_id].present?
          raise Yes::Core::Authorization::ReadRequestAuthorizer::NotAuthorized, 'Not allowed'
        end
      end
    end
  end
end
  1. Read model authorizer — filters returned records based on what the user can access. Configured via Yes::Core::Authorization::ReadModelsAuthorizer.

Serializers

Each read model requires a serializer class following the convention ReadModels::<Model>::Serializers::<Model>. The serializer receives auth_data and filter options, allowing it to customize the response based on the authenticated user.

Event Processing

Subscriptions

Yes wraps PgEventstore subscriptions for processing events in real-time.

Setting Up Subscriptions

# lib/tasks/eventstore.rb
subscriptions = Yes::Core::Subscriptions.new

subscriptions.subscribe_to_all(
  MyReadModel::Builder.new,
  { event_types: ['MyContext::SomethingHappened', 'MyContext::SomethingElseHappened'] }
)

subscriptions.start

Start subscriptions via the PgEventstore CLI:

bundle exec pg-eventstore subscriptions start -r ./lib/tasks/eventstore.rb

Heartbeat

Configure a heartbeat URL for monitoring subscription health:

Yes::Core.configure do |config|
  config.subscriptions_heartbeat_url = ENV['SUBSCRIPTIONS_HEARTBEAT_URL']
  config.subscriptions_heartbeat_interval = 30 # seconds
end

Process Managers

Process managers coordinate commands across services via HTTP.

ServiceClient

Sends commands to another service's command API:

client = Yes::Core::ProcessManagers::ServiceClient.new('media')
# Resolves to MEDIA_SERVICE_URL env var or http://media-cluster-ip-service:3000

client.call(access_token: token, commands_data: [...], channel: '/notifications')

CommandRunner

Base class for process managers that publish commands to external services:

class MyProcessManager < Yes::Core::ProcessManagers::CommandRunner
  def call(event)
    publish(
      client_id: ENV['MY_CLIENT_ID'],
      client_secret: ENV['MY_CLIENT_SECRET'],
      commands_data: build_commands(event)
    )
  end
end

State

Reconstructs entity state from events for use in process managers:

class UserState < Yes::Core::ProcessManagers::State
  RELEVANT_EVENTS = ['Auth::UserCreated', 'Auth::UserNameChanged'].freeze

  attr_reader :name

  private

  def stream
    PgEventstore::Stream.new(context: 'Auth', stream_name: 'User', stream_id: @id)
  end

  def required_attributes
    [:name]
  end

  def apply_user_name_changed(event)
    @name = event.data['name']
  end
end

state = UserState.load(user_id)
state.valid? # true if all required_attributes are present

Configuration Reference

Yes::Core.configure do |config|
  # Command processing
  config.process_commands_inline = true          # Process commands synchronously (default: true)
  config.command_notifier_classes = []            # Array of notifier classes for command batch notifications

  # Authentication
  config.auth_adapter = nil                      # Auth adapter instance (required for command/read APIs)

  # Cerbos Authorization
  config.cerbos_url = ENV['CERBOS_URL']          # Cerbos server URL (default from env var)
  config.cerbos_principal_data_builder = -> {}    # Lambda to build Cerbos principal data for commands
  config.cerbos_read_principal_data_builder = nil # Lambda for read requests (falls back to above)
  config.cerbos_commands_authorizer_include_metadata = false
  config.cerbos_read_authorizer_include_metadata = false
  config.cerbos_read_authorizer_actions = %w[read]
  config.cerbos_read_authorizer_resource_id_prefix = 'read-'
  config.cerbos_read_authorizer_principal_anonymous_id = 'anonymous'
  config.super_admin_check = ->(_auth_data) { false }

  # Subscriptions
  config.subscriptions_heartbeat_url = nil       # URL to ping for subscription health monitoring
  config.subscriptions_heartbeat_interval = 30   # Heartbeat interval in seconds

  # Observability
  config.otl_tracer = nil                        # OpenTelemetry tracer instance
  config.logger = Rails.logger                   # Logger instance

  # Error reporting
  config.error_reporter = nil                    # Callable for error reporting (e.g. Sentry)
end

Development

After checking out the repo, run bin/setup to install dependencies.

Start PG EventStore using Docker:

docker compose up

Setup databases:

./bin/setup_db

Enter a development console (from a gem's spec/dummy directory):

bundle exec rails c

Example Usage

user = Test::User::Aggregate.new
user.change_name(name: "John Doe")
user.name # => "John Doe"
TestUser.last.name # => "John Doe"

Testing the APIs

The dummy app includes mounted command and read APIs for testing. Start the server from one of the gem dummy apps:

cd yes-core/spec/dummy
bundle exec rails s

Authentication

The dummy app uses a simple Base64-encoded auth adapter for development. Generate a token:

require 'base64'
user_id = "47330036-7246-40b4-a3c7-7038df508774"
token = Base64.strict_encode64({ identity_id: user_id, user_id: user_id }.to_json)

Or from the command line:

TOKEN=$(echo -n '{"identity_id":"47330036-7246-40b4-a3c7-7038df508774","user_id":"47330036-7246-40b4-a3c7-7038df508774"}' | base64)

Testing Command API

Execute a command with curl:

curl --location 'http://127.0.0.1:3000/commands' \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer $TOKEN" \
--data '{
  "commands": [{
    "subject": "User",
    "context": "Test",
    "command": "ChangeName",
    "data": {
      "user_id": "47330036-7246-40b4-a3c7-7038df508774",
      "name": "Judydoody Doodle"
    }
  }],
  "channel": "test-notifications"
}'

Testing Read API

Query the read models:

curl --location 'http://127.0.0.1:3000/queries/test_users' \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer $TOKEN"

Running Specs

Each gem has its own test suite that runs in isolation with its own bundle context.

Run specs for a single gem:

rake yes_core:spec
rake yes_command_api:spec
rake yes_read_api:spec

Run specs for all gems:

rake spec

You can also run specs directly from within a gem directory:

cd yes-core && bundle exec rspec spec

Gem Installation and Release

Install the gem locally:

bundle exec rake install

Release a new version:

  1. Update the version in version.rb
  2. Run:
bundle exec rake release

This creates a git tag, pushes commits and tags, and pushes the gem to rubygems.org.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/yousty/yes. See CONTRIBUTING.md for development setup and guidelines.

Changelog

See CHANGELOG.md for a list of changes.

License

The gem is available as open source under the terms of the MIT License.