Project

fiber_job

0.0
No release in over 3 years
Experimental High-performance, Redis-based background job processing library for Ruby built on fiber-based concurrency
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
 Dependencies

Runtime

~> 2.26, >= 2.26.0
~> 2.5, >= 2.5.0
~> 0.24, >= 0.24.0
~> 0.24, >= 0.24.0
 Project Readme

FiberJob

A high-performance, Redis-based background job processing library for Ruby built on modern fiber-based concurrency. FiberJob combines the persistence of Redis with the speed of async fibers to deliver exceptional performance and reliability.

Requirements

  • Ruby 3.1+
  • Redis 5.0+

Features

  • Fiber-Based Job Processing: Execute jobs using modern Ruby async/fiber concurrency
  • Hybrid Queue Architecture: Redis persistence + in-memory async queues for optimal performance
  • Job Scheduling: Schedule jobs for future execution with precise timing
  • Cron Jobs: Define recurring jobs with cron expressions
  • Retry Logic: Built-in exponential backoff with configurable retry policies
  • Priority Queues: Support for high-priority job execution
  • Advanced Concurrency: Per-queue fiber pools with semaphore-controlled execution
  • Failure Tracking: Store and inspect failed jobs for debugging
  • Redis Integration: Optimized Redis usage with connection pooling and atomic operations

Installation

Add this line to your application's Gemfile:

gem 'fiber_job'

And then execute:

$ bundle install

Or install it yourself as:

$ gem install fiber_job

Quick Start

1. Configuration

require 'fiber_job'

FiberJob.configure do |config|
  config.redis_url = 'redis://localhost:6379/0'
  config.queues = [:default, :high, :low]
  config.queue_concurrency = {
    default: 5,
    high: 10,
    low: 2
  }
  config.log_level = :info
end

2. Define a Job

class EmailJob < FiberJob::Job
  def perform(user_id, message)
    user = User.find(user_id)
    UserMailer.notification(user, message).deliver_now
    FiberJob.logger.info "Email sent to user #{user_id}"
  end
end

3. Enqueue Jobs

# Immediate execution
EmailJob.perform_async(123, "Welcome to our platform!")

# Delayed execution
EmailJob.perform_in(1.hour, 456, "Don't forget to complete your profile")

# Scheduled execution
EmailJob.perform_at(Date.tomorrow.beginning_of_day, 789, "Daily newsletter")

4. Start Workers

# Start a worker for all configured queues
worker = FiberJob::Worker.new
worker.start

# Start a worker for specific queues
worker = FiberJob::Worker.new(queues: [:high, :default])
worker.start

How the Hybrid Architecture Works

FiberJob's unique architecture combines Redis persistence with fiber-based in-memory processing:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Redis Queue   │───▶│  Polling Fiber  │───▶│ Async::Queue    │
│   (Persistent)  │    │  (Single/Queue) │    │ (In-Memory)     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                        │
                                                        ▼
                                              ┌─────────────────┐
                                              │ Processing Pool │
                                              │ (Fiber Pool +   │
                                              │  Semaphore)     │
                                              └─────────────────┘

Fiber Pool Architecture

# Each queue gets its own fiber ecosystem
Sync do |task|
  @queues.each do |queue_name|
    # 1. Single polling fiber per queue (minimal Redis load)
    task.async { poll_redis_queue(queue_name) }

    # 2. Multiple processing fibers per queue (parallel execution)
    concurrency.times do
      task.async { process_job_queue(queue_name) }
    end
  end
end

# Jobs flow: Redis → Polling Fiber → Async::Queue → Processing Fiber Pool
def poll_redis_queue(queue_name)
  while @running
    job_data = Queue.pop(queue_name, timeout: 1.0)  # Redis operation
    @job_queues[queue_name].enqueue(job_data) if job_data  # Fast async queue
  end
end

def process_job_queue(queue_name)
  while @running
    job_data = @job_queues[queue_name].dequeue  # Instant fiber operation
    @managers[queue_name].execute { execute_job(job_data) }  # Semaphore-controlled
  end
end

Performance Benefits

  • Sub-millisecond job pickup: Jobs are instantly available in Async::Queue
  • Reduced Redis load: One polling fiber per queue instead of multiple workers competing
  • Optimal concurrency: Semaphore ensures exact concurrency limits without oversubscription
  • Zero blocking: All operations use async/await patterns for maximum throughput

Job Configuration

Custom Job Settings

class ComplexJob < FiberJob::Job
  def initialize
    super
    @queue = :high_priority
    @max_retries = 5
    @timeout = 600  # 10 minutes
  end

  def perform(data)
    # Complex processing logic
  end

  def retry_delay(attempt)
    # Custom retry strategy: linear backoff
    attempt * 60  # 1min, 2min, 3min...
  end
end

Priority Retry

class CriticalJob < FiberJob::Job
  def priority_retry?
    true  # Failed jobs go to front of queue on retry
  end
end

Cron Jobs

Define recurring jobs with cron expressions:

class DailyReportJob < FiberJob::CronJob
  cron '0 9 * * *'  # Every day at 9 AM

  def execute_cron_job
    # Generate and send daily reports
    Report.generate_daily
    FiberJob.logger.info "Daily report generated"
  end
end

class HourlyCleanupJob < FiberJob::CronJob
  cron '0 * * * *'  # Every hour

  def execute_cron_job
    # Cleanup old data
    TempData.cleanup_old_records
  end
end

# Register cron jobs (automatically done on startup)
FiberJob.register_cron_jobs

Queue Management

Queue Statistics

stats = FiberJob::Queue.stats(:default)
puts "Queue size: #{stats[:size]}"
puts "Scheduled jobs: #{stats[:scheduled]}"
puts "Processing: #{stats[:processing]}"

Failed Jobs

# View failed jobs
failed_jobs = FiberJob::Queue.failed_jobs
failed_jobs.each do |job|
  puts "Failed: #{job['class']} - #{job['error']}"
end

# Note: clear_failed_jobs method was removed as it was unused

Configuration Options

Option Default Description
redis_url redis://localhost:6379 Redis connection URL
concurrency 2 Global default concurrency
queues [:default] List of queues to process
queue_concurrency {default: 2} Per-queue concurrency settings
log_level :info Logging level (debug, info, warn, error, fatal)

Environment Variables

  • REDIS_URL: Redis connection URL
  • FIBER_JOB_LOG_LEVEL: Logging level

License

This project is licensed under the MIT License - see the LICENSE file for details.

Roadmap

  • Middleware support for job lifecycle hooks
  • Dead letter queue for permanently failed jobs
  • Metrics and monitoring integration
  • ActiveJob adapter
  • Web UI for job monitoring and management

Support

  • Create an issue on GitHub for bug reports or feature requests
  • Check existing issues for solutions to common problems
  • Review the documentation for detailed API information