Processes collections in configurable chunks with progress tracking callbacks and per-item error collection. Returns detailed results including processed count, error entries, chunk count, and elapsed time.

philiprehberger-batch

Batch processing toolkit with chunking, progress, and error collection

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-batch"

Or install directly:

gem install philiprehberger-batch

Usage

require "philiprehberger/batch"

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| save(record) }
end

puts result.processed  # => number of successful items
puts result.success?   # => true if no errors
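
For readers new to the pattern, the chunk-and-collect behaviour can be approximated in plain Ruby. This is a minimal sketch, not the gem's implementation: process_in_chunks is a hypothetical helper, and the returned hash only loosely mirrors the documented result fields.

```ruby
# Hypothetical helper, not part of philiprehberger-batch: it only illustrates
# chunked iteration with per-item error collection via Enumerable#each_slice.
def process_in_chunks(collection, size:)
  processed = 0
  errors = []
  chunks = 0

  collection.each_slice(size) do |chunk|
    chunks += 1
    chunk.each do |item|
      yield item
      processed += 1
    rescue StandardError => e
      # Record the failure and carry on with the next item.
      errors << { item: item, error: e }
    end
  end

  { processed: processed, errors: errors, chunks: chunks }
end

summary = process_in_chunks((1..7).to_a, size: 3) do |n|
  raise ArgumentError, "multiple of five" if (n % 5).zero?
end

puts summary[:processed]     # => 6
puts summary[:chunks]        # => 3
puts summary[:errors].length # => 1
```

A failing item ends up in the error list instead of aborting the run, which is the property the real result object exposes through processed and errors.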

Progress Tracking

result = Philiprehberger::Batch.process(items, size: 100) do |batch|
  # Register the callback first so it is in place before any item runs.
  batch.on_progress do |info|
    puts "Chunk #{info[:chunk_index] + 1}/#{info[:total_chunks]} - #{info[:percentage]}%"
  end
  batch.each { |item| process(item) }
end

Top-level Progress Callback

Pass on_progress: at the call site to subscribe without touching every chunk:

progress = ->(info) { puts "#{info[:percentage]}% (#{info[:processed]}/#{info[:total_items]})" }

Philiprehberger::Batch.process(items, size: 100, on_progress: progress) do |batch|
  batch.each { |item| process(item) }
end

Error Collection

result = Philiprehberger::Batch.process(jobs, size: 25) do |batch|
  # Register the handler before iterating, as in the Early Termination example.
  batch.on_error { |item, err| log_error(item, err) }
  batch.each { |job| job.execute! }
end

result.errors.each do |entry|
  puts "Failed: #{entry[:item]} - #{entry[:error].message}"
end

Early Termination

result = Philiprehberger::Batch.process(items, size: 50) do |batch|
  batch.on_error { |_item, _err| :halt }
  batch.each { |item| risky_operation(item) }
end

result.halted?  # => true if processing stopped early

Retry Per Chunk

result = Philiprehberger::Batch.process(items, size: 100, retries: 2) do |batch|
  batch.each { |item| unreliable_api_call(item) }
end

Result Aggregation

result = Philiprehberger::Batch.process(users, size: 50) do |batch|
  batch.each { |user| user.active? ? :active : :inactive }
end

result.counts                              # => { active: 42, inactive: 8 }
result.flat_map { |status| [status] }      # => [:active, :active, :inactive, ...]
result.group_by { |status| status }        # => { active: [...], inactive: [...] }
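
The aggregation helpers behave much like Ruby's own Enumerable methods applied to the collected return values. A rough equivalent, assuming the results are a plain array (the statuses array below is a stand-in, not output from the gem):

```ruby
# Stand-in for the collected return values; not produced by the gem here.
statuses = [:active, :inactive, :active, :active]

counts  = statuses.tally               # occurrences of each distinct value
grouped = statuses.group_by(&:itself)  # values bucketed under themselves

puts counts[:active]            # => 3
puts counts[:inactive]          # => 1
puts grouped[:active].length    # => 3
```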

Success Rate

result = Philiprehberger::Batch.process(jobs, size: 50) do |batch|
  batch.each { |job| job.execute! }
end

result.success_rate  # => 0.0..1.0 ratio of processed to total (1.0 when total is 0)
puts "#{(result.success_rate * 100).round(1)}% succeeded"
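
The documented rule (ratio of processed to total, with 1.0 for an empty collection) can be written out as a small function. This is a sketch of the arithmetic only; the free-standing success_rate method below is hypothetical and not the gem's source.

```ruby
# Hypothetical helper mirroring the documented rule.
def success_rate(processed, total)
  return 1.0 if total.zero? # an empty collection counts as fully successful

  processed.to_f / total
end

puts success_rate(42, 50) # => 0.84
puts success_rate(0, 0)   # => 1.0
```

Guarding the zero case avoids a NaN from 0.0 / 0 and keeps the value inside the documented 0.0..1.0 range.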

Timing Statistics

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| save(record) }
end

stats = result.timing
puts stats[:total]          # => overall elapsed time in seconds
puts stats[:per_chunk]      # => average time per chunk
puts stats[:per_item]       # => average time per item
puts stats[:fastest_chunk]  # => shortest chunk duration
puts stats[:slowest_chunk]  # => longest chunk duration
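
To make the relationship between these keys concrete, here is one way they could be derived from a list of per-chunk durations. It is a sketch under assumptions: timing_stats is a hypothetical helper, and it treats the total as the sum of chunk durations, which may differ from how the gem measures overall elapsed time.

```ruby
# Hypothetical helper: derives the documented keys from per-chunk durations
# (in seconds) and the total number of items.
def timing_stats(chunk_durations, item_count)
  total = chunk_durations.sum
  {
    total: total,
    per_chunk: chunk_durations.empty? ? 0.0 : total / chunk_durations.size,
    per_item: item_count.zero? ? 0.0 : total / item_count,
    fastest_chunk: chunk_durations.min,
    slowest_chunk: chunk_durations.max
  }
end

sample = timing_stats([0.5, 0.25, 0.75], 150)
puts sample[:total]         # => 1.5
puts sample[:per_chunk]     # => 0.5
puts sample[:fastest_chunk] # => 0.25
puts sample[:slowest_chunk] # => 0.75
```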

Timeout Per Chunk

result = Philiprehberger::Batch.process(items, size: 100, timeout_per_chunk: 30) do |batch|
  batch.each { |item| slow_external_call(item) }
end

# Chunks that exceed 30 seconds are interrupted. The TimeoutError is captured
# in result.errors; items from that chunk are NOT counted in result.processed.
# Processing continues with the remaining chunks.
timeout_errors = result.errors.select { |e| e[:error].is_a?(Philiprehberger::Batch::TimeoutError) }
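
The semantics described in the comments above (interrupt the chunk, record the error, skip its items in the processed count, continue) can be illustrated with Ruby's standard Timeout module. This is a sketch only: process_with_chunk_timeout is a hypothetical helper, it rescues Timeout::Error where the gem uses its own TimeoutError class, and storing the whole chunk under :item is an assumption.

```ruby
require "timeout"

# Hypothetical stand-in for the documented behaviour, built on stdlib Timeout.
def process_with_chunk_timeout(collection, size:, timeout:)
  processed = 0
  errors = []

  collection.each_slice(size) do |chunk|
    Timeout.timeout(timeout) { chunk.each { |item| yield item } }
    processed += chunk.size # counted only when the whole chunk finished in time
  rescue Timeout::Error => e
    errors << { item: chunk, error: e } # record the chunk, then move on
  end

  { processed: processed, errors: errors }
end

# Each item is a sleep duration; the second chunk exceeds the 0.1 s budget.
outcome = process_with_chunk_timeout([0.0, 0.0, 0.5, 0.0, 0.0], size: 2, timeout: 0.1) do |delay|
  sleep(delay)
end

puts outcome[:processed]     # => 3
puts outcome[:errors].length # => 1
```

The third chunk still runs after the second one times out, matching the "processing continues" behaviour.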

Filtering Errors by Class

result = Philiprehberger::Batch.process(items, size: 50) do |batch|
  batch.each { |item| item.sync! }
end

timeout_errors = result.filter_errors(Philiprehberger::Batch::TimeoutError)
timeout_errors.each { |e| puts "Chunk timed out: #{e[:item].inspect}" }

arg_errors = result.filter_errors(ArgumentError)
arg_errors.each { |e| puts "Bad argument for #{e[:item]}: #{e[:error].message}" }

Errors for a Specific Item

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| record.save! }
end

result.errors_for(records.first).each do |entry|
  puts "#{entry[:item]} failed: #{entry[:error].message}"
end

Partial Success

result = Philiprehberger::Batch.process([1, 2, 3, 4]) do |batch|
  batch.each { |n| raise "even" if n.even? }
end

result.partial?      # => true (some succeeded, some failed)
result.failed_items  # => [2, 4]
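
Both values follow directly from the processed count and the error entries. A sketch over the documented { item:, error: } entry shape, using hypothetical free-standing helpers in place of the Result methods:

```ruby
# Hypothetical helpers; the gem exposes these as methods on its result object.
def failed_items(errors)
  errors.map { |entry| entry[:item] }.uniq # uniq keeps first-occurrence order
end

def partial?(processed, errors)
  processed.positive? && !errors.empty?
end

entries = [
  { item: 2, error: RuntimeError.new("even") },
  { item: 4, error: RuntimeError.new("even") },
  { item: 2, error: RuntimeError.new("even again") }
]

p failed_items(entries) # => [2, 4]
p partial?(2, entries)  # => true
p partial?(0, entries)  # => false (full failure)
p partial?(4, [])       # => false (full success)
```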

Concurrency

result = Philiprehberger::Batch.process(records, size: 100, concurrency: 4) do |batch|
  batch.each { |record| api_call(record) }
end

result.processed  # => total successful across all threads
result.results    # => collected in chunk order
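
One common way to keep results in chunk order while chunks finish at different times is to have workers write into a pre-sized array by chunk index. The sketch below shows that idea with plain threads and a queue; process_concurrently is a hypothetical helper, and whether the gem works this way internally is not stated in this README.

```ruby
# Hypothetical sketch: worker threads pull chunk indexes from a closed queue
# and store each chunk's results at that index, preserving chunk order.
def process_concurrently(collection, size:, concurrency:, &block)
  chunks = collection.each_slice(size).to_a
  results = Array.new(chunks.size)

  queue = Queue.new
  chunks.each_index { |index| queue << index }
  queue.close # pop returns nil once the queue is drained

  workers = Array.new(concurrency) do
    Thread.new do
      while (index = queue.pop)
        results[index] = chunks[index].map { |item| block.call(item) }
      end
    end
  end
  workers.each(&:join)

  results.flatten(1)
end

squares = process_concurrently((1..10).to_a, size: 3, concurrency: 4) { |n| n * n }
p squares # => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

Because each worker writes to a distinct slot, no ordering step is needed after the threads join.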

API

Method / Class                     Description
.process(collection, size:, concurrency:, retries:, timeout_per_chunk:, on_progress:) { |batch| }
                                   Process collection in chunks (optional top-level progress callback and per-chunk timeout)
Batch::TimeoutError                Raised internally and captured in Result#errors when a chunk exceeds timeout_per_chunk
Chunk#each { |item| }              Iterate over items in the chunk
Chunk#on_progress { |info| }       Register progress callback
Chunk#on_error { |item, err| }     Register error callback (return :halt to stop)
Result#processed                   Number of successfully processed items
Result#errors                      Array of error hashes
Result#total                       Total number of items
Result#chunks                      Number of chunks processed
Result#elapsed                     Elapsed time in seconds
Result#success?                    True if no errors occurred
Result#halted?                     True if processing was halted early
Result#results                     Array of collected return values
Result#flat_map { |r| }            Map over results and flatten
Result#counts                      Hash counting occurrences of each result value
Result#group_by { |r| }            Group results by block return value
Result#success_rate                Ratio of processed to total as a Float in [0.0, 1.0] (1.0 when empty)
Result#timing                      Hash of timing stats: total, per_chunk, per_item, fastest_chunk, slowest_chunk
Result#filter_errors(error_class)  Array of { item:, error: } hashes where the error is an instance of the given class
Result#errors_for(item)            Array of { item:, error: } hashes for a specific item
Result#failed_items                Unique items that errored, in first-failure order
Result#partial?                    True when some items succeeded and some errored (false on full success or full failure)

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT