Thread-safe queue and stack data structures with configurable capacity limits, blocking enqueue/dequeue with timeouts, and peek operations. Uses Mutex and ConditionVariable for safe concurrent access.

philiprehberger-queue_stack

Thread-safe Queue and Stack with capacity limits and blocking operations

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-queue_stack"

Or install directly:

gem install philiprehberger-queue_stack

Usage

require "philiprehberger/queue_stack"

q = Philiprehberger::QueueStack::Queue.new(capacity: 100)
q.enqueue('task')
item = q.dequeue  # => 'task'

Queue (FIFO)

q = Philiprehberger::QueueStack::Queue.new(capacity: 10)
q.enqueue('first')
q.enqueue('second')
q.enqueue('third')
q.peek         # => 'first'
q.peek_at(1)   # => 'second'
q.peek_at(-1)  # => 'third'
q.peek_at(99)  # => nil (out of range)
q.dequeue      # => 'first'
q.size         # => 2

Stack (LIFO)

s = Philiprehberger::QueueStack::Stack.new(capacity: 10)
s.push('first')
s.push('second')
s.pop       # => 'second'
s.peek      # => 'first'
s.size      # => 1

Blocking with Timeout

q = Philiprehberger::QueueStack::Queue.new
item = q.try_dequeue(timeout: 5)  # waits up to 5 seconds; => nil if still empty

s = Philiprehberger::QueueStack::Stack.new
item = s.try_pop(timeout: 5)  # waits up to 5 seconds; => nil if still empty
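
For readers curious how a timed blocking read can be built from Mutex and ConditionVariable, here is a minimal sketch. It is an illustration under assumptions, not the gem's source: the class name TimedQueueSketch and its internals are hypothetical, and only the try_dequeue(timeout:) call shape is borrowed from the examples above. The loop re-checks the condition after every wake-up and recomputes the remaining time from a monotonic clock, so spurious wake-ups cannot stretch the total wait.

```ruby
# Hypothetical illustration only; this is not the gem's implementation.
class TimedQueueSketch
  def initialize
    @items = []
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
  end

  def enqueue(item)
    @mutex.synchronize do
      @items.push(item)
      @not_empty.signal # wake one waiting consumer, if any
    end
  end

  # Returns the front item, or nil if nothing arrives within `timeout` seconds.
  def try_dequeue(timeout:)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0

        # wait releases the mutex while sleeping and reacquires it on wake-up
        @not_empty.wait(@mutex, remaining)
      end
      @items.shift
    end
  end
end

q = TimedQueueSketch.new
producer = Thread.new do
  sleep 0.05
  q.enqueue(:job)
end

first = q.try_dequeue(timeout: 2)     # producer delivers well before the deadline
producer.join
second = q.try_dequeue(timeout: 0.1)  # nothing left, so this times out with nil
```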

Conditional Removal

Remove the front/top item only when a predicate holds. Non-blocking; returns nil if the collection is empty or the block returns false (the item stays put).

q = Philiprehberger::QueueStack::Queue.new
q.enqueue({ priority: 10 })
q.enqueue({ priority: 2 })

# Only take high-priority work
q.dequeue_if { |job| job[:priority] >= 5 }  # => { priority: 10 }
q.dequeue_if { |job| job[:priority] >= 5 }  # => nil (head priority 2 left intact)

s = Philiprehberger::QueueStack::Stack.new
s.push(:ready)
s.pop_if { |item| item == :ready }   # => :ready
s.pop_if { |item| item == :ready }   # => nil (empty)
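
The value of a conditional removal over a separate peek followed by dequeue is that the test and the removal happen under one lock, so no other thread can take the head in between. The sketch below shows that idea with a hypothetical ConditionalQueueSketch class; it is an assumption about how such a method could be written, not the gem's code. Note that in this sketch the predicate runs while the lock is held, so it should be quick and must not call back into the same queue.

```ruby
# Hypothetical illustration only; this is not the gem's implementation.
class ConditionalQueueSketch
  def initialize
    @items = []
    @mutex = Mutex.new
  end

  def enqueue(item)
    @mutex.synchronize { @items.push(item) }
    self
  end

  # Removes and returns the head only if the block returns a truthy value.
  # The check and the removal share one critical section.
  def dequeue_if
    @mutex.synchronize do
      next nil if @items.empty?

      yield(@items.first) ? @items.shift : nil
    end
  end

  def size
    @mutex.synchronize { @items.size }
  end
end

q = ConditionalQueueSketch.new
q.enqueue({ priority: 10 }).enqueue({ priority: 2 })

taken   = q.dequeue_if { |job| job[:priority] >= 5 }  # => { priority: 10 }
skipped = q.dequeue_if { |job| job[:priority] >= 5 }  # => nil, head stays queued
```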

Drain

q = Philiprehberger::QueueStack::Queue.new
q.enqueue('a')
q.enqueue('b')
q.enqueue('c')
q.drain  # => ['a', 'b', 'c'] (queue is now empty)

s = Philiprehberger::QueueStack::Stack.new
s.push('a')
s.push('b')
s.push('c')
s.drain  # => ['c', 'b', 'a'] (stack is now empty)

Iteration

q = Philiprehberger::QueueStack::Queue.new
q.enqueue('a')
q.enqueue('b')
q.each { |item| puts item }  # prints 'a', 'b'
q.to_a                        # => ['a', 'b'] (queue unchanged)

Non-Blocking Insertion

q = Philiprehberger::QueueStack::Queue.new(capacity: 1)
q.enqueue('a')
q.try_enqueue('b')                 # => false (full, no wait)
q.try_enqueue('b', timeout: 0.5)   # => false after waiting up to 0.5s

s = Philiprehberger::QueueStack::Stack.new(capacity: 1)
s.push('a')
s.try_push('b')                    # => false (full, no wait)

Clear

q = Philiprehberger::QueueStack::Queue.new
q.enqueue('a')
q.enqueue('b')
q.clear
q.empty?  # => true

Close / Shutdown

q = Philiprehberger::QueueStack::Queue.new
q.enqueue('a')
q.close
q.closed?     # => true
q.dequeue     # => 'a'
q.dequeue     # => nil (closed and empty)
q.enqueue('b') # raises Philiprehberger::QueueStack::ClosedError
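
A common use of close is to end a worker loop: the consumer keeps dequeuing until it receives nil, which signals "closed and empty". The sketch below uses Ruby's standard-library Thread::Queue as a stand-in so it runs without the gem; its documented close behaviour is similar (pop returns nil once closed and empty), although it raises ClosedQueueError where the example above raises the gem's own ClosedError. Whether the gem's queue is a drop-in fit for this exact loop is an assumption based on the behaviour shown above.

```ruby
# Stand-in illustration using the standard library's Thread::Queue.
queue = Thread::Queue.new
results = []

worker = Thread.new do
  # pop blocks while the queue is open and empty, and returns nil once the
  # queue is closed and drained, which ends the loop. A nil or false job
  # would also end it, so this pattern suits queues of truthy items.
  while (job = queue.pop)
    results << job.upcase
  end
end

%w[a b c].each { |job| queue.push(job) }
queue.close   # no more work; already queued items are still delivered
worker.join

rejected =
  begin
    queue.push('d')
    false
  rescue ClosedQueueError
    true
  end

results   # => ["A", "B", "C"]
rejected  # => true
```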

Capacity Limits

q = Philiprehberger::QueueStack::Queue.new(capacity: 3)
q.full?   # => false
3.times { |i| q.enqueue(i) }
q.full?   # => true
# A further q.enqueue would now block until another thread removes an item
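
To show how a capacity limit produces backpressure, here is a minimal sketch of a bounded queue built on one Mutex and two ConditionVariables: producers wait on "not full", consumers wait on "not empty". The class name BoundedQueueSketch and its internals are hypothetical and are not taken from the gem's source.

```ruby
# Hypothetical illustration only; this is not the gem's implementation.
class BoundedQueueSketch
  def initialize(capacity:)
    @capacity = capacity
    @items = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end

  def enqueue(item)
    @mutex.synchronize do
      # Block the producer until a consumer has made room
      @not_full.wait(@mutex) while @items.size >= @capacity
      @items.push(item)
      @not_empty.signal
    end
  end

  def dequeue
    @mutex.synchronize do
      # Block the consumer until a producer has added an item
      @not_empty.wait(@mutex) while @items.empty?
      item = @items.shift
      @not_full.signal
      item
    end
  end

  def size
    @mutex.synchronize { @items.size }
  end
end

q = BoundedQueueSketch.new(capacity: 2)
max_seen = 0

# The producer wants to add six items but can never get more than two ahead
producer = Thread.new { 6.times { |i| q.enqueue(i) } }

received = Array.new(6) do
  max_seen = [max_seen, q.size].max
  q.dequeue
end
producer.join

received  # => [0, 1, 2, 3, 4, 5]
```

With a single producer and a single consumer the items come out in insertion order, and the observed size never exceeds the capacity because the producer sleeps inside enqueue whenever the queue is full.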

Capacity

Read the configured capacity and the number of additional items that can be accepted. Both return nil for unbounded containers; remaining_capacity returns 0 when full. Useful for sizing batches or backpressure decisions.

q = Philiprehberger::QueueStack::Queue.new(capacity: 100)
q.capacity            # => 100
q.remaining_capacity  # => 100
50.times { |i| q.enqueue(i) }
q.remaining_capacity  # => 50

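# `items` is a placeholder for any array of pending work
items = Array.new(80) { |i| "job-#{i}" }
# remaining_capacity is nil for an unbounded queue, so fall back to items.length
batch_size = [items.length, q.remaining_capacity || items.length].min  # => 50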

Batch Insertion and Draining

enqueue_all / push_all insert an array of items under a single mutex acquisition. dequeue_batch / pop_batch remove up to max items at once and signal waiting producers. All four respect capacity and ClosedError semantics.

q = Philiprehberger::QueueStack::Queue.new
q.enqueue_all(%w[a b c d])
q.dequeue_batch(2)  # => ["a", "b"]
q.dequeue_batch(99) # => ["c", "d"]   (clamped to available)

s = Philiprehberger::QueueStack::Stack.new
s.push_all(%w[a b c])
s.pop_batch(2)      # => ["c", "b"]   (LIFO: top first)
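
The batch operations can be pictured as ordinary array operations wrapped in one critical section. The sketch below is a hypothetical, unbounded BatchQueueSketch, not the gem's source; it leaves out capacity and ClosedError handling and only marks the point where waiting producers would be woken.

```ruby
# Hypothetical illustration only; this is not the gem's implementation.
class BatchQueueSketch
  def initialize
    @items = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
  end

  # Appends every item while holding the lock once
  def enqueue_all(items)
    @mutex.synchronize { @items.concat(items) }
    self
  end

  # Removes up to `max` items from the front while holding the lock once
  def dequeue_batch(max)
    @mutex.synchronize do
      batch = @items.shift(max)  # Array#shift(n) returns at most n items
      # Several slots may have opened, so wake every waiting producer
      @not_full.broadcast unless batch.empty?
      batch
    end
  end
end

q = BatchQueueSketch.new
q.enqueue_all(%w[a b c d])
first_batch = q.dequeue_batch(2)   # => ["a", "b"]
rest        = q.dequeue_batch(99)  # => ["c", "d"]

# The LIFO counterpart on a plain Array: take from the end, top first
top_first = %w[a b c].pop(2).reverse  # => ["c", "b"]
```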

API

Queue

Method                            Description
.new(capacity:)                   Create a queue with optional capacity limit
#enqueue(item)                    Add item to back (blocks if full)
#enqueue_all(items)               Enqueue an array of items in FIFO order (blocks per-element if full)
#try_enqueue(item, timeout: nil)  Non-blocking enqueue, returns true/false (waits up to timeout if given)
#dequeue                          Remove and return front item (blocks if empty)
#dequeue_batch(max)               Remove and return up to max items in FIFO order (non-blocking)
#dequeue_if { |item| ... }        Remove and return the front item only if the block is truthy (non-blocking)
#try_dequeue(timeout:)            Dequeue with timeout, returns nil on timeout
#clear                            Remove all items without returning them
#peek                             View front item without removing
#peek_at(index)                   View item at the given position (supports negative indices, returns nil if out of range)
#drain                            Remove and return all items as array (FIFO order)
#each                             Iterate items without removing (returns Enumerator if no block)
#to_a                             Snapshot as array (FIFO order)
#close                            Mark as closed (new enqueues raise ClosedError)
#closed?                          Whether the queue has been closed
#size                             Number of items
#empty?                           Whether the queue is empty
#full?                            Whether the queue is at capacity
#capacity                         Configured capacity, or nil for an unlimited queue
#remaining_capacity               Items the queue can still accept (nil for unlimited, 0 when full)

Stack

Method                            Description
.new(capacity:)                   Create a stack with optional capacity limit
#push(item)                       Push item on top (blocks if full)
#push_all(items)                  Push an array of items in order; last becomes the top (blocks per-element if full)
#try_push(item, timeout: nil)     Non-blocking push, returns true/false (waits up to timeout if given)
#pop                              Remove and return top item (blocks if empty)
#pop_batch(max)                   Pop up to max items, top first (non-blocking)
#pop_if { |item| ... }            Remove and return the top item only if the block is truthy (non-blocking)
#try_pop(timeout:)                Pop with timeout, returns nil on timeout
#clear                            Remove all items without returning them
#peek                             View top item without removing
#drain                            Remove and return all items as array (LIFO order)
#each                             Iterate items without removing (returns Enumerator if no block)
#to_a                             Snapshot as array (LIFO order)
#close                            Mark as closed (new pushes raise ClosedError)
#closed?                          Whether the stack has been closed
#size                             Number of items
#empty?                           Whether the stack is empty
#full?                            Whether the stack is at capacity
#capacity                         Configured capacity, or nil for an unlimited stack
#remaining_capacity               Items the stack can still accept (nil for unlimited, 0 when full)

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT