direktor

Event-sourced DAG (Directed Acyclic Graph) orchestration for Rails with ActiveJob.

Define complex workflows with a clean DSL, execute tasks in parallel via ActiveJob (Sidekiq, Resque, etc.), and get comprehensive forensics through event sourcing.

Features

  • Clean DSL - Define workflows with simple, intuitive syntax
  • Event Sourcing - Append-only event logs for complete audit trails
  • ActiveJob Integration - Works with any ActiveJob backend (Sidekiq, Resque, etc.)
  • Parallel Execution - Independent tasks run concurrently
  • DAG Validation - Automatic cycle detection
  • Retry Support - Full retry lineage tracking
  • Comprehensive Forensics - Track every state transition with metadata
  • Database-backed - Persistent state with projections for fast queries

Installation

Add to your Gemfile:

gem 'direktor'

Then run:

bundle install
rails generate direktor:install
rails db:migrate

Quick Start

1. Define a Workflow

Create a workflow class (e.g., app/workflows/data_pipeline_workflow.rb):

class DataPipelineWorkflow < Direktor::Workflow
  task :extract_data do |params, task_run|
    # Extract logic
    data = API.fetch(params[:date])
    { records: data.count }
  end

  task :transform_data, depends_on: :extract_data do |params, task_run|
    # Transform logic
    records = Database.transform(params[:date])
    { transformed: records.count }
  end

  task :load_to_warehouse, depends_on: :transform_data do |params, task_run|
    # Load logic
    Warehouse.load(params[:date])
  end

  task :generate_reports, depends_on: :load_to_warehouse do |params, task_run|
    # Generate reports
    ReportGenerator.run(params[:date])
  end
end

2. Execute the Workflow

workflow = DataPipelineWorkflow.create(params: { date: Date.today })
workflow.execute!

3. Monitor Execution

# Check workflow status
workflow.workflow_run.status  # => "running", "completed", "failed"

# View task statuses
workflow.workflow_run.task_runs.pluck(:task_name, :status)
# => [["extract_data", "completed"], ["transform_data", "running"], ...]

# Get full event timeline
workflow.workflow_run.full_event_timeline.each do |event|
  puts "#{event.source}: #{event.event_type} at #{event.created_at}"
end

Advanced Usage

Using ActiveJob Classes

Instead of blocks, you can specify ActiveJob classes:

class DataPipelineWorkflow < Direktor::Workflow
  task :extract_data, job_class: "ExtractDataJob"
  task :transform_data, depends_on: :extract_data, job_class: "TransformDataJob"
  task :load_to_warehouse, depends_on: :transform_data, job_class: "LoadDataJob"
end

class ExtractDataJob < ApplicationJob
  def perform(params, task_run)
    # Your extraction logic
  end
end

Complex Dependencies

Tasks can depend on multiple other tasks:

class ComplexWorkflow < Direktor::Workflow
  task :task_a
  task :task_b
  task :task_c, depends_on: [:task_a, :task_b]  # Runs after both A and B complete
  task :task_d, depends_on: :task_c
end

Async Webhooks (External Service Integration)

Tasks can wait for external services to complete via webhooks:

class VideoProcessingWorkflow < Direktor::Workflow
  task :validate, job_class: "ValidateVideoJob"
  task :submit_processing, depends_on: :validate, job_class: "SubmitProcessingJob"
  task :save_results, depends_on: :submit_processing, job_class: "SaveResultsJob"
end

class SubmitProcessingJob < ApplicationJob
  def perform(params, task_run)
    result = external_service.submit(
      input: params[:video_url],
      webhook_url: "https://myapp.com#{task_run.webhook_url}"
    )

    # Return :waiting to pause this task until webhook completes it
    :waiting
  end
end

# Webhook controller
class Webhooks::ProcessingController < ApplicationController
  include Direktor::WebhookSupport

  def complete
    handle_task_webhook do |payload|
      # Transform service payload to direktor format
      { result: payload[:data] }
    end
  end
end

# Routes
post '/webhooks/direktor/tasks/:id/complete', to: 'webhooks/processing#complete'

When a task returns :waiting, it stays in the "waiting" status until your webhook endpoint completes it: handle_task_webhook (from Direktor::WebhookSupport) passes the transformed payload to Executor#on_webhook_complete, and the workflow continues automatically from there.
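
For reference, here is a sketch (in Ruby, purely for illustration) of the callback the external service would make when processing finishes. The task id, payload shape, and URLs are hypothetical; your service defines its own:

require 'net/http'
require 'json'

# Hypothetical callback: POST the result to the task's webhook URL.
# The { data: ... } payload matches what the controller above expects.
uri = URI("https://myapp.com/webhooks/direktor/tasks/42/complete")
Net::HTTP.post(uri,
               { data: { output_url: "https://cdn.example.com/out.mp4" } }.to_json,
               "Content-Type" => "application/json")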

See examples/async_webhook_workflow.rb for a complete example.

Retry Failed Workflows

workflow = DataPipelineWorkflow.create(params: { date: Date.today })
workflow.execute!

# If it fails, retry with a new workflow run
if workflow.workflow_run.failed?
  retried_workflow = workflow.retry!
  # Creates new WorkflowRun with parent_correlation_id set
end

Forensics Queries

# Find all failed workflows today
Direktor::WorkflowRunEvent.failed_today

# Get retry chain for a workflow
workflow_run.retry_chain  # => [parent_workflow_run, grandparent_workflow_run, ...]

# Find tasks that retry frequently
Direktor::TaskRunEvent.frequent_retries(threshold: 3)

# Get failure reasons
Direktor::TaskRunEvent.failure_reasons
# => { "ActiveRecord::RecordNotFound" => 5, "Timeout::Error" => 3 }

# Find slow tasks
Direktor::TaskRunEvent.slow_tasks(duration_threshold_seconds: 300)

# Full event timeline for a correlation_id (workflow and task events live
# in separate tables, so query both and merge by time)
workflow_events = Direktor::WorkflowRunEvent.where(correlation_id: 'abc-123')
task_events = Direktor::TaskRunEvent.where(correlation_id: 'abc-123')
events = (workflow_events.to_a + task_events.to_a).sort_by(&:created_at)

Architecture

Event Sourcing

direktor uses event sourcing for comprehensive audit trails:

  • Events - Append-only logs (workflow_run_events, task_run_events)
  • Projections - Current state snapshots (workflow_runs, task_runs)

Every state transition is recorded as an immutable event:

Workflow Events:

  • started - Workflow execution began
  • completed - All tasks completed successfully
  • failed - At least one task failed

Task Events:

  • enqueued - Task job was queued
  • started - Task execution began
  • completed - Task finished successfully
  • failed - Task encountered an error
  • retrying - Task is being retried
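
Because the logs are append-only, a task's full history can be read straight from its events. A minimal sketch, using the event model and columns from the schema below:

# Given a task_run record, list its immutable state transitions, oldest first.
Direktor::TaskRunEvent
  .where(task_run_id: task_run.id)
  .order(:created_at)
  .pluck(:event_type, :created_at)
# => [["enqueued", ...], ["started", ...], ["completed", ...]]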

Correlation ID

Every workflow execution gets a unique correlation_id (UUID) that:

  • Links all events across workflow and tasks
  • Enables distributed tracing
  • Makes retry lineage queryable via parent_correlation_id
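
As a sketch of what this linkage enables, the retry lineage can be walked by hand through parent_correlation_id (workflow_run.retry_chain does this for you; Direktor::WorkflowRun as the projection's model name is an assumption here):

# Walk the retry lineage manually, newest to oldest. Each retried run
# points at its predecessor via parent_correlation_id.
chain = []
cid = workflow_run.parent_correlation_id
while cid
  parent = Direktor::WorkflowRun.find_by(correlation_id: cid)
  break unless parent
  chain << parent
  cid = parent.parent_correlation_id
end
chain  # => [parent_run, grandparent_run, ...]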

Database Schema

workflow_runs (projection)
├── id, correlation_id, parent_correlation_id
├── workflow_class, status, params, metadata
└── timestamps

task_runs (projection)
├── id, workflow_run_id, correlation_id
├── task_name, status, depends_on, result, error
└── timestamps

workflow_run_events (append-only)
├── id, workflow_run_id, correlation_id
├── event_type, metadata
└── created_at

task_run_events (append-only)
├── id, task_run_id, workflow_run_id, correlation_id
├── event_type, metadata
└── created_at

How It Works

  1. Define - Use DSL to define tasks and dependencies
  2. Validate - DAG validator checks for cycles (see the sketch after this list)
  3. Create - WorkflowRun and TaskRun records created
  4. Execute - Executor enqueues ready tasks as ActiveJobs
  5. Monitor - Events recorded at each state transition
  6. Complete - Executor checks for completion after each task
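
A quick sketch of step 2: a cyclic definition is rejected by the DAG validator before anything is enqueued (the exact error class, and whether it raises at definition time or on create, are implementation details not pinned down here):

class CyclicWorkflow < Direktor::Workflow
  task :a, depends_on: :b
  task :b, depends_on: :a  # a -> b -> a is a cycle, not a DAG
end

# Fails cycle detection instead of running.
CyclicWorkflow.create(params: {})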

Parallel Execution: Tasks with satisfied dependencies run concurrently via ActiveJob. For example:

task :a
task :b
task :c, depends_on: [:a, :b]

Tasks a and b run in parallel, then c runs after both complete.

Testing

Run tests:

bundle exec rspec

Configuration

direktor uses your existing ActiveJob configuration:

# config/application.rb
config.active_job.queue_adapter = :sidekiq

Contributing

Bug reports and pull requests are welcome on GitHub.

License

The gem is available as open source under the terms of the Apache License 2.0.