
philiprehberger-parallel_each


Parallel iteration with configurable thread pool and ordered results

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-parallel_each"

Or install directly:

gem install philiprehberger-parallel_each

Usage

require "philiprehberger/parallel_each"

# Parallel map (results preserve input order)
results = Philiprehberger::ParallelEach.map(urls, concurrency: 8) do |url|
  fetch(url)
end
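For readers wondering how input order survives concurrent execution, here is a minimal sketch of the general technique using only Ruby's standard library (Thread, Queue, Etc). It is not the gem's implementation, and the helper name ordered_parallel_map is hypothetical. Each worker pulls an (item, index) pair from a shared queue and writes its result into the matching slot of a pre-sized array, so results come back in input order even when later items finish first.

```ruby
require "etc"

# Hypothetical helper: an order-preserving parallel map on a fixed-size
# thread pool. Illustrates the technique only; not the gem's source.
def ordered_parallel_map(collection, concurrency: Etc.nprocessors, &block)
  raise ArgumentError, "concurrency must be positive" unless concurrency.positive?

  items = collection.to_a
  results = Array.new(items.size) # one pre-allocated slot per input element
  queue = Queue.new
  items.each_with_index { |item, idx| queue << [item, idx] }
  queue.close # once drained, Queue#pop returns nil and the workers exit

  workers = Array.new([concurrency, items.size].min) do
    Thread.new do
      while (pair = queue.pop)
        item, idx = pair
        results[idx] = block.call(item) # write into the item's own slot
      end
    end
  end
  workers.each(&:join)
  results
end

squares = ordered_parallel_map([3, 1, 2], concurrency: 2) do |n|
  sleep(0.01 * n) # smaller numbers finish sooner, out of input order
  n * n
end
p squares # => [9, 1, 4]
```

Writing to distinct, pre-allocated indices keeps the sketch simple on CRuby, where the global VM lock serializes each Array write; on other Ruby implementations, guarding the write with a Mutex would be the safer choice.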

Parallel Each

Philiprehberger::ParallelEach.each(items, concurrency: 4) do |item|
  process(item)
end
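The API table notes that each returns the original collection rather than the block results. A minimal sketch of that contract, assuming a static split of the input into one contiguous slice per thread (a different scheduling strategy from a shared work queue, and not necessarily what the gem does); parallel_each_sketch is a hypothetical name:

```ruby
# Hypothetical sketch: run the block for side effects on up to `concurrency`
# threads, each handling one contiguous slice, then return the input itself.
def parallel_each_sketch(collection, concurrency: 4, &block)
  items = collection.to_a
  slice_size = [(items.size.to_f / concurrency).ceil, 1].max
  threads = items.each_slice(slice_size).map do |slice|
    Thread.new { slice.each(&block) }
  end
  threads.each(&:join) # wait for every slice before returning
  collection           # the original object, not the block results
end

input = [1, 2, 3, 4, 5]
log = Queue.new # Queue is thread-safe, so workers can push concurrently
returned = parallel_each_sketch(input, concurrency: 2) { |i| log << i * 10 }
p returned.equal?(input) # => true
```

Static slicing is simple but can leave threads idle when some items take much longer than others; a shared queue balances uneven work better.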

Parallel Select and Reject

even = Philiprehberger::ParallelEach.select(numbers, concurrency: 4, &:even?)
odd = Philiprehberger::ParallelEach.reject(numbers, concurrency: 4, &:even?)
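select and reject only need the predicate evaluated in parallel; the filtering itself can then happen in one ordered pass. A sketch of that split, assuming a hypothetical predicate_flags helper that evaluates the block on contiguous slices in separate threads. Thread#value joins each thread and returns its result, and concatenating the slice results in slice order keeps input order. The select_sketch and reject_sketch names are also hypothetical.

```ruby
# Hypothetical helper: evaluate the predicate on contiguous slices in parallel
# and return one boolean per element, in input order.
def predicate_flags(items, concurrency:, &pred)
  slice_size = [(items.size.to_f / concurrency).ceil, 1].max
  items.each_slice(slice_size)
       .map { |slice| Thread.new { slice.map { |x| pred.call(x) ? true : false } } }
       .flat_map(&:value) # Thread#value joins the thread; slices stay in order
end

def select_sketch(collection, concurrency: 4, &pred)
  items = collection.to_a
  flags = predicate_flags(items, concurrency: concurrency, &pred)
  items.select.with_index { |_, i| flags[i] } # sequential, order-preserving filter
end

def reject_sketch(collection, concurrency: 4, &pred)
  items = collection.to_a
  flags = predicate_flags(items, concurrency: concurrency, &pred)
  items.reject.with_index { |_, i| flags[i] }
end

numbers = (1..10).to_a
even = select_sketch(numbers, &:even?)
odd = reject_sketch(numbers, &:even?)
p even # => [2, 4, 6, 8, 10]
p odd  # => [1, 3, 5, 7, 9]
```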

Parallel Partition

Evaluates the predicate on every element in parallel and splits the collection into [truthy, falsy] in a single pass, preserving input order within each array:

even, odd = Philiprehberger::ParallelEach.partition(numbers, concurrency: 4, &:even?)
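One way to get a single-pass partition that still preserves order within each array, sketched under the assumption of one contiguous slice per thread: each thread partitions its own slice with Enumerable#partition, and the per-slice halves are concatenated in slice order. partition_sketch is a hypothetical name, not the gem's code.

```ruby
# Hypothetical sketch: partition each slice in its own thread, then stitch the
# truthy halves and falsy halves together in slice (input) order.
def partition_sketch(collection, concurrency: 4, &pred)
  items = collection.to_a
  slice_size = [(items.size.to_f / concurrency).ceil, 1].max
  threads = items.each_slice(slice_size).map do |slice|
    Thread.new { slice.partition(&pred) } # [truthy, falsy] for this slice
  end
  threads.map(&:value).each_with_object([[], []]) do |(t, f), (truthy, falsy)|
    truthy.concat(t)
    falsy.concat(f)
  end
end

even, odd = partition_sketch(1..10, concurrency: 3, &:even?)
p even # => [2, 4, 6, 8, 10]
p odd  # => [1, 3, 5, 7, 9]
```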

Parallel Find

admin = Philiprehberger::ParallelEach.find(users, concurrency: 4, &:admin?)
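The API table describes find as short-circuiting and returning the first match. A sketch of one way to do that while still honoring input order, assuming "first" means lowest index (the README does not say whether the gem returns the earliest match by position or by completion time): workers record the lowest matching index under a Mutex and skip any index beyond it. find_sketch is hypothetical.

```ruby
# Hypothetical sketch: short-circuiting find that returns the match with the
# lowest index, even if a later element happens to match first in time.
def find_sketch(collection, concurrency: 4, &pred)
  items = collection.to_a
  queue = Queue.new
  items.each_index { |i| queue << i } # indices are handed out in input order
  queue.close
  lock = Mutex.new
  best = nil # lowest matching index recorded so far

  workers = Array.new([concurrency, items.size].min) do
    Thread.new do
      while (i = queue.pop)
        # Short-circuit: an index past an earlier match cannot change the answer.
        next if lock.synchronize { best && i > best }

        lock.synchronize { best = i if best.nil? || i < best } if pred.call(items[i])
      end
    end
  end
  workers.each(&:join)
  best && items[best]
end

match = find_sketch([1, 3, 4, 6, 8], concurrency: 2, &:even?)
p match # => 4
```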

Parallel Flat Map

pairs = Philiprehberger::ParallelEach.flat_map(records, concurrency: 4) do |r|
  [r.id, r.name]
end
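flat_map is described as flattening one level, which matches Enumerable#flat_map: arrays returned by the block are concatenated, but arrays nested inside them are kept. A sketch, assuming one contiguous slice per thread; flat_map_sketch and the Record struct are hypothetical stand-ins for the README's records.

```ruby
# Hypothetical sketch: flatten one level inside each slice, then concatenate
# the slice results in input order.
def flat_map_sketch(collection, concurrency: 4, &block)
  items = collection.to_a
  slice_size = [(items.size.to_f / concurrency).ceil, 1].max
  items.each_slice(slice_size)
       .map { |slice| Thread.new { slice.flat_map(&block) } } # flatten within each slice
       .flat_map(&:value) # concatenate slice results in order
end

Record = Struct.new(:id, :name) # stand-in for the README's records

records = [Record.new(1, "a"), Record.new(2, "b")]
pairs = flat_map_sketch(records) { |r| [r.id, r.name] }
nested = flat_map_sketch([1, 2]) { |x| [[x, x]] }
p pairs  # => [1, "a", 2, "b"]
p nested # => [[1, 1], [2, 2]] (only one level is flattened)
```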

Map and Each with Index

# map_with_index passes (item, index) to the block
labeled = Philiprehberger::ParallelEach.map_with_index(items, concurrency: 4) do |item, idx|
  "#{idx}: #{item}"
end

# each_with_index for side effects with index access
Philiprehberger::ParallelEach.each_with_index(items, concurrency: 4) do |item, idx|
  puts "Processing item #{idx}: #{item}"
end
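map_with_index returns its results in input order, whereas each_with_index runs only for side effects, so the lines printed by the example above can appear in any order. A sketch, assuming the index is attached to each item before any thread starts (map_with_index_sketch is hypothetical): pairing items with indices up front means a worker never has to work out where an item came from.

```ruby
# Hypothetical sketch: pair every item with its index first, then map the
# pairs in parallel on contiguous slices.
def map_with_index_sketch(collection, concurrency: 4, &block)
  pairs = collection.each_with_index.to_a # [[item, idx], ...] fixed before threading
  slice_size = [(pairs.size.to_f / concurrency).ceil, 1].max
  pairs.each_slice(slice_size)
       .map { |slice| Thread.new { slice.map { |item, idx| block.call(item, idx) } } }
       .flat_map(&:value)
end

labeled = map_with_index_sketch(%w[a b c]) { |item, idx| "#{idx}: #{item}" }
p labeled # => ["0: a", "1: b", "2: c"]
```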

Short-Circuit Methods

has_admin = Philiprehberger::ParallelEach.any?(users, concurrency: 4, &:admin?)
all_valid = Philiprehberger::ParallelEach.all?(users, concurrency: 4, &:valid?)
no_errors = Philiprehberger::ParallelEach.none?(records, concurrency: 4, &:invalid?)
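The three predicates can be expressed through a single short-circuiting any?: all? holds when no element fails the predicate, and none? is the complement of any?, as the API table states. A sketch assuming a shared found flag that workers check before taking more work; any_sketch, all_sketch, and none_sketch are hypothetical names.

```ruby
# Hypothetical sketch: stop handing out work once any worker sees a truthy result.
def any_sketch(collection, concurrency: 4, &pred)
  queue = Queue.new
  collection.each { |item| queue << [item] } # wrap so a nil item is not mistaken for "drained"
  queue.close
  found = false
  workers = Array.new(concurrency) do
    Thread.new do
      # Checking `found` before each pop is the short-circuit.
      while !found && (entry = queue.pop)
        found = true if pred.call(entry.first)
      end
    end
  end
  workers.each(&:join)
  found
end

def all_sketch(collection, concurrency: 4, &pred)
  !any_sketch(collection, concurrency: concurrency) { |x| !pred.call(x) }
end

def none_sketch(collection, concurrency: 4, &pred)
  !any_sketch(collection, concurrency: concurrency, &pred)
end

p any_sketch([1, 3, 4], &:even?)  # => true
p all_sketch([2, 4, 5], &:even?)  # => false
p none_sketch([1, 3, 5], &:even?) # => true
```

On empty input these return false, true, and true respectively, the same as Ruby's Enumerable#any?, #all?, and #none?.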

Count and Reduce

even_count = Philiprehberger::ParallelEach.count(numbers, concurrency: 4, &:even?)

total = Philiprehberger::ParallelEach.reduce([1, 2, 3, 4], 0) { |acc, item| acc + item }
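count can evaluate the predicate in parallel and add up per-thread tallies. reduce, which the API table describes as a sequential reduction, has to fold left to right because each step needs the previous accumulator. A sketch of the count side, assuming one contiguous slice per thread (count_sketch is hypothetical); the reduce line uses plain Enumerable#reduce, which a sequential reduction over the same input should agree with.

```ruby
# Hypothetical sketch: count matches per slice in parallel, then sum the tallies.
def count_sketch(collection, concurrency: 4, &pred)
  items = collection.to_a
  slice_size = [(items.size.to_f / concurrency).ceil, 1].max
  items.each_slice(slice_size)
       .map { |slice| Thread.new { slice.count(&pred) } } # per-slice tally
       .sum(&:value)                                      # combine tallies after joining
end

even_count = count_sketch(1..10, &:even?)
total = [1, 2, 3, 4].reduce(0) { |acc, item| acc + item } # sequential fold
p even_count # => 5
p total      # => 10
```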

Concurrency

All iteration methods accept a concurrency: keyword argument that sets the thread pool size. It defaults to Etc.nprocessors (the number of available CPU cores).

# Use 2 threads
Philiprehberger::ParallelEach.map(items, concurrency: 2) { |i| i * 2 }

# Use all available cores (default)
Philiprehberger::ParallelEach.map(items) { |i| i * 2 }
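Etc.nprocessors comes from Ruby's standard etc library. A hypothetical resolve_concurrency helper shows the documented default plus a basic validation check; the validation is an assumption, since the README does not say how the gem treats zero or negative values.

```ruby
require "etc"

# Hypothetical helper: fall back to the CPU count, as the README documents,
# and reject values that cannot size a thread pool (an assumed rule).
def resolve_concurrency(concurrency = nil)
  value = concurrency || Etc.nprocessors
  unless value.is_a?(Integer) && value.positive?
    raise ArgumentError, "concurrency must be a positive Integer, got #{value.inspect}"
  end

  value
end

p Etc.nprocessors        # machine-dependent core count
p resolve_concurrency    # same value as the line above
p resolve_concurrency(2) # => 2
```

On CRuby, the global VM lock lets only one thread execute Ruby code at a time, so threads mainly help when the block waits on I/O (as with fetch(url) in the Usage example). For I/O-bound work, a concurrency above the core count is often reasonable; CPU-bound blocks are unlikely to speed up.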

Stats

After a parallel run, inspect the most recent run's stats with last_stats:

Philiprehberger::ParallelEach.map(items, concurrency: 4) { |i| process(i) }

Philiprehberger::ParallelEach.last_stats
# => { workers: 4, completed: 10, failed: 0, elapsed_seconds: 0.123 }

last_stats returns nil until the first run completes, and elapsed_seconds is nil if no run has finished. Each run resets the stats and then records its own.
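A sketch of how stats in this shape can be gathered, assuming completed and failed are per-item counters guarded by a Mutex and elapsed_seconds is measured with the monotonic clock (Process.clock_gettime with Process::CLOCK_MONOTONIC, which is unaffected by wall-clock adjustments). run_with_stats is hypothetical and, unlike the gem's documented error behavior, swallows exceptions so that it can count them.

```ruby
# Hypothetical sketch: run the block on a worker pool and report stats in the
# same shape as the README's last_stats example.
def run_with_stats(collection, concurrency: 4, &block)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  queue = Queue.new
  collection.each { |item| queue << [item] }
  queue.close
  lock = Mutex.new
  completed = 0
  failed = 0

  workers = Array.new(concurrency) do
    Thread.new do
      while (entry = queue.pop)
        begin
          block.call(entry.first)
          lock.synchronize { completed += 1 }
        rescue StandardError
          lock.synchronize { failed += 1 } # counted, not re-raised, in this sketch
        end
      end
    end
  end
  workers.each(&:join)

  {
    workers: workers.size, # assumption: the number of threads started
    completed: completed,
    failed: failed,
    elapsed_seconds: Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  }
end

stats = run_with_stats(1..10, concurrency: 4) { |i| raise "boom" if i == 3 }
p stats[:completed] # => 9
p stats[:failed]    # => 1
```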

Error Handling

If any block raises an exception, the first error is re-raised after all threads finish:

begin
  Philiprehberger::ParallelEach.map(items, concurrency: 4) do |item|
    raise ArgumentError, 'invalid' if item.nil?

    transform(item)
  end
rescue ArgumentError => e
  puts e.message # => "invalid"
end
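The behavior described above, where every thread finishes before the first error surfaces, can be sketched by rescuing inside each worker, remembering only the earliest recorded exception, and raising it once all threads are joined. map_with_deferred_error is a hypothetical name, and "first" here means first recorded in time, which is an assumption about what the gem means. Rescuing inside the worker also keeps Thread#report_on_exception from printing a warning for the failed thread.

```ruby
# Hypothetical sketch: collect the earliest error, let every slice finish,
# then re-raise in the calling thread.
def map_with_deferred_error(collection, concurrency: 4, &block)
  items = collection.to_a
  slice_size = [(items.size.to_f / concurrency).ceil, 1].max
  lock = Mutex.new
  first_error = nil

  threads = items.each_slice(slice_size).map do |slice|
    Thread.new do
      slice.map do |item|
        block.call(item)
      rescue StandardError => e
        lock.synchronize { first_error ||= e } # keep only the earliest recorded error
        nil                                    # placeholder result for the failed item
      end
    end
  end

  results = threads.flat_map(&:value) # Thread#value joins, so every thread has finished
  raise first_error if first_error

  results
end

begin
  map_with_deferred_error([1, nil, 3]) do |item|
    raise ArgumentError, "invalid" if item.nil?

    item * 2
  end
rescue ArgumentError => e
  puts e.message # prints "invalid"
end
```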

API

Method Description
ParallelEach.map(collection, concurrency:) { |item| } Parallel map preserving input order
ParallelEach.each(collection, concurrency:) { |item| } Parallel each, returns original collection
ParallelEach.select(collection, concurrency:) { |item| } Parallel filter preserving input order
ParallelEach.reject(collection, concurrency:) { |item| } Parallel inverse filter preserving input order
ParallelEach.partition(collection, concurrency:) { |item| } Parallel partition returning [truthy, falsy] with order preserved
ParallelEach.flat_map(collection, concurrency:) { |item| } Parallel flat_map, flattens one level
ParallelEach.find(collection, concurrency:) { |item| } Short-circuit find, returns first match or nil
ParallelEach.any?(collection, concurrency:) { |item| } Short-circuit any?
ParallelEach.all?(collection, concurrency:) { |item| } Short-circuit all?
ParallelEach.none?(collection, concurrency:) { |item| } Complement of any?
ParallelEach.map_with_index(collection, concurrency:) { |item, idx| } Parallel map with index
ParallelEach.each_with_index(collection, concurrency:) { |item, idx| } Parallel each with index
ParallelEach.count(collection, concurrency:) { |item| } Count matching elements
ParallelEach.reduce(collection, initial, concurrency:) { |acc, item| } Sequential reduction
ParallelEach.last_stats Hash of stats from the most recent run (workers, completed, failed, elapsed_seconds), or nil

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT