DWF
Distributed workflow runner following the Gush interface, built on Sidekiq and Redis. This project exists for DSL research purposes.
Installation
1. Add dwf to Gemfile
gem 'dwf', '~> 0.1.12'
2. Execute flow example
Declare jobs
require 'dwf'

class FirstItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    puts "#{self.class.name}: finish"
  end
end

class SecondItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    output('Send to ThirdItem')
    puts "#{self.class.name} finish"
  end
end

class ThirdItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    puts "#{self.class.name}: finish"
  end
end

class FourthItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    puts "payloads from incoming: #{payloads.inspect}"
    puts "#{self.class.name}: finish"
  end
end

FifthItem = Class.new(FirstItem)
Declare flow
require 'dwf'

class TestWf < Dwf::Workflow
  def configure
    run FirstItem
    run SecondItem, after: FirstItem
    run ThirdItem, after: FirstItem
    run FourthItem, after: [ThirdItem, SecondItem]
    run FifthItem, after: FourthItem
  end
end
Start background worker process
bundle exec sidekiq -q dwf
Execute flow
wf = TestWf.create
wf.callback_type = Dwf::Workflow::SK_BATCH
wf.start!
Note
dwf supports two callback types, Dwf::Workflow::BUILD_IN and Dwf::Workflow::SK_BATCH:
- Dwf::Workflow::BUILD_IN is the built-in callback
- Dwf::Workflow::SK_BATCH is the Sidekiq batch callback, which requires sidekiq-pro
By default, dwf uses the Dwf::Workflow::BUILD_IN callback.
Output
FirstItem: running
FirstItem: finish
SecondItem: running
SecondItem finish
ThirdItem: running
ThirdItem: finish
FourthItem: running
FourthItem: finish
FifthItem: running
FifthItem: finish
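The order above follows from the after: dependencies declared in TestWf. As a minimal sketch of that idea, and not a description of how dwf schedules jobs internally, the plain-Ruby example below feeds the same dependency table to TSort from Ruby's standard library. The execution_order helper and the hash layout are assumptions made for illustration only.

```ruby
require 'tsort'

# Hypothetical helper (not part of dwf): returns the job names in an order
# where every job appears after all of the jobs it depends on.
# `dependencies` maps a job name to the names listed in its `after:` option.
def execution_order(dependencies)
  # TSort.tsort takes two callables: one yielding every node, and one
  # yielding the children (here: the prerequisites) of a given node.
  each_node  = ->(&block) { dependencies.each_key(&block) }
  each_child = ->(job, &block) { dependencies.fetch(job).each(&block) }
  TSort.tsort(each_node, each_child)
end

# Dependency table mirroring TestWf#configure.
dependencies = {
  'FirstItem'  => [],
  'SecondItem' => ['FirstItem'],
  'ThirdItem'  => ['FirstItem'],
  'FourthItem' => %w[ThirdItem SecondItem],
  'FifthItem'  => ['FourthItem']
}

puts execution_order(dependencies).join(' -> ')
```

SecondItem and ThirdItem both depend only on FirstItem, so in a real run Sidekiq may process them concurrently and their lines can appear in a different relative order.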
Config redis and default queue
dwf uses Redis as its key-value storage through redis-rb, so you can pass Redis configuration via redis_opts.
Dwf.config do |config|
  SENTINELS = [
    { host: "127.0.0.1", port: 26380 },
    { host: "127.0.0.1", port: 26381 }
  ]
  config.redis_opts = { host: 'mymaster', sentinels: SENTINELS, role: :master }
  config.namespace = 'dwf'
end
Advanced features
Pipelining
You can pass a job's result to the next nodes.
class SendOutput < Dwf::Item
  def perform
    output('it works')
  end
end
The output method sends data from the job to its outgoing jobs.
class ReceiveOutput < Dwf::Item
  def perform
    message = payloads.first[:output] # it works
  end
end
payloads is an array containing the outputs from incoming jobs:
[
  {
    id: "SendOutput|1849a3f9-5fce-401e-a73a-91fc1048356",
    class: "SendOutput",
    output: 'it works'
  }
]
Sub workflow
There might be a case when you want to reuse a workflow inside another workflow.
As an example, let's write a workflow that contains another workflow: SubWorkflow should execute after SecondItem, and the third item (ThirtItem in the code below) should execute after SubWorkflow.
Setup
class FirstItem < Dwf::Item
  def perform
    puts "Main flow: #{self.class.name} running"
    puts "Main flow: #{self.class.name} finish"
  end
end

SecondItem = Class.new(FirstItem)
ThirtItem = Class.new(FirstItem)

class FirstSubItem < Dwf::Item
  def perform
    puts "Sub flow: #{self.class.name} running"
    puts "Sub flow: #{self.class.name} finish"
  end
end

SecondSubItem = Class.new(FirstSubItem)

class SubWorkflow < Dwf::Workflow
  def configure
    run FirstSubItem
    run SecondSubItem, after: FirstSubItem
  end
end

class TestWf < Dwf::Workflow
  def configure
    run FirstItem
    run SecondItem, after: FirstItem
    run SubWorkflow, after: SecondItem
    run ThirtItem, after: SubWorkflow
  end
end

wf = TestWf.create
wf.start!
Result
Main flow: FirstItem running
Main flow: FirstItem finish
Main flow: SecondItem running
Main flow: SecondItem finish
Sub flow: FirstSubItem running
Sub flow: FirstSubItem finish
Sub flow: SecondSubItem running
Sub flow: SecondSubItem finish
Main flow: ThirtItem running
Main flow: ThirtItem finish
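One way to picture the result above is that a nested workflow behaves like a single node in the parent graph: it starts once its prerequisites finish, and anything placed after it waits for all of its jobs. The plain-Ruby sketch below models only that idea. ToyWorkflow and run_order are hypothetical names, and nothing here is taken from dwf's implementation.

```ruby
require 'tsort'

# Toy stand-in for a workflow (hypothetical, not a dwf class).
# `nodes` maps each node to its prerequisites; a node is either a job name
# (String) or another ToyWorkflow acting as a sub workflow.
class ToyWorkflow
  attr_reader :name, :nodes

  def initialize(name, nodes)
    @name = name
    @nodes = nodes
  end

  # Sorts the nodes so prerequisites come first, then replaces every
  # sub-workflow node with that workflow's own ordered job names.
  def run_order
    each_node  = ->(&block) { nodes.each_key(&block) }
    each_child = ->(node, &block) { nodes.fetch(node).each(&block) }
    TSort.tsort(each_node, each_child).flat_map do |node|
      node.is_a?(ToyWorkflow) ? node.run_order : [node]
    end
  end
end

sub_workflow = ToyWorkflow.new(
  'SubWorkflow',
  { 'FirstSubItem' => [], 'SecondSubItem' => ['FirstSubItem'] }
)

main_workflow = ToyWorkflow.new(
  'TestWf',
  {
    'FirstItem'  => [],
    'SecondItem' => ['FirstItem'],
    sub_workflow => ['SecondItem'],
    'ThirtItem'  => [sub_workflow]
  }
)

puts main_workflow.run_order
```

The sketch keeps the sub workflow as one key in the parent's dependency table, which is why ThirtItem only needs to name the sub workflow itself rather than each of its jobs.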
Dynamic workflows
There might be a case when you have to construct the workflow dynamically, depending on the input. As an example, let's write a workflow that prints the numbers 1 to 100 to the terminal in parallel. Once all of those jobs have finished, it prints the word "finished" to the terminal.
class FirstMainItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running #{params}"
  end
end

SecondMainItem = Class.new(FirstMainItem)

class TestWf < Dwf::Workflow
  def configure
    items = (1..100).to_a.map do |number|
      run FirstMainItem, params: number
    end
    run SecondMainItem, after: items, params: "finished"
  end
end
We can achieve that because the run method returns the id of the created job, which we can use for chaining dependencies. The workflow can then be created and started like this:
wf = TestWf.create
# wf.callback_type = Dwf::Workflow::SK_BATCH
wf.start!
Todo
- Make it work
- Support passing params
- Support the built-in callback
- Add github workflow
- Redis configurable
- Pipelining
- Test
- Sub workflow
- CLI
- Support Resque
- Pluggable key-value store
- research https://github.com/moneta-rb/moneta
Contributing
- Fork it ( http://github.com/dthtien/wf/fork )
- Create your feature branch (git checkout -b awesome-feature)
- Commit your changes (git commit -am 'Add some awesome feature')
- Push to the branch (git push origin awesome-feature)
- Create new Pull Request