ParallelWorkforce
Easily parallelize functionality by partitioning work in either a Sidekiq (preferred) or ActiveJob worker pool.
See more info on why Enjoy created this Gem at the EnjoyTech blog.
Installation
Add this line to your application's Gemfile:
gem 'parallel_workforce'
And then execute:
$ bundle
Or install it yourself as:
$ gem install parallel_workforce
Usage
Parallel execution requires two things:
- Creating one or more actor classes that have a keyword-argument initializer and a no-args perform method.
- Invoking ParallelWorkforce.perform_all with an array of actor_classes and an equally sized actor_args_array holding the initializer arguments for the actor classes.

  The optional arguments execute_serially and job_class can be passed to perform_all to override configuration settings.

  An optional serial_execution_indexes argument can also be used to execute specific indexes serially. This makes it easy to perform "fast" actors in the current thread and send only "slow" actors to workers. The specified serial actors are performed after enqueuing the remaining actors and waiting for them to complete.

  Additionally, a block can be passed to perform_all. It allows arbitrary execution in the current thread after the jobs have been enqueued and before job responses are checked.
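The actor contract described above can be sketched in plain Ruby. WordCounter and perform_locally are hypothetical names used only for illustration; perform_locally is a local stand-in that shows how each class is paired with its arguments hash, not the gem's implementation. The guarded perform_all call assumes that serial_execution_indexes is passed as a keyword argument, which this README implies but does not show.

```ruby
# Hypothetical actor used only for illustration; it is not part of the gem.
class WordCounter
  attr_reader :text

  # Actors are built from keyword arguments...
  def initialize(text:)
    @text = text
  end

  # ...and expose a no-args perform method whose return value is the result.
  def perform
    text.split.size
  end
end

actor_classes = [WordCounter, WordCounter]
actor_args_array = [{ text: 'hello world' }, { text: 'one two three' }]

# Local stand-in for the actor contract (NOT the gem's implementation):
# each class is instantiated with its matching args hash, then performed.
def perform_locally(actor_classes, actor_args_array)
  actor_classes.zip(actor_args_array).map do |actor_class, actor_args|
    actor_class.new(**actor_args).perform
  end
end

local_results = perform_locally(actor_classes, actor_args_array)

# With the gem loaded and configured, the same two arrays go to perform_all.
# The guard lets this snippet run even where the gem is not available.
if defined?(ParallelWorkforce)
  ParallelWorkforce.perform_all(
    actor_classes: actor_classes,
    actor_args_array: actor_args_array,
    serial_execution_indexes: [0], # assumed keyword form: run actor 0 in the current thread
  ) do
    # Arbitrary work in the current thread once the jobs have been enqueued.
  end
end
```

Keeping actors free of shared state, as in this sketch, makes it straightforward to move an index between serial and worker execution without changing the result.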
Example
Create cryptographic hashes of passwords in parallel in workers using BCrypt. Then verify the generated hashes against the passwords.
Create a class that performs the work.
require 'bcrypt'
class PasswordHashGenerator
  attr_reader :password

  def initialize(password:)
    @password = password
  end

  def perform
    BCrypt::Password.create(password, cost: 15).to_s
  end
end
Invoke the workers and verify the results.
require 'bcrypt'
passwords = ['password 1', 'password 2', 'password 3']
password_hashes = ParallelWorkforce.perform_all(
  actor_classes: Array.new(passwords.size) { PasswordHashGenerator },
  actor_args_array: passwords.map { |password| { password: password } },
)
# check that password hashes match
passwords.zip(password_hashes).each do |password, password_hash|
  raise "Password does not match hash" if BCrypt::Password.new(password_hash) != password
end
Configuration
Call ParallelWorkforce.configure before calling ParallelWorkforce.perform_all. In a Rails application, add a file named parallel_workforce_config.rb to your config/initializers directory.
ParallelWorkforce.configure do |configuration|
  configuration.job_class = MyJobClass
  # etc
end
- job_class - The class to use to respond to enqueued jobs. See classes under the ParallelWorkforce::Job namespace. Defaults to ParallelWorkforce::Job::ActiveJob.
- logger - Defaults to Rails.logger if Rails is loaded.
- revision_builder - Used to determine if the calling thread and worker have a different revision of code. Defaults to a class that builds a hash from the contents of all .rb files contained within the current working directory.
- serial_mode_checker - An object with an execute_serially? method that allows forcing actors to execute in the calling thread instead of in workers. This can be used to turn off parallel execution globally. Default is nil.
- serializer - An object with a serialize(object) method that returns a String and a deserialize(string) method that returns an Object. The default implementation uses Marshal.dump and Marshal.load and serializes as a JSON string for Sidekiq compatibility.
- redis_connector - An object with a with method that yields a Redis connection. Defaults to ParallelWorkforce::RedisConnector::RedisPool. If using Sidekiq, it's best to use ParallelWorkforce::RedisConnector::SidekiqRedisPool.
- job_timeout - Time allowed to execute a job before timing out. Default is 10 seconds.
- job_key_expiration - Time allowed for the result key in Redis to remain before it expires. This should be larger than job_timeout. Default is 20 seconds.
- production_environment - When true, skips the serialization/deserialization round trip during serial execution; that round trip otherwise helps locate problems with objects that fail to serialize. When Rails is loaded, uses Rails.env.production?, otherwise true.
- allow_nested_parallelization - By default, executing ParallelWorkforce.perform_all within a ParallelWorkforce actor will execute serially. This can be enabled to allow nested worker invocations, but ensure that the worker pool is large enough to handle blocked workers waiting for a response. Too small a pool will lead to timeouts because no workers will be available.
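As a rough illustration of the serializer and serial_mode_checker options, the sketch below defines two small objects with the method names listed above. Base64MarshalSerializer, EnvSerialModeChecker, and the PARALLEL_WORKFORCE_SERIAL environment variable are hypothetical names, not part of the gem. The arguments (if any) that the gem passes to execute_serially? are not documented here, so the sketch accepts and ignores any. The timeout values are assumed to be plain numbers of seconds.

```ruby
# Hypothetical serializer: Marshal for the object graph, Base64 so the
# payload is a plain ASCII String that is safe to embed in JSON.
class Base64MarshalSerializer
  def serialize(object)
    [Marshal.dump(object)].pack('m0') # 'm0' is strict Base64 without newlines
  end

  def deserialize(string)
    # Marshal.load must only be used on data from a trusted source.
    Marshal.load(string.unpack1('m0'))
  end
end

# Hypothetical checker: forces serial execution when an environment
# variable (name chosen for this example) is set to "1".
class EnvSerialModeChecker
  def execute_serially?(*_args) # arguments, if any, are ignored (assumption)
    ENV['PARALLEL_WORKFORCE_SERIAL'] == '1'
  end
end

serializer = Base64MarshalSerializer.new
round_tripped = serializer.deserialize(serializer.serialize({ id: 1, tags: %w[a b] }))

# Guarded so the snippet also runs where the gem is not loaded.
if defined?(ParallelWorkforce)
  ParallelWorkforce.configure do |configuration|
    configuration.serializer = serializer
    configuration.serial_mode_checker = EnvSerialModeChecker.new
    configuration.job_timeout = 30        # assumed to be seconds
    configuration.job_key_expiration = 60 # kept larger than job_timeout
  end
end
```

A checker driven by an environment variable is one way to switch off parallel execution in a test suite without touching application code.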
Development
After checking out the repo, run bin/setup to install dependencies. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/EnjoyTech/parallel_workforce.