Parallel

Run any code in parallel Processes (to use all CPUs), Threads (to speed up blocking operations), or Ractors (to use all CPUs).
Best suited for map-reduce or e.g. parallel downloads/uploads.

Install

gem install parallel

Usage

# 2 CPUs -> work in 2 processes (a,b + c)
results = Parallel.map(['a','b','c']) do |one_letter|
  SomeClass.expensive_calculation(one_letter)
end

# 3 Processes -> finished after 1 run
results = Parallel.map(['a','b','c'], in_processes: 3) { |one_letter| SomeClass.expensive_calculation(one_letter) }

# 3 Threads -> finished after 1 run
results = Parallel.map(['a','b','c'], in_threads: 3) { |one_letter| SomeClass.expensive_calculation(one_letter) }

# 3 Ractors -> finished after 1 run
results = Parallel.map(['a','b','c'], in_ractors: 3, ractor: [SomeClass, :expensive_calculation])

The same can be done with each

Parallel.each(['a','b','c']) { |one_letter| ... }

or each_with_index, map_with_index, flat_map

Produce one item at a time with lambda (anything that responds to .call) or Queue.

items = [1,2,3]
Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... }

Also supports any? or all?

Parallel.any?([1,2,3,4,5,6,7]) { |number| number == 4 }
# => true

Parallel.all?([1,2,nil,4,5]) { |number| number != nil }
# => false

Processes/Threads are workers: each one grabs the next piece of work as soon as it finishes its current one.
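
The worker model can be pictured with plain Ruby threads and a Queue. This is only an illustrative sketch built on the standard library, not the gem's actual implementation; the helper name run_pool and its arguments are made up for the example.

```ruby
# Illustrative sketch only: a tiny worker pool built on Thread and Queue.
# `run_pool` is a hypothetical helper, not part of the parallel gem.
def run_pool(items, worker_count)
  jobs = Queue.new
  items.each_with_index { |item, index| jobs << [item, index] }
  jobs.close # once closed and empty, pop returns nil instead of blocking

  results = Array.new(items.size)
  workers = Array.new(worker_count) do
    Thread.new do
      # Each worker grabs the next job as soon as it finishes the previous one.
      while (job = jobs.pop)
        item, index = job
        results[index] = yield(item)
      end
    end
  end
  workers.each(&:join)
  results
end

squares = run_pool([1, 2, 3, 4, 5], 2) { |n| n * n }
```

Because results are stored by input index, the output order matches the input even when workers finish out of order.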

Processes

  • Speedup through multiple CPUs
  • Speedup for blocking operations
  • Variables are protected from change
  • Extra memory used
  • Child processes are killed when your main process is killed through Ctrl+c or kill -2
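
Why variables are protected from change in processes can be seen with a bare fork. The following is a rough sketch using only Process.fork and IO.pipe from the standard library (so it assumes a platform with fork, i.e. not Windows); it is not how the gem itself is implemented, and the variable names are illustrative.

```ruby
# Illustrative sketch only: a forked child works on its own copy of memory.
counter = 0
reader, writer = IO.pipe

pid = fork do
  reader.close
  counter += 100        # changes only the child's copy
  writer.puts counter   # send the child's value back through the pipe
  writer.close
  exit!(0)              # leave the child without running inherited at_exit hooks
end

writer.close
child_value = reader.gets.to_i # value computed inside the child
reader.close
Process.wait(pid)

parent_value = counter # the parent's variable was never touched
```

Anything a child computes has to be sent back explicitly, which is also why processes cost extra memory and serialization work.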

Threads

  • Speedup for blocking operations
  • Variables can be shared/modified
  • No extra memory used
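
By contrast, threads see the same variables. A minimal sketch with plain Ruby threads (standard library only, not the gem's code), using a Mutex so that concurrent updates stay consistent:

```ruby
# Illustrative sketch only: threads share and modify the same variable.
total = 0
lock = Mutex.new

threads = [1, 2, 3, 4].map do |n|
  Thread.new do
    # Guard the shared variable so updates from different threads do not interleave.
    lock.synchronize { total += n }
  end
end
threads.each(&:join)
```

After the threads are joined, total holds the sum of all contributions, since every thread wrote to the same variable.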

Ractors

  • Ruby 3.0+ only
  • Speedup for blocking operations
  • No extra memory used
  • Very fast to spawn
  • Experimental and unstable
  • start and finish hooks are called on main thread
  • Variables must be passed in explicitly, e.g. Parallel.map([1,2,3].map { |i| [i, ARGV, local_var] }, ...
  • use Ractor.make_shareable to pass in global objects
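
What Ractor.make_shareable does to an object can be checked without spawning a Ractor at all. A small sketch, assuming Ruby 3.0+; the settings hash is a made-up example:

```ruby
# Illustrative sketch only: make_shareable deep-freezes an object in place.
settings = { "retries" => 3, "hosts" => ["a.example", "b.example"] }

shareable_before = Ractor.shareable?(settings) # mutable hash, so not shareable
Ractor.make_shareable(settings)
shareable_after = Ractor.shareable?(settings)

# The hash, the nested array, and the strings inside it are all frozen now.
deep_frozen = settings.frozen? &&
              settings["hosts"].frozen? &&
              settings["hosts"].all?(&:frozen?)
```

The object can no longer be modified afterwards, so this suits configuration-style data rather than state that workers are expected to update.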

ActiveRecord

Connection Lost

  • Multithreading needs connection pooling, forks need reconnects
  • Adjust connection pool size in config/database.yml when multithreading

# reproducibly fixes things (spec/cases/map_with_ar.rb)
Parallel.each(User.all, in_processes: 8) do |user|
  user.update_attribute(:some_attribute, some_value)
end
User.connection.reconnect!

# maybe helps: explicitly use connection pool
Parallel.each(User.all, in_threads: 8) do |user|
  ActiveRecord::Base.connection_pool.with_connection do
    user.update_attribute(:some_attribute, some_value)
  end
end

# maybe helps: reconnect once inside every fork
Parallel.each(User.all, in_processes: 8) do |user|
  @reconnected ||= User.connection.reconnect! || true
  user.update_attribute(:some_attribute, some_value)
end

NameError: uninitialized constant

A race happens when ActiveRecord models are autoloaded inside parallel threads in environments that lazy-load, like development, test, or migrations.

To fix, autoload the classes before the parallel block with either require '<modelname>' or ModelName.class.

Break

Parallel.map([1, 2, 3]) do |i|
  raise Parallel::Break # -> stops after all current items are finished
end
Parallel.map([1, 2, 3]) { |i| raise Parallel::Break, i if i == 2 } == 2

Kill

Only use if whatever is executing in the sub-command is safe to kill at any point

Parallel.map([1,2,3]) do |x|
  raise Parallel::Kill if x == 1 # -> stop all sub-processes, killing them instantly
  sleep 100 # Do stuff
end

Progress / ETA

# gem install ruby-progressbar

Parallel.map(1..50, progress: "Doing stuff") { sleep 1 }

# Doing stuff | ETA: 00:00:02 | ====================               | Time: 00:00:10

Use :finish or :start hook to get progress information.

  • :start has item and index
  • :finish has item, index, and result

They are called in the main process and protected with a mutex. (To just get the index, use the more performant Parallel.each_with_index.)

Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 }

Set finish_in_order: true to call the :finish hook in the order of the input (will take longer to see initial output).

Parallel.map(1..9, finish: -> (item, i, result) { puts "#{item} ok" }, finish_in_order: true) { sleep rand }

Worker number

Use Parallel.worker_number to determine the worker slot in which your task is running.

Parallel.each(1..5, in_processes: 2) { |i| puts "Item: #{i}, Worker: #{Parallel.worker_number}" }
Item: 1, Worker: 1
Item: 2, Worker: 0
Item: 3, Worker: 1
Item: 4, Worker: 0
Item: 5, Worker: 1

Dynamically generating jobs

Example: wait for work to arrive or sleep

queue = []
Thread.new { loop { queue << rand(100); sleep 2 } } # job producer
Parallel.map(Proc.new { queue.pop }, in_processes: 3) { |f| f ? puts("#{f} received") : sleep(1) }

Tips

  • [Benchmark/Test] Disable threading/forking with in_threads: 0 or in_processes: 0, to run the same code with different setups
  • [Isolation] Do not reuse previous worker processes: isolation: true
  • [Stop all processes with an alternate interrupt signal] 'INT' (from ctrl+c) is caught by default. Catch 'TERM' (from kill) with interrupt_signal: 'TERM'
  • [Process count via ENV] PARALLEL_PROCESSOR_COUNT=16 will use 16 instead of the number of processors detected. This is useful for reconfiguring a tool that uses parallel without adding custom logic.
  • [Process count] By default, parallel uses the number of processors seen by the OS as the process count. To use a value that takes the CPU quota into account, add concurrent-ruby to your Gemfile.
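
The idea behind the ENV override can be sketched as follows. This is a hypothetical helper written for illustration (the name resolve_processor_count is made up, and the gem's real lookup may differ); it only assumes Etc.nprocessors from the standard library.

```ruby
require "etc"

# Illustrative sketch only: prefer an explicit override, else ask the OS.
# `resolve_processor_count` is a hypothetical helper, not the gem's API.
def resolve_processor_count(env = ENV)
  override = env["PARALLEL_PROCESSOR_COUNT"]
  return Integer(override) if override && !override.empty?

  Etc.nprocessors # number of processors seen by the OS
end

from_env = resolve_processor_count("PARALLEL_PROCESSOR_COUNT" => "16")
detected = resolve_processor_count({})
```

Passing the environment as an argument keeps the sketch easy to try with different values without touching the real ENV.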

TODO

  • Replace Signal trapping with simple rescue Interrupt handler

Authors

Michael Grosser
michael@grosser.it
License: MIT