The project is in a healthy, maintained state
A lightweight, zero-dependency, thread-safe in-process async job queue with configurable concurrency for Ruby applications.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
 Dependencies
 Project Readme

philiprehberger-task_queue

Gem Version CI License

In-process async job queue with concurrency control for Ruby.

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-task_queue"

Or install directly:

gem install philiprehberger-task_queue

Usage

require "philiprehberger/task_queue"

queue = Philiprehberger::TaskQueue.new(concurrency: 4)

10.times do |i|
  queue.push { puts "Processing job #{i}" }
end

puts queue.size      # number of pending tasks
puts queue.running?  # => true

queue.shutdown(timeout: 30)

Using the << alias

queue << -> { puts "Hello from a task!" }

Error handling

queue = Philiprehberger::TaskQueue.new

queue.on_error do |exception, task|
  puts "Task failed: #{exception.message}"
end

queue.push { raise "oops" }

Statistics

queue.stats
# => { completed: 5, failed: 1, pending: 2 }

Draining

10.times { |i| queue.push { process(i) } }
queue.drain(timeout: 10)  # waits for all tasks to finish
# queue is still running and accepting new tasks

API

Method Description
.new(concurrency: 4) Create a new queue with the given max worker count
#push(&block) Enqueue a task (block) for async execution
#<< (&block) Alias for #push
#size Number of pending (not yet started) tasks
#running? Whether the queue is accepting new tasks
#shutdown(timeout: 30) Gracefully stop all workers, waiting up to timeout seconds
#on_error(&block) Register error callback for failed tasks
#stats Returns hash with :completed, :failed, :pending counts
#drain(timeout: 30) Block until all pending tasks complete (without shutdown)

Development

bundle install
bundle exec rspec
bundle exec rubocop

License

MIT