# direktor
Event-sourced DAG (Directed Acyclic Graph) orchestration for Rails with ActiveJob.
Define complex workflows with a clean DSL, execute tasks in parallel via ActiveJob (Sidekiq, Resque, etc.), and get comprehensive forensics through event sourcing.
## Features
- Clean DSL - Define workflows with simple, intuitive syntax
- Event Sourcing - Append-only event logs for complete audit trails
- ActiveJob Integration - Works with any ActiveJob backend (Sidekiq, Resque, etc.)
- Parallel Execution - Independent tasks run concurrently
- DAG Validation - Automatic cycle detection
- Retry Support - Full retry lineage tracking
- Comprehensive Forensics - Track every state transition with metadata
- Database-backed - Persistent state with projections for fast queries
## Installation

Add to your Gemfile:

```ruby
gem 'direktor'
```

Then run:

```shell
bundle install
rails generate direktor:install
rails db:migrate
```

## Quick Start
### 1. Define a Workflow
Create a workflow class (e.g., app/workflows/data_pipeline_workflow.rb):
```ruby
class DataPipelineWorkflow < Direktor::Workflow
  task :extract_data do |params, task_run|
    # Extract logic
    data = API.fetch(params[:date])
    { records: data.count }
  end

  task :transform_data, depends_on: :extract_data do |params, task_run|
    # Transform logic
    records = Database.transform(params[:date])
    { transformed: records.count }
  end

  task :load_to_warehouse, depends_on: :transform_data do |params, task_run|
    # Load logic
    Warehouse.load(params[:date])
  end

  task :generate_reports, depends_on: :load_to_warehouse do |params, task_run|
    # Generate reports
    ReportGenerator.run(params[:date])
  end
end
```

### 2. Execute the Workflow

```ruby
workflow = DataPipelineWorkflow.create(params: { date: Date.today })
workflow.execute!
```

### 3. Monitor Execution
```ruby
# Check workflow status
workflow.workflow_run.status # => "running", "completed", "failed"

# View task statuses
workflow.workflow_run.task_runs.pluck(:task_name, :status)
# => [["extract_data", "completed"], ["transform_data", "running"], ...]

# Get full event timeline
workflow.workflow_run.full_event_timeline.each do |event|
  puts "#{event.source}: #{event.event_type} at #{event.created_at}"
end
```

## Advanced Usage

### Using ActiveJob Classes

Instead of blocks, you can specify ActiveJob classes:
```ruby
class DataPipelineWorkflow < Direktor::Workflow
  task :extract_data, job_class: "ExtractDataJob"
  task :transform_data, depends_on: :extract_data, job_class: "TransformDataJob"
  task :load_to_warehouse, depends_on: :transform_data, job_class: "LoadDataJob"
end

class ExtractDataJob < ApplicationJob
  def perform(params, task_run)
    # Your extraction logic
  end
end
```

### Complex Dependencies

Tasks can depend on multiple other tasks:
```ruby
class ComplexWorkflow < Direktor::Workflow
  task :task_a
  task :task_b
  task :task_c, depends_on: [:task_a, :task_b] # Runs after both A and B complete
  task :task_d, depends_on: :task_c
end
```

### Async Webhooks (External Service Integration)

Tasks can wait for external services to complete via webhooks:
```ruby
class VideoProcessingWorkflow < Direktor::Workflow
  task :validate, job_class: "ValidateVideoJob"
  task :submit_processing, depends_on: :validate, job_class: "SubmitProcessingJob"
  task :save_results, depends_on: :submit_processing, job_class: "SaveResultsJob"
end

class SubmitProcessingJob < ApplicationJob
  def perform(params, task_run)
    result = external_service.submit(
      input: params[:video_url],
      webhook_url: "https://myapp.com#{task_run.webhook_url}"
    )

    # Return :waiting to pause this task until the webhook completes it
    :waiting
  end
end

# Webhook controller
class Webhooks::ProcessingController < ApplicationController
  include Direktor::WebhookSupport

  def complete
    handle_task_webhook do |payload|
      # Transform the service payload into direktor's format
      { result: payload[:data] }
    end
  end
end

# Routes
post '/webhooks/direktor/tasks/:id/complete', to: 'webhooks/processing#complete'
```

When a task returns `:waiting`, it enters the "waiting" status until your webhook calls `Executor#on_webhook_complete`. The workflow automatically continues once the webhook completes the task.

See `examples/async_webhook_workflow.rb` for a complete example.
### Retry Failed Workflows

```ruby
workflow = DataPipelineWorkflow.create(params: { date: Date.today })
workflow.execute!

# If it fails, retry with a new workflow run
if workflow.workflow_run.failed?
  retried_workflow = workflow.retry!
  # Creates a new WorkflowRun with parent_correlation_id set
end
```

### Forensics Queries
```ruby
# Find all failed workflows today
Direktor::WorkflowRunEvent.failed_today

# Get the retry chain for a workflow
workflow_run.retry_chain # => [parent_workflow_run, grandparent_workflow_run, ...]

# Find tasks that retry frequently
Direktor::TaskRunEvent.frequent_retries(threshold: 3)

# Get failure reasons
Direktor::TaskRunEvent.failure_reasons
# => { "ActiveRecord::RecordNotFound" => 5, "Timeout::Error" => 3 }

# Find slow tasks
Direktor::TaskRunEvent.slow_tasks(duration_threshold_seconds: 300)

# Full event timeline for a correlation_id. The two event models live in
# separate tables, so combine the relations in Ruby rather than with
# ActiveRecord's #or (which requires relations on the same model):
events = (Direktor::WorkflowRunEvent.where(correlation_id: 'abc-123') +
          Direktor::TaskRunEvent.where(correlation_id: 'abc-123'))
  .sort_by(&:created_at)
```

## Architecture

### Event Sourcing

direktor uses event sourcing for comprehensive audit trails:
- Events - Append-only logs (`workflow_run_events`, `task_run_events`)
- Projections - Current state snapshots (`workflow_runs`, `task_runs`)

Every state transition is recorded as an immutable event:

Workflow Events:

- `started` - Workflow execution began
- `completed` - All tasks completed successfully
- `failed` - At least one task failed

Task Events:

- `enqueued` - Task job was queued
- `started` - Task execution began
- `completed` - Task finished successfully
- `failed` - Task encountered an error
- `retrying` - Task is being retried
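Conceptually, a projection's current status is just a fold over its append-only events. The following pure-Ruby sketch illustrates the idea using the workflow event names above; it is an illustration of the pattern, not direktor's actual (database-backed) implementation:

```ruby
# A minimal stand-in for a persisted event row.
Event = Struct.new(:event_type, :created_at)

# Rebuild a workflow's status by replaying its events in order.
def project_status(events)
  status = "pending"
  events.sort_by(&:created_at).each do |event|
    case event.event_type
    when "started"   then status = "running"
    when "completed" then status = "completed"
    when "failed"    then status = "failed"
    end
  end
  status
end

events = [
  Event.new("started", Time.now - 60),
  Event.new("completed", Time.now)
]
project_status(events) # => "completed"
```

Because events are never mutated or deleted, the projection can always be rebuilt from scratch, which is what makes the audit trail trustworthy.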
### Correlation ID

Every workflow execution gets a unique `correlation_id` (UUID) that:

- Links all events across the workflow and its tasks
- Enables distributed tracing
- Makes retry lineage queryable via `parent_correlation_id`
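Retry lineage is a chain of `parent_correlation_id` links, which direktor exposes as `workflow_run.retry_chain`. A pure-Ruby sketch of the walk, using plain hashes as stand-ins for `WorkflowRun` records (the real lookups go through the database):

```ruby
# Hashes standing in for workflow_runs rows, keyed by correlation_id.
runs = {
  "run-3" => { correlation_id: "run-3", parent_correlation_id: "run-2" },
  "run-2" => { correlation_id: "run-2", parent_correlation_id: "run-1" },
  "run-1" => { correlation_id: "run-1", parent_correlation_id: nil }
}

# Follow parent_correlation_id links until the original run is reached.
def retry_chain(run, runs)
  chain = []
  parent_id = run[:parent_correlation_id]
  while parent_id
    parent = runs.fetch(parent_id)
    chain << parent
    parent_id = parent[:parent_correlation_id]
  end
  chain
end

retry_chain(runs["run-3"], runs).map { |r| r[:correlation_id] }
# => ["run-2", "run-1"]
```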
### Database Schema

```
workflow_runs (projection)
├── id, correlation_id, parent_correlation_id
├── workflow_class, status, params, metadata
└── timestamps

task_runs (projection)
├── id, workflow_run_id, correlation_id
├── task_name, status, depends_on, result, error
└── timestamps

workflow_run_events (append-only)
├── id, workflow_run_id, correlation_id
├── event_type, metadata
└── created_at

task_run_events (append-only)
├── id, task_run_id, workflow_run_id, correlation_id
├── event_type, metadata
└── created_at
```
### How It Works

- Define - Use the DSL to define tasks and dependencies
- Validate - The DAG validator checks for cycles
- Create - `WorkflowRun` and `TaskRun` records are created
- Execute - The executor enqueues ready tasks as ActiveJobs
- Monitor - Events are recorded at each state transition
- Complete - The executor checks for completion after each task
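The cycle check in the Validate step can be done with Kahn's algorithm: repeatedly remove tasks whose dependencies are all satisfied; if tasks remain but none are removable, the graph contains a cycle. A self-contained pure-Ruby sketch of that check (illustrative only, not direktor's actual validator):

```ruby
# deps maps each task to the tasks it depends on, e.g. { c: [:a, :b] }.
def acyclic?(deps)
  remaining = deps.transform_values(&:dup)
  until remaining.empty?
    # Tasks with no unmet dependencies are ready to "run".
    ready = remaining.select { |_task, ds| ds.empty? }.keys
    return false if ready.empty? # every remaining task waits on another: cycle

    ready.each { |task| remaining.delete(task) }
    remaining.each_value { |ds| ds.reject! { |d| ready.include?(d) } }
  end
  true
end

acyclic?(a: [], b: [], c: [:a, :b], d: [:c]) # => true
acyclic?(a: [:b], b: [:a])                   # => false
```

The same "ready set" that drives the cycle check is what the executor uses at runtime: every task in it can be enqueued immediately, which is where the parallelism below comes from.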
Parallel Execution: Tasks with satisfied dependencies run concurrently via ActiveJob. For example:

```ruby
task :a
task :b
task :c, depends_on: [:a, :b]
```

Tasks `a` and `b` run in parallel, then `c` runs after both complete.

## Testing

Run tests:

```shell
bundle exec rspec
```

## Configuration

direktor uses your existing ActiveJob configuration:

```ruby
# config/application.rb
config.active_job.queue_adapter = :sidekiq
```

## Contributing
Bug reports and pull requests are welcome on GitHub.
## License
The gem is available as open source under the terms of the Apache License 2.0.