Framework for building business operations as composable pipelines. Chain steps that short-circuit on failure, collect validation errors, wrap in database transactions with callbacks, and scope nested data structures.

OmniService Framework

Composable business operations with railway-oriented programming.

Quick Start

class Posts::Create
  extend OmniService::Convenience

  option :post_repo, default: -> { PostRepository.new }

  def self.system
    @system ||= chain(
      input,
      transaction(create, on_success: [notify])
    )
  end

  def self.input
    @input ||= fanout(
      params { required(:title).filled(:string) },
      FindOne.new(:author, repository: AuthorRepository.new, with: :author_id)
    )
  end

  def self.create
    ->(params, author:, **) { post_repo.create(params.merge(author:)) }
  end
end

result = Posts::Create.system.call({ title: 'Hello', author_id: 1 })
result.success?  # => true
result.context   # => { author: <Author>, post: <Post> }

Core Concepts

Components

A component is any callable that returns Success(context_hash) or Failure(errors). To get those constructors, include Dry::Monads[:result] (OmniService::Convenience does this for you) or reference Dry::Monads::Success and Dry::Monads::Failure explicitly.

# Lambda
->(params, **ctx) { Success(post: Post.new(params)) }

# Class with #call
class ValidateTitle
  def call(params, **)
    params[:title].present? ? Success({}) : Failure([{ code: :blank, path: [:title] }])
  end
end

Result

A Result is the structured output of an operation. It carries context, params, errors, on_success, and on_failure.

result.success?  # true when there are no errors
result.failure?  # true when there is at least one error
result.context   # { post: <Post>, author: <Author> }
result.errors    # [#<Error code=:blank path=[:title]>]
result.to_monad  # Success(result) or Failure(result)

Signature and Params Flow

Each component has a signature [param_count, has_context] that determines how many positional params it consumes and whether it receives context:

  • param_count: nil (all params) or >= 0 (specific count)
  • has_context: whether it accepts **context
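
One way to picture a signature is to derive it from a callable's parameter list. The sketch below is an illustration only: infer_signature is a hypothetical helper, not part of OmniService, and the library may compute signatures differently. It relies only on Ruby's built-in Proc#parameters and Method#parameters.

```ruby
# Hypothetical helper (not part of OmniService): infer [param_count, has_context]
# from a callable's parameter list.
def infer_signature(callable)
  parameters =
    if callable.respond_to?(:parameters)
      callable.parameters               # Proc and Method objects
    else
      callable.method(:call).parameters # any object that responds to #call
    end
  kinds = parameters.map(&:first)

  # A positional splat (*params) means "takes every positional param", encoded as nil.
  param_count = kinds.include?(:rest) ? nil : kinds.count { |kind| %i[req opt].include?(kind) }
  # Any keyword parameter or **rest means the callable accepts context.
  has_context = kinds.any? { |kind| %i[key keyreq keyrest].include?(kind) }

  [param_count, has_context]
end

infer_signature(->(params, **ctx) {})  # => [1, true]
infer_signature(->(**ctx) {})          # => [0, true]
infer_signature(->(*params, **ctx) {}) # => [nil, true]
```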

Replacement semantics: When components are composed, consumed params are replaced by produced params. Unconsumed params are preserved.

# Component signatures:
# validate_params: [1, true]  - consumes 1 param
# enrich_data:     [1, true]  - consumes 1 param
# finalize:        [nil, true] - consumes all params

chain(validate_params, enrich_data, finalize)
# Input: [p1, p2, p3]

# validate_params consumes [p1], produces [x]
# => [x, p2, p3]

# enrich_data consumes [x], produces [y, z] (fan-out)
# => [y, z, p2, p3]

# finalize consumes all [y, z, p2, p3], produces [result]
# => [result]

If a component produces nothing (empty params), consumed params are kept in place:

# context_validator has signature [0, true] - consumes 0 params
chain(context_validator, process_data)
# Input: [p1, p2]

# context_validator consumes [], produces []
# => [p1, p2]  (unchanged)

# process_data receives [p1, p2]
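
The replacement rule can be written as a small pure function over arrays. This is a minimal sketch of the semantics described above, not the library's code; replace_consumed is a hypothetical name.

```ruby
# Hypothetical helper: apply one component's effect to the positional params.
# param_count is the first element of the component's signature (nil = all).
def replace_consumed(params, param_count, produced)
  consumed = param_count.nil? ? params.size : [param_count, params.size].min
  # A component that produces nothing leaves its consumed params in place.
  replacement = produced.empty? ? params.first(consumed) : produced
  replacement + params.drop(consumed)
end

params = %i[p1 p2 p3]
params = replace_consumed(params, 1, %i[x])        # => [:x, :p2, :p3]
params = replace_consumed(params, 1, %i[y z])      # => [:y, :z, :p2, :p3]
params = replace_consumed(params, nil, %i[result]) # => [:result]
replace_consumed(%i[p1 p2], 0, [])                 # => [:p1, :p2]
```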

For split, each component gets a disjoint slice of the params, sized by its param_count. When there is only a single input param, every component with a param_count of 1 receives that same param (fan-out), and a component with a param_count of 0 receives none. Components that require more than one param receive whatever is available and may raise if they enforce strict arity.

# A: [1, true], B: [2, true]
split(A, B)
# Input: [p1, p2, p3, p4]

# A consumes [p1], produces [x]
# B consumes [p2, p3], produces [y]
# => [x, y, p4]  (replacements + unconsumed)

With a single input param:

# A: [1, true], B: [1, true]
split(A, B)
# Input: [p1]

# A consumes [p1], produces [x]
# B consumes [p1], produces [y]
# => [x, y]
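
The slicing rule for split can be sketched as follows. This is an assumption-laden illustration: split_slices is a hypothetical helper, and how the library treats a nil param_count inside split is not stated above, so the sketch simply hands such a component every remaining param.

```ruby
# Hypothetical helper: decide which params each split component receives,
# given the param_count of each component's signature.
def split_slices(param_counts, params)
  # Single input param: fan it out to every component that takes params.
  if params.size == 1
    return param_counts.map { |count| count == 0 ? [] : params.dup }
  end

  offset = 0
  param_counts.map do |count|
    # Assumption: nil (all params) takes whatever is left at this offset.
    slice = count.nil? ? params.drop(offset) : (params[offset, count] || [])
    offset += slice.size
    slice
  end
end

split_slices([1, 2], %i[p1 p2 p3 p4]) # => [[:p1], [:p2, :p3]]
split_slices([1, 1], %i[p1])          # => [[:p1], [:p1]]
split_slices([0, 1], %i[p1])          # => [[], [:p1]]
```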

Composition

chain

Runs components in order. Short-circuits on first failure.

chain(
  validate_params,  # Failure stops here
  find_author,      # Adds :author to context
  create_post       # Receives :author
)
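
To make the short-circuit behavior concrete, here is a plain-Ruby sketch that does not use OmniService at all. Components return [:success, context_hash] or [:failure, errors] as stand-ins for the monads, and ChainOutcome and run_chain are hypothetical names chosen for this illustration.

```ruby
# Stand-in result type for this sketch only; the library has its own Result class.
ChainOutcome = Struct.new(:context, :errors) do
  def success?
    errors.empty?
  end
end

# Run components in order, merging each success payload into the context
# and stopping at the first failure.
def run_chain(components, params, **context)
  components.each do |component|
    status, payload = component.call(params, **context)
    return ChainOutcome.new(context, payload) if status == :failure

    context = context.merge(payload)
  end
  ChainOutcome.new(context, [])
end

validate_params = lambda do |params, **|
  params[:title].to_s.empty? ? [:failure, [{ code: :blank, path: [:title] }]] : [:success, {}]
end
find_author = ->(params, **) { [:success, { author: "author-#{params[:author_id]}" }] }
create_post = ->(params, author:, **) { [:success, { post: { title: params[:title], author: author } }] }

steps = [validate_params, find_author, create_post]
run_chain(steps, { title: 'Hello', author_id: 1 }).success? # => true
run_chain(steps, { title: '', author_id: 1 }).errors        # => [{ code: :blank, path: [:title] }]
```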

fanout

Runs every component against the same input and collects all errors instead of stopping at the first one.

fanout(
  validate_title,  # => Failure([{ path: [:title], code: :blank }])
  validate_body    # => Failure([{ path: [:body], code: :too_short }])
)
# => Result with both errors collected
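
The difference from chain is easiest to see in a sketch. The code below is plain Ruby with [:success, hash] and [:failure, errors] pairs standing in for the monads; run_fanout is a hypothetical name, and the assumption that each component sees the original context (rather than context added by its siblings) is this sketch's own.

```ruby
# Run every component against the same params and context,
# merging success payloads and concatenating all errors.
def run_fanout(components, params, **context)
  merged = context.dup
  errors = []
  components.each do |component|
    status, payload = component.call(params, **context)
    if status == :failure
      errors.concat(payload)
    else
      merged = merged.merge(payload)
    end
  end
  { context: merged, errors: errors }
end

validate_title = lambda do |params, **|
  params[:title].to_s.empty? ? [:failure, [{ path: [:title], code: :blank }]] : [:success, {}]
end
validate_body = lambda do |params, **|
  params[:body].to_s.length < 10 ? [:failure, [{ path: [:body], code: :too_short }]] : [:success, {}]
end

outcome = run_fanout([validate_title, validate_body], { title: '', body: 'short' })
outcome[:errors].map { |error| error[:code] } # => [:blank, :too_short]
```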

split

Distributes params across components and stops at the first failure.

split(
  parse_title,
  parse_body,
  parse_author
)

transaction

Wraps a component in a database transaction and runs callbacks after commit or rollback.

transaction(
  chain(validate, create),
  on_success: [send_email, update_cache],  # After commit
  on_failure: [log_error]                   # After rollback
)
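
The ordering of callbacks relative to commit and rollback can be sketched without a database. Everything here is hypothetical: run_transaction only records :begin, :commit, and :rollback in a log, and the arguments passed to the callbacks are an assumption of this sketch rather than the library's contract.

```ruby
# Simulate a transaction boundary with a log so the callback order is visible.
def run_transaction(body, params, on_success: [], on_failure: [], log: [])
  log << :begin
  status, payload = body.call(params)
  if status == :success
    log << :commit
    # Callbacks run only after the (simulated) commit.
    on_success.each { |callback| callback.call(params, **payload) }
  else
    log << :rollback
    # Callbacks run only after the (simulated) rollback.
    on_failure.each { |callback| callback.call(params, errors: payload) }
  end
  [status, payload]
end

log = []
create = ->(params) { [:success, { post: params }] }
send_email = ->(_params, post:, **) { log << [:emailed, post[:title]] }

run_transaction(create, { title: 'Hi' }, on_success: [send_email], log: log)
log # => [:begin, :commit, [:emailed, "Hi"]]
```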

namespace

Scopes a component's params and context under a key. Error paths are prefixed with that key.

# params: { post: { title: 'Hi' }, author: { name: 'John' } }
fanout(
  namespace(:post, validate_post),
  namespace(:author, validate_author)
)
# Errors: [:post, :title], [:author, :name]
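
The path prefixing shown in the comment above can be sketched as a wrapper around a component. This is not the library's implementation: namespaced is a hypothetical name, components return [:success, hash] or [:failure, errors] pairs as stand-ins for the monads, and the sketch scopes only params and error paths, not context.

```ruby
# Wrap a component so it sees only params[key] and its error paths gain the key.
def namespaced(key, component)
  lambda do |params, **context|
    status, payload = component.call(params.fetch(key, {}), **context)
    if status == :failure
      [:failure, payload.map { |error| error.merge(path: [key, *error[:path]]) }]
    else
      [status, payload]
    end
  end
end

validate_post = lambda do |params, **|
  params[:title].to_s.empty? ? [:failure, [{ code: :blank, path: [:title] }]] : [:success, {}]
end

scoped = namespaced(:post, validate_post)
scoped.call({ post: { title: '' } })
# => [:failure, [{ code: :blank, path: [:post, :title] }]]
```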

context

Validates caller-provided context using Dry::Types. Invalid context returns a failure Result by default; use context! to raise.

chain(
  context(current_user: Types::Instance(User), request_id: Types::String),
  create_post
)

collection

Runs a component for each element of an array. Error paths include the element index.

# params: { comments: [{ body: 'A' }, { body: '' }] }
collection(validate_comment, namespace: :comments)
# Errors: [:comments, 1, :body]
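
As a rough sketch of how the index ends up in the error path, the wrapper below runs a component per element and prefixes each error with the namespace and position. each_item is a hypothetical name, and the [:success, hash] / [:failure, errors] pairs are stand-ins for the monads.

```ruby
# Run the component once per element of params[key], collecting indexed errors.
def each_item(key, component)
  lambda do |params, **context|
    errors = params.fetch(key, []).each_with_index.flat_map do |item, index|
      status, payload = component.call(item, **context)
      status == :failure ? payload.map { |error| error.merge(path: [key, index, *error[:path]]) } : []
    end
    errors.empty? ? [:success, {}] : [:failure, errors]
  end
end

validate_comment = lambda do |params, **|
  params[:body].to_s.empty? ? [:failure, [{ code: :blank, path: [:body] }]] : [:success, {}]
end

validate_comments = each_item(:comments, validate_comment)
validate_comments.call({ comments: [{ body: 'A' }, { body: '' }] })
# => [:failure, [{ code: :blank, path: [:comments, 1, :body] }]]
```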

optional

Swallows a component's failure so the surrounding pipeline keeps going.

chain(
  create_user,
  optional(fetch_avatar),  # Failure won't stop pipeline
  send_email
)

shortcut

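Exits the enclosing chain early when the wrapped component succeeds. If it fails, the chain continues with the next component.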

chain(
  shortcut(find_existing),  # Found? Exit early
  create_new                 # Not found? Create
)

either

Tries components in order, returns first success.

either(
  cache_lookup,      # Try cache first
  database_lookup,   # Fallback to database
  compute_fresh      # Last resort: compute
)
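
A first-success-wins loop captures the idea. This sketch is plain Ruby with [:success, hash] / [:failure, errors] pairs standing in for the monads; run_either is a hypothetical name, and returning the last failure when every alternative fails is an assumption, since the text above does not say what either reports in that case.

```ruby
# Try each component in order and return the first success.
def run_either(components, params, **context)
  raise ArgumentError, 'need at least one component' if components.empty?

  last = nil
  components.each do |component|
    last = component.call(params, **context)
    return last if last.first == :success
  end
  last # Assumption: when everything fails, report the last failure.
end

cache = {}
cache_lookup = lambda do |params, **|
  cache.key?(params[:id]) ? [:success, { value: cache[params[:id]] }] : [:failure, [{ code: :miss, path: [] }]]
end
compute_fresh = ->(params, **) { [:success, { value: params[:id] * 2 }] }

run_either([cache_lookup, compute_fresh], { id: 21 }) # => [:success, { value: 42 }]
```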

Wrap an alternative in its own nested transaction so that its writes are rolled back if it fails, before the fallback runs:

transaction(
  either(
    transaction(risky_operation),  # If fails, rolled back
    safe_fallback                  # Runs clean
  )
)

Entity Lookup

FindOne

FindOne.new(:post, repository: repo)
# params: { post_id: 1 } => Success(post: <Post>)

# Options
FindOne.new(:post, repository: repo, with: :slug)           # Custom param key
FindOne.new(:post, repository: repo, by: [:author_id, :slug])  # Multi-column
FindOne.new(:post, repository: repo, nullable: true)        # Allow nil
FindOne.new(:post, repository: repo, omittable: true)       # Allow missing key
FindOne.new(:post, repository: repo, skippable: true)       # Skip not found

# Polymorphic
FindOne.new(:item, repository: { 'Post' => post_repo, 'Article' => article_repo })
# params: { item_id: 1, item_type: 'Post' }

FindMany

FindMany.new(:posts, repository: repo)
# params: { post_ids: [1, 2, 3] } => Success(posts: [...])

# Nested IDs
FindMany.new(:products, repository: repo, by: { id: [:items, :product_id] })
# params: { items: [{ product_id: 1 }, { product_id: [2, 3] }] }

Error Format

Failure(:not_found)
# => Error(code: :not_found, path: [])

Failure([{ code: :blank, path: [:title] }])
# => Error(code: :blank, path: [:title])

Failure([{ code: :too_short, path: [:body], tokens: { min: 100 } }])
# => Error with interpolation tokens for i18n
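
The three payload shapes above can be normalized into one error structure along these lines. This is a sketch under stated assumptions: ErrorSketch, normalize_failure, TEMPLATES, and render_message are hypothetical names, and the message templates are made up to show how tokens could feed interpolation.

```ruby
# Stand-in for the library's Error class, for illustration only.
ErrorSketch = Struct.new(:code, :path, :tokens, keyword_init: true)

# Accept a bare symbol, a hash, or an array of either, and return ErrorSketch objects.
def normalize_failure(payload)
  entries = payload.is_a?(Array) ? payload : [payload]
  entries.map do |entry|
    case entry
    when Symbol
      ErrorSketch.new(code: entry, path: [], tokens: {})
    when Hash
      ErrorSketch.new(code: entry.fetch(:code), path: entry.fetch(:path, []), tokens: entry.fetch(:tokens, {}))
    else
      raise ArgumentError, "unsupported failure payload: #{entry.inspect}"
    end
  end
end

# Hypothetical templates; a real app would look these up through its i18n backend.
TEMPLATES = { too_short: 'must be at least %{min} characters', not_found: 'was not found' }.freeze

def render_message(error)
  format(TEMPLATES.fetch(error.code), error.tokens)
end

error = normalize_failure([{ code: :too_short, path: [:body], tokens: { min: 100 } }]).first
render_message(error) # => "must be at least 100 characters"
```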

Async Execution

class Posts::Create
  extend OmniService::Convenience
  extend OmniService::Async::Convenience[queue: 'default']

  def self.system
    @system ||= chain(...)
  end
end

Posts::Create.system_async.call(params, **context)
# => Success(job: #<OperationJob:...>)

Posts::Create.system_async.set(wait: 5.minutes, queue: 'low').call(params, **context)

Strict Mode

operation.call!(params)  # Raises OmniService::OperationFailed on failure

# Or via convenience
Posts::Create.system!.call(params)
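
Conceptually, strict mode turns a failure result into an exception. The sketch below shows that idea in plain Ruby; OperationFailedSketch and call_strict are hypothetical stand-ins, not OmniService::OperationFailed or the library's call!, and operations return [:success, hash] / [:failure, errors] pairs instead of monads.

```ruby
# Stand-in exception that carries the errors of the failed operation.
class OperationFailedSketch < StandardError
  attr_reader :errors

  def initialize(errors)
    @errors = errors
    super("operation failed: #{errors.map { |error| error[:code] }.join(', ')}")
  end
end

# Call the operation and raise instead of returning a failure.
def call_strict(operation, params, **context)
  status, payload = operation.call(params, **context)
  raise OperationFailedSketch.new(payload) if status == :failure

  payload
end

operation = lambda do |params, **|
  params[:title].to_s.empty? ? [:failure, [{ code: :blank, path: [:title] }]] : [:success, { post: params }]
end

begin
  call_strict(operation, { title: '' })
rescue OperationFailedSketch => e
  e.message # => "operation failed: blank"
end
```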