philiprehberger-parallel_each
Parallel iteration with configurable thread pool and ordered results
Requirements
- Ruby >= 3.1
Installation
Add to your Gemfile:
gem "philiprehberger-parallel_each"

Or install directly:

gem install philiprehberger-parallel_each

Usage
require "philiprehberger/parallel_each"
# Parallel map (results preserve input order)
results = Philiprehberger::ParallelEach.map(urls, concurrency: 8) do |url|
  fetch(url)
end

Parallel Each
Philiprehberger::ParallelEach.each(items, concurrency: 4) do |item|
  process(item)
end

Parallel Select
even = Philiprehberger::ParallelEach.select(numbers, concurrency: 4, &:even?)

Parallel Flat Map
pairs = Philiprehberger::ParallelEach.flat_map(records, concurrency: 4) do |r|
  [r.id, r.name]
end

Map and Each with Index
# map_with_index passes (item, index) to the block
labeled = Philiprehberger::ParallelEach.map_with_index(items, concurrency: 4) do |item, idx|
  "#{idx}: #{item}"
end
# each_with_index for side effects with index access
Philiprehberger::ParallelEach.each_with_index(items, concurrency: 4) do |item, idx|
  puts "Processing item #{idx}: #{item}"
end

Short-Circuit Methods
has_admin = Philiprehberger::ParallelEach.any?(users, concurrency: 4, &:admin?)
all_valid = Philiprehberger::ParallelEach.none?(records, concurrency: 4, &:invalid?)

Count and Reduce
even_count = Philiprehberger::ParallelEach.count(numbers, concurrency: 4, &:even?)
total = Philiprehberger::ParallelEach.reduce([1, 2, 3, 4], 0) { |acc, item| acc + item }

Concurrency
All methods accept a concurrency: keyword argument that controls the thread pool size. It defaults to Etc.nprocessors (the number of available CPU cores).
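
To make the pool-size behaviour concrete, here is a minimal stdlib-only sketch of a bounded thread pool that preserves input order. It is not the gem's implementation: `sketch_parallel_map` is a hypothetical name, and writing each result into a preallocated slot by index is only an assumption about how ordering could be kept.

```ruby
require "etc"

# Hypothetical helper (not part of the gem): run the block on a fixed number
# of worker threads and store each result at the index of its input.
def sketch_parallel_map(collection, concurrency: Etc.nprocessors, &block)
  items = collection.to_a
  results = Array.new(items.size)

  # Queue of [item, index] pairs; once closed and drained, pop returns nil.
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close

  # Never start more workers than there are items.
  worker_count = [concurrency, items.size].min
  workers = Array.new(worker_count) do
    Thread.new do
      while (pair = queue.pop)
        item, index = pair
        results[index] = block.call(item)
      end
    end
  end

  workers.each(&:join)
  results
end
```

Called as `sketch_parallel_map([3, 1, 2], concurrency: 2) { |n| n * 10 }`, this returns `[30, 10, 20]` regardless of which worker finishes first, because each result is placed by index rather than by completion time.
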
# Use 2 threads
Philiprehberger::ParallelEach.map(items, concurrency: 2) { |i| i * 2 }
# Use all available cores (default)
Philiprehberger::ParallelEach.map(items) { |i| i * 2 }

Error Handling
If any block raises an exception, the remaining threads are allowed to finish and the first error is then re-raised to the caller.
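
One way to picture this contract is the stdlib-only sketch below. It is an assumption about how such behaviour could be built, not the gem's actual code: `sketch_each_collecting_error` is a hypothetical name, and recording the first error under a `Mutex` while the other workers keep draining the queue is just one plausible design.

```ruby
# Hypothetical helper (not part of the gem): run the block on worker threads,
# remember the first exception, and re-raise it only after every worker is done.
def sketch_each_collecting_error(items, concurrency: 4, &block)
  queue = Queue.new
  # Wrap each item in an array so nil items are not mistaken for "queue drained".
  items.each { |item| queue << [item] }
  queue.close

  first_error = nil
  lock = Mutex.new

  workers = Array.new(concurrency) do
    Thread.new do
      while (wrapped = queue.pop)
        begin
          block.call(wrapped.first)
        rescue StandardError => e
          # Keep only the first error recorded; later ones are dropped.
          lock.synchronize { first_error ||= e }
        end
      end
    end
  end

  workers.each(&:join)
  raise first_error if first_error

  items
end
```

In this sketch an error in one item does not stop the others from being processed; whether the gem cancels pending work after a failure is not stated here.
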
begin
  Philiprehberger::ParallelEach.map(items, concurrency: 4) do |item|
    raise ArgumentError, 'invalid' if item.nil?
    transform(item)
  end
rescue ArgumentError => e
  puts e.message # => "invalid"
end

API
| Method | Description |
|---|---|
| `ParallelEach.map(collection, concurrency:) { \|item\| }` | Parallel map preserving input order |
| `ParallelEach.each(collection, concurrency:) { \|item\| }` | Parallel each, returns original collection |
| `ParallelEach.select(collection, concurrency:) { \|item\| }` | Parallel filter preserving input order |
| `ParallelEach.flat_map(collection, concurrency:) { \|item\| }` | Parallel flat_map, flattens one level |
| `ParallelEach.any?(collection, concurrency:) { \|item\| }` | Short-circuit any? |
| `ParallelEach.none?(collection, concurrency:) { \|item\| }` | Complement of any? |
| `ParallelEach.map_with_index(collection, concurrency:) { \|item, idx\| }` | Parallel map with index |
| `ParallelEach.each_with_index(collection, concurrency:) { \|item, idx\| }` | Parallel each with index |
| `ParallelEach.count(collection, concurrency:) { \|item\| }` | Count matching elements |
| `ParallelEach.reduce(collection, initial, concurrency:) { \|acc, item\| }` | Sequential reduction |
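
The `any?` and `none?` rows describe short-circuiting and its complement. As a rough illustration only, the sketch below shows how a shared flag could let workers stop pulling items once a match is found. The names `sketch_parallel_any?` and `sketch_parallel_none?` are hypothetical, and the gem may implement this differently.

```ruby
# Hypothetical helper (not part of the gem): returns true as soon as any
# worker sees a truthy block result; other workers stop taking new items.
def sketch_parallel_any?(items, concurrency: 4, &block)
  queue = Queue.new
  # Wrap each item so nil or false items are not mistaken for "queue drained".
  items.each { |item| queue << [item] }
  queue.close

  found = false
  workers = Array.new(concurrency) do
    Thread.new do
      # The flag only ever flips from false to true, so a late read merely
      # costs one extra block call; it cannot change the final answer.
      while !found && (wrapped = queue.pop)
        found = true if block.call(wrapped.first)
      end
    end
  end

  workers.each(&:join)
  found
end

# none? expressed as the complement of any?, mirroring the table above.
def sketch_parallel_none?(items, concurrency: 4, &block)
  !sketch_parallel_any?(items, concurrency: concurrency, &block)
end
```

Items already in flight when the flag flips still run to completion in this sketch, so "short-circuit" here means that no further items are started, not that running blocks are interrupted.
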
Development
bundle install
bundle exec rspec
bundle exec rubocop

Support
If you find this project useful: