philiprehberger-retry_queue
Batch processor with per-item retry, backoff, and dead letter collection
Requirements
- Ruby >= 3.1
Installation
Add to your Gemfile:
gem "philiprehberger-retry_queue"Or install directly:
gem install philiprehberger-retry_queueUsage
require "philiprehberger/retry_queue"
result = Philiprehberger::RetryQueue.process(items, max_retries: 3) do |item|
process_item(item)
end
puts result.succeeded.size # => number of successful items
puts result.failed.size # => number of failed itemsCustom Backoff
result = Philiprehberger::RetryQueue.process(items, max_retries: 5, backoff: ->(n) { n * 0.5 }) do |item|
external_api_call(item)
endSelective Retry
result = Philiprehberger::RetryQueue.process(items, max_retries: 3, retry_on: [Net::OpenTimeout, Timeout::Error]) do |item|
api_call(item)
end
# Only Net::OpenTimeout and Timeout::Error trigger retries
# All other errors send the item straight to failedRetry Hooks
logger_hook = ->(item, error, attempt) { puts "Retrying #{item}: #{error.message} (attempt #{attempt})" }
metrics_hook = ->(item, _error, _attempt) { increment_counter("retry.#{item}") }
result = Philiprehberger::RetryQueue.process(items, max_retries: 3, on_retry: [logger_hook, metrics_hook]) do |item|
process_item(item)
endDLQ Reprocessing
result = Philiprehberger::RetryQueue.process(jobs, max_retries: 2) do |job|
job.execute!
end
reprocessed = result.reprocess_failed do |item, error|
fallback_handler(item, error)
end
puts reprocessed.succeeded.size # => items recovered during reprocessing
puts reprocessed.failed.size # => items that failed reprocessing tooStatistics
result = Philiprehberger::RetryQueue.process(records, max_retries: 3) do |record|
save(record)
end
stats = result.stats
# => { total: 100, succeeded: 97, failed: 3, success_rate: 0.97, elapsed: 1.23 }API
| Method | Description |
|---|---|
.process(items, max_retries:, concurrency:, backoff:, retry_on:, on_retry:) { |item| } |
Process items with retry logic |
Result#succeeded |
Array of successfully processed items |
Result#failed |
Array of hashes with :item, :error, :attempts
|
Result#stats |
Hash with :total, :succeeded, :failed, :success_rate, :elapsed
|
Result#reprocess_failed { |item, error| } |
Reprocess failed items, returns a new Result |
Development
bundle install
bundle exec rspec
bundle exec rubocopSupport
If you find this project useful: