Parallel map, each, select, and flat_map with a configurable thread pool. Results maintain input order. If a block raises, the first error is re-raised after all threads finish.

philiprehberger-parallel_each

Parallel iteration with configurable thread pool and ordered results

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-parallel_each"

Or install directly:

gem install philiprehberger-parallel_each

Usage

require "philiprehberger/parallel_each"

# Parallel map (results preserve input order)
results = Philiprehberger::ParallelEach.map(urls, concurrency: 8) do |url|
  fetch(url)
end

Parallel Each

Philiprehberger::ParallelEach.each(items, concurrency: 4) do |item|
  process(item)
end

Parallel Select

even = Philiprehberger::ParallelEach.select(numbers, concurrency: 4, &:even?)

Parallel Flat Map

# flat_map flattens one level, so the result is a flat list: [id, name, id, name, ...]
fields = Philiprehberger::ParallelEach.flat_map(records, concurrency: 4) do |r|
  [r.id, r.name]
end
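
To make "flattens one level" concrete, here is a small sketch using Ruby's built-in Enumerable#flat_map. It assumes the gem's flat_map returns the same shape as the standard method, which the README implies but does not state outright. The Record struct and sample data are made up for illustration.

```ruby
# Hypothetical record type and data, for illustration only.
Record = Struct.new(:id, :name)
records = [Record.new(1, "a"), Record.new(2, "b")]

# Each block result is an array; flat_map splices its elements into the output.
flat = records.flat_map { |r| [r.id, r.name] }

# To keep [id, name] pairs intact, wrap each pair in an outer array so that
# only the outer level is flattened away.
nested = records.flat_map { |r| [[r.id, r.name]] }
```

Here flat is [1, "a", 2, "b"] and nested is [[1, "a"], [2, "b"]].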

Map and Each with Index

# map_with_index passes (item, index) to the block
labeled = Philiprehberger::ParallelEach.map_with_index(items, concurrency: 4) do |item, idx|
  "#{idx}: #{item}"
end

# each_with_index for side effects with index access
Philiprehberger::ParallelEach.each_with_index(items, concurrency: 4) do |item, idx|
  puts "Processing item #{idx}: #{item}"
end

Short-Circuit Methods

has_admin = Philiprehberger::ParallelEach.any?(users, concurrency: 4, &:admin?)
all_valid = Philiprehberger::ParallelEach.none?(records, concurrency: 4, &:invalid?)
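
One way short-circuiting could work in a thread pool is sketched below. This is not the gem's implementation: sketch_parallel_any? and sketch_parallel_none? are hypothetical names, and the README does not say whether in-flight blocks are cancelled. In this sketch, workers simply stop taking new items once any block has returned a truthy value.

```ruby
# Hypothetical sketch, not the gem's code.
def sketch_parallel_any?(collection, concurrency: 4, &predicate)
  queue = Queue.new
  # Wrap each item in an array so nil/false items are not mistaken for
  # the nil that Queue#pop returns once a closed queue is empty.
  collection.each { |item| queue << [item] }
  queue.close

  found = false
  workers = Array.new([concurrency, 1].max) do
    Thread.new do
      # Stop pulling work as soon as any worker has found a match.
      while !found && (entry = queue.pop)
        found = true if predicate.call(entry.first)
      end
    end
  end
  workers.each(&:join)
  found
end

# none? as the complement of any?, matching the API table's description.
def sketch_parallel_none?(collection, concurrency: 4, &predicate)
  !sketch_parallel_any?(collection, concurrency: concurrency, &predicate)
end
```

Blocks that are already running when a match is found still run to completion here; only the remaining queued items are skipped.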

Count and Reduce

even_count = Philiprehberger::ParallelEach.count(numbers, concurrency: 4, &:even?)

total = Philiprehberger::ParallelEach.reduce([1, 2, 3, 4], 0) { |acc, item| acc + item }
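
The API table describes reduce as a sequential reduction. The sketch below uses only Ruby's built-in Enumerable methods to show why a fold is order-sensitive in general. It assumes the gem's count and reduce return the same values as the standard methods for the same block.

```ruby
numbers = [1, 2, 3, 4]

# Stdlib equivalents of the calls above.
even_count = numbers.count(&:even?)
total = numbers.reduce(0) { |acc, item| acc + item }

# Subtraction is not associative, so a left-to-right fold ...
left_to_right = numbers.reduce(0) { |acc, item| acc - item }

# ... differs from folding two halves independently and combining them,
# which is what a naive parallel split would do.
halves = numbers.each_slice(2).map { |chunk| chunk.reduce(0) { |acc, item| acc - item } }
combined = halves.reduce(0) { |acc, item| acc - item }
```

Here even_count is 2 and total is 10, while left_to_right is -10 and combined is 10. A general-purpose reduce therefore has to apply the block one step at a time.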

Concurrency

All methods accept a concurrency: keyword argument that controls the thread pool size. It defaults to Etc.nprocessors (the number of available CPU cores).

# Use 2 threads
Philiprehberger::ParallelEach.map(items, concurrency: 2) { |i| i * 2 }

# Use all available cores (default)
Philiprehberger::ParallelEach.map(items) { |i| i * 2 }
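
For intuition, a fixed-size pool that preserves input order can be sketched with the standard library alone. This is an illustrative sketch, not the gem's source: sketch_parallel_map is a hypothetical name, and the queue-plus-index approach is one common technique among several.

```ruby
require "etc"

# Hypothetical sketch, not the gem's code. Workers pull [item, index] pairs
# from a shared queue and write each result into the slot for that index,
# so output order matches input order regardless of completion order.
def sketch_parallel_map(collection, concurrency: Etc.nprocessors, &block)
  items = collection.to_a
  results = Array.new(items.size)

  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close # pop returns nil once the closed queue is empty

  # Never start more workers than items, but always at least one.
  worker_count = [[concurrency, items.size].min, 1].max
  workers = Array.new(worker_count) do
    Thread.new do
      while (pair = queue.pop)
        item, index = pair
        results[index] = block.call(item)
      end
    end
  end

  workers.each(&:join)
  results
end
```

For example, sketch_parallel_map([3, 1, 2], concurrency: 2) { |n| sleep(n * 0.01); n * 10 } returns [30, 10, 20] even though the shorter sleeps finish first.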

Error Handling

If any block raises an exception, the first error is re-raised after all threads finish:

begin
  Philiprehberger::ParallelEach.map(items, concurrency: 4) do |item|
    raise ArgumentError, 'invalid' if item.nil?

    transform(item)
  end
rescue ArgumentError => e
  puts e.message # => "invalid"
end
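
The "first error is re-raised after all threads finish" behavior can be sketched as follows. This is a hypothetical illustration (sketch_run_all is not part of the gem, and it uses one thread per item for brevity rather than a pool): each thread rescues its own exception, the first one recorded is kept, and it is raised only once every thread has been joined.

```ruby
# Hypothetical sketch, not the gem's code.
def sketch_run_all(items, &block)
  first_error = nil
  lock = Mutex.new

  threads = items.map do |item|
    Thread.new do
      begin
        block.call(item)
      rescue StandardError => e
        # Keep only the first exception recorded; later ones are dropped.
        lock.synchronize { first_error ||= e }
      end
    end
  end

  # Wait for every thread before surfacing the error.
  threads.each(&:join)
  raise first_error if first_error

  items
end
```

With this shape, items that do not raise are still processed even when another item fails, and the caller sees a single exception.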

API

Method                                                                  Description
ParallelEach.map(collection, concurrency:) { |item| }                   Parallel map preserving input order
ParallelEach.each(collection, concurrency:) { |item| }                  Parallel each, returns original collection
ParallelEach.select(collection, concurrency:) { |item| }                Parallel filter preserving input order
ParallelEach.flat_map(collection, concurrency:) { |item| }              Parallel flat_map, flattens one level
ParallelEach.any?(collection, concurrency:) { |item| }                  Short-circuit any?
ParallelEach.none?(collection, concurrency:) { |item| }                 Complement of any?
ParallelEach.map_with_index(collection, concurrency:) { |item, idx| }   Parallel map with index
ParallelEach.each_with_index(collection, concurrency:) { |item, idx| }  Parallel each with index
ParallelEach.count(collection, concurrency:) { |item| }                 Count matching elements
ParallelEach.reduce(collection, initial, concurrency:) { |acc, item| }  Sequential reduction

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

  • Star the repo
  • 🐛 Report issues
  • 💡 Suggest features
  • ❤️ Sponsor development
  • 🌐 All Open Source Projects
  • 💻 GitHub Profile
  • 🔗 LinkedIn Profile

License

MIT