task_tempest¶ ↑
Framework for building threaded asynchronous job processors.
Description¶ ↑
task_tempest basically lets you build glorified loops, reading messages from a queue and dispatching them to classes to handle. In short, it’s just another background job processor for Ruby.
task_tempest is based on the idea of a thread pool; each background job is executed on a thread. This can achieve high throughput, but your code will need to be threadsafe.
task_tempest can also run in fibered mode in conjunction with EventMachine. This can achieve high(er?) throughput, but your code will need to be fiber aware.
task_tempest is queue agnostic; you can use whatever queue you like, as long as it supports two operations: enqueue, dequeue.
task_tempest is used in production at Onespot and processes over 1 million jobs a day.
Quickstart¶ ↑
Run the code below and both MathTempest.log and MathTempest.task.log will be created in your current working directory. To stop the tempest, send the process an Interrupt or SIGTERM.
require "task_tempest" class MemoryQueue def initialize; @array = []; end def enqueue(message); @array.push(message); end def dequeue; @array.shift; end end class MathTempest < TaskTempest::Engine configure do threads 2 queue MemoryQueue.new end end class Adder extend TaskTempest::Task def process(a, b) c = a + b task_logger.info "#{a} + #{b} = #{c}" end end 10.times do a, b = rand(10), rand(10) MathTempest.submit(Adder, a, b) end MathTempest.run
Queue¶ ↑
The queue object given in the TaskTempest::Engine’s configure block must define 2 methods: enqueue and dequeue.
task_tempest doesn’t care what kind of queue you use or what you actually store in it, as long as dequeue returns a message as described here. You are responsible for any serialization and deserialization that may be required.
A message is an array with the following format:
[task_id, task_class_name, arg1, arg2, ...]
An example message is:
[nil, "Adder", 2, 5]
The queue’s enqueue method will receive a message as its first argument, plus any additional arguments given to submit.
The queue’s dequeue method must return a message.
If the task_id of a message is nil, then task_tempest will automatically assign an id.
Tasks¶ ↑
Messages are dispatched to a task class to handle. A task class must extend TaskTempest::Task and define a process method. The process method will be called with the arguments from the message being dispatched.
class Averager
extend TaskTempest::Task
configure_task do
...
end
def process(*args)
avg = args.inject(0){ |memo, n| memo += n; memo } / args.length
task_logger.info "The average of #{args.inspect} is #{avg}"
end
end
MathTempest.submit(Averager, 2, 4)
# Will produce a message like ["9ac4f", "Averager", 2, 4]
# And eventually Averager.process(2, 4) will be called.
MathTempest.submit(Averager, 2, 4, 6)
# Will produce a message like ["b733c", "Averager", 2, 4, 6]
# And eventually Averager.process(2, 4, 6) will be called.
See TaskTempest::Task::Configuration for what options can be set in a task’s configure block.
Logging¶ ↑
task_tempest logs to a main log and a task log.
The main log shows a high level view of all the tasks that are run (when they started, when the finished, if they failed, if they timed out, etc).
The task log shows detailed information about each task. Each line in the task log is prefixed with a task id. This is so that if you see a task failed in the main log, you can grep for its id in the task log.
Any logging done in a task using task_logger will be written to the task log.
Callbacks¶ ↑
Each task class can define callbacks for success, failure, and timeout of a task.
class Averager
extend TaskTempest::Task
configure_task do
timeout 1.5
after_failure proc { |exception|
HoptoadNotifier.notify(exception)
}
after_timeout proc {
task_logger.warn "Crap, I took longer than 1.5 seconds"
}
end
...
end
See TaskTempest::Task::Configuration for the names of each callback and arguments yielded.
EventMachine + Fibers¶ ↑
The fibers option puts task_tempest into fibered mode and also defines the size of the fiber pool.
class FiberedTempest << TaskTempest::Engine
configure do
fibers 5
...
end
...
end
Now when FiberedTempest.run is called, The EventMachine reactor will be started and each task will be dispatched on a fiber.
If you run in fibered mode, you need to install fiber_storm.
Daemonizing¶ ↑
There is no code in task_tempest to run as a daemon, that is left to you. It’s easy with the Daemons gem though.
Assuming your tempest is defined in my_tempest.rb, just put the following code at the bottom of the file.
if $0 == __FILE__ require "daemons" Daemons.run_proc(MyTempest.conf.name, :log_output => true) do MyTempest.run end end
Now you can run it as a daemon from the command line.
ruby my_tempest.rb start ruby my_tempest.rb stop ruby my_tempest.rb run # Run in foreground
See the Daemons rdoc for more info.
Rails¶ ↑
When task_tempest is used as an asynchronous task processor for a Rails app, you probably want to load the Rails environment so you can have access to models, configuration, etc.
You can load the Rails environment however you like, but if you choose to load it in one of task_tempest‘s initialization callbacks, then there are some caveats to be aware of.
class MathTempest < TaskTempest::Engine configure do root{ Rails.root } after_initialize proc { require "config/environment" # Load Rails. } end end
That won’t work and will result in an exception saying Rails isn’t defined, because the root configuration option is used during the initialization process before the after_initialized callback is called. To fix the error, change after_initialized to before_initialize.
Complete examples¶ ↑
See the examples directory.
Copyright¶ ↑
Copyright © 2010 Christopher J. Bottaro. See LICENSE for details.