Project

dwf

0.0
No release in over a year
Workflow
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies

Development

~> 11.1.3
~> 3.2
~> 0.27.2

Runtime

~> 4.2.0
~> 6.2.0
~> 4.0.2
 Project Readme

DWF

Distributed workflow runner following Gush interface using Sidekiq and Redis. This project is for researching DSL purpose

Installation

1. Add dwf to Gemfile

gem 'dwf', '~> 0.1.12'

2. Execute flow example

Declare jobs

require 'dwf'

class FirstItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    puts "#{self.class.name}: finish"
  end
end

class SecondItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    output('Send to ThirdItem')
    puts "#{self.class.name} finish"
  end
end

class ThirdItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    puts "#{self.class.name}: finish"
  end
end

class FourthItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    puts "payloads from incoming: #{payloads.inspect}"
    puts "#{self.class.name}: finish"
  end
end

FifthItem = Class.new(FirstItem)

Declare flow

require 'dwf'

class TestWf < Dwf::Workflow
  def configure
    run FirstItem
    run SecondItem, after: FirstItem
    run ThirdItem, after: FirstItem
    run FourthItem, after: [ThirdItem, SecondItem]
    run FifthItem, after: FourthItem
  end
end

Start background worker process

bundle exec sidekiq -q dwf

Execute flow

wf = TestWf.create
wf.callback_type = Dwf::Workflow::SK_BATCH
wf.start!

Note

dwf supports 2 callback types Dwf::Workflow::BUILD_IN and Dwf::Workflow::SK_BATCH

  • Dwf::Workflow::BUILD_IN is a build-in callback
  • Dwf::Workflow::SK_BATCH is sidekiq batch callback which required sidekiq-pro

By default dwf will use Dwf::Workflow::BUILD_IN callback.

Output

FirstItem: running
FirstItem: finish
SecondItem: running
SecondItem finish
ThirdItem: running
ThirdItem: finish
FourthItem: running
FourthItem: finish
FifthItem: running
FifthItem: finish

Config redis and default queue

dwf uses redis as the key value stograge through redis-rb, So you can pass redis configuration by redis_opts

Dwf.config do |config|
  SENTINELS = [
    { host: "127.0.0.1", port: 26380 },
    { host: "127.0.0.1", port: 26381 }
  ]
  config.redis_opts = { host: 'mymaster', sentinels: SENTINELS, role: :master }
  config.namespace = 'dwf'
end

Advanced features

Pipelining

You can pass jobs result to next nodes

class SendOutput < Dwf::Item
  def perform
    output('it works')
  end
end

output method used to output data from the job to add outgoing jobs

class ReceiveOutput < Dwf::Item
  def perform
    message = payloads.first[:output] # it works
  end
end

payloads is an array that containing outputs from incoming jobs

[
  {
    id: "SendOutput|1849a3f9-5fce-401e-a73a-91fc1048356",
    class: "SendOutput",
    output: 'it works'
  }
]

Sub workflow

There might be a case when you want to reuse a workflow in another workflow

As an example, let's write a workflow which contain another workflow, expected that the SubWorkflow workflow execute after SecondItem and the ThirdItem execute after SubWorkflow

Setup

class FirstItem < Dwf::Item
  def perform
    puts "Main flow: #{self.class.name} running"
    puts "Main flow: #{self.class.name} finish"
  end
end

SecondItem = Class.new(FirstItem)
ThirtItem = Class.new(FirstItem)

class FirstSubItem < Dwf::Item
  def perform
    puts "Sub flow: #{self.class.name} running"
    puts "Sub flow: #{self.class.name} finish"
  end
end

SecondSubItem = Class.new(FirstSubItem)

class SubWorkflow < Dwf::Workflow
  def configure
    run FirstSubItem
    run SecondSubItem, after: FirstSubItem
  end
end


class TestWf < Dwf::Workflow
  def configure
    run FirstItem
    run SecondItem, after: FirstItem
    run SubWorkflow, after: SecondItem
    run ThirtItem, after: SubWorkflow
  end
end

wf = TestWf.create
wf.start!

Result

Main flow: FirstItem running
Main flow: FirstItem finish
Main flow: SecondItem running
Main flow: SecondItem finish
Sub flow: FirstSubItem running
Sub flow: FirstSubItem finish
Sub flow: SecondSubItem running
Sub flow: SecondSubItem finish
Main flow: ThirtItem running
Main flow: ThirtItem finish

Dynamic workflows

There might be a case when you have to contruct the workflow dynamically depend on the input As an example, let's write a workflow which puts from 1 to 100 into the terminal parallelly . Additionally after finish all job, it will puts the finshed word into the terminal

class FirstMainItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running #{params}"
  end
end

SecondMainItem = Class.new(FirstMainItem)

class TestWf < Dwf::Workflow
  def configure
    items = (1..100).to_a.map do |number|
      run FirstMainItem, params: number
    end
    run SecondMainItem, after: items, params: "finished"
  end
end

We can achieve that because run method returns the id of the created job, which we can use for chaining dependencies. Now, when we create the workflow like this:

wf = TestWf.create
# wf.callback_type = Dwf::Workflow::SK_BATCH
wf.start!

Todo

  • Make it work
  • Support pass params
  • Support with build-in callback
  • Add github workflow
  • Redis configurable
  • Pipelining
  • Test
  • Sub workflow
  • CLI
  • Support Resque
  • Key value store plugable

Contributing

  1. Fork it ( http://github.com/dthtien/wf/fork )
  2. Create your feature branch (git checkout -b awsome-feature)
  3. Commit your changes (git commit -am 'Add some awsome feature')
  4. Push to the branch (git push origin awsome-feature)
  5. Create new Pull Request

References