Project

main_loop

0.0
A long-lived project that still receives updates
Main Loop implementation to control subprocesses and threads
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
 Dependencies
 Project Readme

MainLoop

Gem Version Gem YARD

Coverage Quality Outdated Vulnerabilities

MainLoop — Ruby-библиотека для управления субпроцессами и потоками с функциями:

  • автоматический сбор дочерних процессов (reaping)
  • корректное завершение (SIGTERM/SIGINT) процессов и потоков
  • автоматический перезапуск по количеству повторов
  • принудительное завершение по таймауту
  • обработка завершения процессов и потоков

MainLoop is a Ruby library for managing subprocesses and threads with features:

  • automatic child process reaping
  • graceful shutdown (SIGTERM/SIGINT) for processes and threads
  • automatic restart by retry count
  • timeout-based force termination
  • process/thread completion handling

Возможности / Features

  • Потоко-безопасный канал обмена событиями (IO.pipe) / Thread-safe event bus (IO.pipe)
  • Управление процессами через Kernel.fork / Process management via Kernel.fork
  • Управление потоками через Thread.new / Thread management via Thread.new
  • Обработка сигналов TERM/INT/CLD / SIGTERM/INT/CLD signal handling
  • Автоматический сбор завершенных процессов / Automatic child process reaping
  • Retry-логика для перезапуска / Retry logic for restarts
  • Принудительное завершение по таймауту / Timeout-based force termination

Начало работы / Getting started

gem install main_loop

При установке MainLoop через bundler добавьте следующую строку в Gemfile:


If you'd rather install MainLoop using bundler, add a line for it in your Gemfile:

gem 'main_loop'

Затем выполните / Then run:

bundle install # для установки гема / gem installation

Корневой модуль / Root module

MainLoop - это корневой модуль, который подключает все компоненты:


MainLoop is the root module that requires all components:

require 'main_loop'

Субмодули / Submodules:

  • MainLoop::Bus — канал обмена событиями (IO.pipe)
  • MainLoop::Dispatcher — координация обработчиков и управление жизненным циклом
  • MainLoop::Loop — главный цикл обработки событий и сигналов
  • MainLoop::Handler — абстрактный базовый класс
  • MainLoop::ProcessHandler — управление субпроцессами
  • MainLoop::ThreadHandler — управление потоками

Архитектура / Architecture

graph TB
    subgraph "MainLoop"
        Loop["Loop<br><i>Главный цикл</i>"]
        Dispatcher["Dispatcher<br><i>Координирует обработчиков</i>"]
        Bus["Bus<br><i>Канал обмена событиями (IO.pipe)</i>"]
        Handler["Handler<br><i>Базовый класс</i>"]
        ProcessHandler["ProcessHandler<br><i>Управляет процессами</i>"]
        ThreadHandler["ThreadHandler<br><i>Управляет потоками</i>"]
        
        Loop -->|координирует| Dispatcher
        Dispatcher -->|использует| Bus
        Loop -->|получает события из| Bus
        ProcessHandler -->|наследует| Handler
        ThreadHandler -->|наследует| Handler
        ProcessHandler -->|регистрируется в| Dispatcher
        ThreadHandler -->|регистрируется в| Dispatcher
    end
Loading

Жизненный цикл обработчика / Handler lifecycle

sequenceDiagram
    participant Cycle as Loop
    participant Bus
    participant Dispatcher
    participant Handler

    note over Cycle,Handler: Инициализация / Initialization
    Cycle->>Bus: install_signal_handlers
    Handler->>Dispatcher: add_handler

    Cycle->>Cycle: start_loop_forever
    loop Forever
        Cycle->>Bus: gets(wait)
        alt sig:TERM/INT
            Bus-->>Cycle: "sig:TERM"
            Cycle->>Dispatcher: term
            Dispatcher->>Dispatcher: @terminating_at = Time.now
            note right of Dispatcher: Graceful termination
            Dispatcher->>Handler: term
            opt @on_term || @runnable.on_term
                Handler->>Handler: @on_term.call(pid/thread)
            end
        else sig:CLD
            Cycle->>Cycle: wait for reap_children
        else reap:pid:status
            Cycle->>Dispatcher: reap_by_id
            Dispatcher->>Handler: reap(status)
            alt retry_count > 0
                Handler->>Handler: handle_retry -> run
            else
                Handler->>Bus: publish(:term)
            end
        end
        Cycle->>Dispatcher: tick
        alt need_force_kill?
            note right of Dispatcher: Превышен timeout / Timeout exceeded
            Dispatcher->>Dispatcher: @killed = true
            Dispatcher->>Handler: kill
        end
    end
Loading

Способы определения логики

Библиотека поддерживает два основных подхода для описания работы процессов и потоков:

1. Блок (inline block)

Передайте блок кода непосредственно в конструктор обработчика (ProcessHandler или ThreadHandler). В этом блоке размещается основная логика. При завершении (по сигналу, ошибке или таймауту) блок прерывается; вы можете определить дополнительные действия, используя переданный объект обработчика.

MainLoop::ProcessHandler.new(dispatcher, 'my_process', retry_count: 3) do
  # основная логика
  loop { sleep 1 }
end

MainLoop::ThreadHandler.new(dispatcher, 'my_thread', retry_count: 0) do |handler|
  handler.on_term do
    # действия при завершении (очистка, закрытие ресурсов)
    @stop = true
  end
  # основная логика с возможностью проверки флага
  @stop = false
  loop { sleep 1; break if @stop }
end

2. Объект с интерфейсом run / on_term

Передайте экземпляр класса, который реализует два обязательных метода:

  • run — содержит основную логику; для ProcessHandler метод не принимает аргументов, для ThreadHandler получает объект потока.
  • on_term — вызывается при необходимости завершить процесс/поток; для ProcessHandler принимает PID, для ThreadHandler — объект потока. В этом методе следует инициировать корректное завершение (например, послать сигнал процессу или установить флаг остановки для потока).
class Worker
  def run
    trap('USR1') { @stop = true; raise Interrupt }
    @stop = false
    loop { sleep 1; break if @stop }
  rescue Interrupt
    # завершаемся
    exit 0
  end

  def on_term(pid)
    Process.kill('USR1', pid)
  end
end

MainLoop::ProcessHandler.new(dispatcher, 'worker', runnable: Worker.new)

Оба подхода могут комбинироваться с параметрами (retry_count, logger и т.д.) и одинаково хорошо интегрируются с циклом MainLoop::Loop.

Выбор зависит от удобства: для простых сценариев подойдёт блок, для сложной логики управления завершением — объект с явными методами.

Использование / Usage

Базовая настройка / Basic setup

require 'main_loop'
require 'logger'

logger = Logger.new(STDOUT)
logger.level = Logger::DEBUG

# Шина и диспетчер. Параметр timeout (в секундах) опционален.
bus = MainLoop::Bus.new
dispatcher = MainLoop::Dispatcher.new(bus, timeout: 10, logger: logger)
mainloop = MainLoop::Loop.new(bus, dispatcher, logger: logger)

Обработка процессов / Process handling

Простейший пример / Simplest example

# Процесс test1: будет перезапущен 3 раза, после завершения выходит с кодом 0
MainLoop::ProcessHandler.new dispatcher, 'test1', retry_count: 3, logger: logger do
  sleep 2
  exit! 0
end

# Процесс test2: обрабатывает SIGTERM и выходит с кодом 1 после 2 перезапусков
MainLoop::ProcessHandler.new dispatcher, 'test2', retry_count: 2, logger: logger do
  trap 'TERM' do
    exit(0)
  end
  sleep 2
  exit! 1
end

С блоком кода / With code block

MainLoop::ProcessHandler.new dispatcher, 'worker', retry_count: 3, logger: logger do
  loop do
    # основная логика / main logic
    sleep 1
  end
  exit 0
end

С объектом runnable / With runnable object

class Worker
  def run
    # Используем пользовательский сигнал, чтобы прервать системные вызовы (например, sleep)
    trap('USR1') do
      puts "Получен USR1. pid = #{Process.pid}"
      @stop = true
      raise Interrupt   # прерывает текущий блок (sleep и т.д.)
    end

    @stop = false
    loop do
      puts "работаю..."
      sleep 100
      break if @stop
    rescue Interrupt
      puts "прерывание, выходим"
      break
    end
    exit 0
  end

  def on_term(pid)
    puts "Завершаю работу. pid = #{pid}"
    Process.kill('USR1', pid)   # посылаем пользовательский сигнал

    begin
      Timeout.timeout(5) { Process.wait(pid) }
    rescue Timeout::Error
      puts "не завершился за 5 секунд"
    end

    puts "Завершил работу"
  end
end

worker = Worker.new
MainLoop::ProcessHandler.new(dispatcher, 'worker', runnable: worker, retry_count: :unlimited, logger: logger)

Обработка потоков / Thread handling

Поток с блоком / Thread with block

# Поток thread2: не перезапускается (retry_count: 0), выполняет внешнюю команду
MainLoop::ThreadHandler.new dispatcher, 'thread2', retry_count: 0, logger: logger do
  system('sleep 15;echo ok')
end

Поток с блоком и колбэком завершения / Thread with block and termination callback

MainLoop::ThreadHandler.new dispatcher, 'worker', retry_count: 0, logger: logger do |handler|
  # Устанавливаем обработчик завершения потока
  handler.on_term do
    puts "Завершаем поток, выполняем cleanup..."
    @stop = true
  end

  @stop = false
  loop do
    puts "Работаю..."
    sleep 1
    break if @stop
  end
end

Поток с объектом runnable / Thread with runnable object

class Worker
  def run(thread)
    @stop = false
    loop do
      sleep 60
      break if @stop
    end
  end

  def on_term(thread)
    @stop = true
    thread&.wakeup
  end
end

worker = Worker.new
MainLoop::ThreadHandler.new dispatcher, 'worker', runnable: worker, logger: logger

Запуск цикла / Start loop

# Бесконечный цикл / Infinite loop
mainloop.run

# С таймаутом (30 секунд) / With timeout (30 seconds)
mainloop.run(30)

Обработка завершения / Termination handling

Когда отправляется сигнал TERM или INT:

  1. trap перехватывает сигнал и отправляет bus.puts("sig:TERM")
  2. Loop получает событие из Bus и вызывает Dispatcher#term
  3. Dispatcher устанавливает @terminating_at = Time.now
  4. Все обработчики получают term:
    • ProcessHandler посылает Process.kill('TERM', pid)
    • ThreadHandler вызывает @on_term блок
  5. Если через timeout (по умолчанию 5 сек) процессы не завершились:
    • Dispatcher#tick проверяет need_force_kill?
    • Если true — посылает kill всем обработчикам
  6. Когда все обработчики завершаются (finished?):
    • Dispatcher#try_exit! вызывает exit(@exit_code)

When sending TERM or INT signal:

  1. trap catches the signal and sends bus.puts("sig:TERM")
  2. Loop gets the event from Bus and calls Dispatcher#term
  3. Dispatcher sets @terminating_at = Time.now
  4. All handlers receive term:
    • ProcessHandler sends Process.kill('TERM', pid)
    • ThreadHandler calls @on_term block
  5. If processes don't terminate within timeout (default 5 seconds):
    • Dispatcher#tick checks need_force_kill?
    • If true — sends kill to all handlers
  6. When all handlers finish (finished?):
    • Dispatcher#try_exit! calls exit(@exit_code)

Повторы (retry) / Retry

retry_count: 3      # повторить 3 раза / retry 3 times
retry_count: 0      # не повторять / don't retry
retry_count: :unlimited  # бесконечные повторы / infinite retries

Публикация событий / Publishing events

Обработчики могут отправлять события в шину:


Handlers can publish events to the bus:

# Из любого места обработчика / From any handler place:
publish("reap:#{id}:exited")
publish(:term)

Особенности / Features

  • Потоко-безопасность: Bus и Dispatcher используют MonitorMixin
  • Таймауты: Timeouter используется для timeout в Bus#gets и Loop#start_loop_forever
  • Логирование: все классы принимают параметр logger:, по умолчанию Logger.new(nil)
  • Ошибки: используйте rescue StandardError (не пустой rescue)
  • Коды выхода: exit!(code) в процессах, exit(code) в основном потоке

  • Thread safety: Bus and Dispatcher use MonitorMixin
  • Timeouts: Timeouter is used for timeout in Bus#gets и Loop#start_loop_forever
  • Logging: all classes accept logger: parameter, default is Logger.new(nil)
  • Error handling: use rescue StandardError (not bare rescue)
  • Exit codes: exit!(code) in processes, exit(code) in main thread

Тестирование / Testing

# Запуск всех тестов / Run all tests
bundle exec rspec

# Запуск одного файла / Run single file
bundle exec rspec spec/bus_spec.rb

# Запуск одного теста / Run single test
bundle exec rspec spec/bus_spec.rb:12

# Покрытие кода / Code coverage (96%+)
bundle exec rspec --format progress

Для запуска примеров из локальной копии репозитория (без установки гема) используйте опцию -I интерпретатора Ruby, указав путь к директории lib относительно текущей папки:

ruby -I ./lib examples/имя_файла.rb

Версия / Version

Текущая версия / Current version: 0.1.4

Автор / Author

Юрий Самойленко / Yuri Samoylenko kinnalru@gmail.com

Лицензия / License

Библиотека доступна с открытым исходным кодом в соответствии с условиями лицензии MIT.


The gem is available as open source under the terms of the MIT License.