DispatchPolicy
⚠️ Experimental. The API, schema, and defaults can change between minor releases without notice. DispatchPolicy is currently running in production on pulso.run — that's how we learn what breaks. If you pick it up for your own project, pin the exact version and expect to follow the changelog.
PostgreSQL only (11+). The staging, admission, and fairness machinery leans on
jsonb, partial indexes, FOR UPDATE SKIP LOCKED, ON CONFLICT, and CROSS JOIN LATERAL. MySQL/SQLite support isn't ruled out as a goal (being drop-in across every ActiveJob backend is the long-term direction), but it would take meaningful rework: shadow columns for jsonb, full indexes instead of partial ones, and a different batch-fetch strategy for fairness. Contributions welcome.
Per-partition admission control for ActiveJob. DispatchPolicy stages perform_later calls
into a dedicated table, runs a tick loop that admits jobs through
declared gates (throttle, concurrency, global_cap, fair_interleave,
adaptive_concurrency), then forwards the survivors to the real adapter.
Use it when you need:
- Per-tenant / per-endpoint throttle that's exact (token bucket) instead of best-effort enqueue-side.
- Per-partition concurrency with a proper release hook on job completion (and lease-expiry recovery if the worker dies mid-perform).
- Adaptive concurrency — a cap that shrinks under queue pressure and grows back when workers keep up, without manual tuning.
- Dedupe against a partial unique index, not an in-memory key.
- Round-robin fairness across tenants (LATERAL batch fetch) so one tenant's burst can't starve the others.
Demo
A runnable playground that exercises every gate and the admin UI lives
at ceritium/dispatch_policy-demo.
Clone it, run bundle && rails db:setup, and use the in-browser forms to
fire jobs through throttle / concurrency / adaptive / round-robin
policies while the admin UI updates in real time.
Install
Add to your Gemfile:
gem "dispatch_policy"
Copy the migration and run it:
bundle exec rails dispatch_policy:install:migrations
bundle exec rails db:migrate
Mount the admin UI in config/routes.rb (optional):
mount DispatchPolicy::Engine => "/admin/dispatch_policy"
Configure in config/initializers/dispatch_policy.rb:
DispatchPolicy.configure do |c|
  c.enabled             = ENV.fetch("DISPATCH_POLICY_ENABLED", "true") != "false"
  c.lease_duration      = 15.minutes
  c.batch_size          = 500
  c.round_robin_quantum = 50
  c.tick_sleep          = 1    # idle
  c.tick_sleep_busy     = 0.05 # after productive ticks
end
Flow
ActiveJob#perform_later
  → Dispatchable#enqueue
    → StagedJob.stage!  (insert into dispatch_policy_staged_jobs, pending)
(tick loop, periodically)
  → SELECT pending FOR UPDATE SKIP LOCKED
  → Run gates in declared order; survivors are the admitted set
  → StagedJob#mark_admitted!  (increment counters, set admitted_at)
  → job.enqueue(_bypass_staging: true)  (hand off to the real adapter)
(worker runs perform)
  → Dispatchable#around_perform
    → block.call
    → release counters, mark StagedJob completed_at, record observation
Declaring a policy
class SendWebhookJob < ApplicationJob
  include DispatchPolicy::Dispatchable
  dispatch_policy do
    # Persisted in the staged row so gates can read it without touching AR.
    context ->(args) {
      event = args.first
      { endpoint_id: event.endpoint_id, rate_limit: event.endpoint.rate_limit }
    }
    # Partial unique index dedupes identical keys while the previous row is pending or admitted.
    dedupe_key ->(args) { "event:#{args.first.id}" }
    # Tenant fairness: see the "Round-robin batching" section below.
    round_robin_by ->(args) { args.first.account_id }
    gate :throttle,
         rate:         ->(ctx) { ctx[:rate_limit] },
         per:          1.minute,
         partition_by: ->(ctx) { ctx[:endpoint_id] }
    gate :fair_interleave
  end
  def perform(event) = event.deliver!
end
perform_later stages the job; the tick admits it once its gates pass.
Gates
Gates run in declared order, each narrowing the survivor set. Any option
that takes a value can alternatively take a lambda that receives the
ctx hash, so parameters can depend on per-job data.
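To make "each gate narrows the survivor set" concrete, here is a minimal plain-Ruby sketch that models gates as callables reduced over the staged rows, plus a helper that resolves an option given either as a value or as a lambda over ctx. The `resolve` and `run_gates` helpers and the row shape are hypothetical; this is not DispatchPolicy's internal API.

```ruby
# Hypothetical model: a gate takes the current survivors and returns a subset.
# Not DispatchPolicy's internal API; it only illustrates the ordering rule.

# An option may be a plain value or a lambda that receives the ctx hash.
def resolve(option, ctx)
  option.respond_to?(:call) ? option.call(ctx) : option
end

# Run gates in declared order; each gate sees only the previous survivors.
def run_gates(rows, gates)
  gates.reduce(rows) { |survivors, gate| gate.call(survivors) }
end

rows = [
  { id: 1, ctx: { plan: "pro",  cost: 1 } },
  { id: 2, ctx: { plan: "free", cost: 5 } },
  { id: 3, ctx: { plan: "pro",  cost: 9 } }
]

# Per-job limit computed from ctx, the way a lambda option would be.
max_cost  = ->(ctx) { ctx[:plan] == "pro" ? 10 : 3 }
cost_gate = ->(survivors) { survivors.select { |r| r[:ctx][:cost] <= resolve(max_cost, r[:ctx]) } }
first_two = ->(survivors) { survivors.first(2) }

run_gates(rows, [cost_gate, first_two]).map { |r| r[:id] } # → [1, 3]
run_gates(rows, [first_two, cost_gate]).map { |r| r[:id] } # → [1]
```

Swapping the two gates changes the admitted set, which is why declaration order matters.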
:concurrency — in-flight cap per partition
Caps the number of admitted-but-not-yet-completed jobs in each
partition. Tracks in-flight counts in
dispatch_policy_partition_counts; decremented by the around_perform
hook when the job finishes, or by the reaper when a lease expires
(worker crashed).
gate :concurrency,
     max:          ->(ctx) { ctx[:max_per_account] || 5 },
     partition_by: ->(ctx) { "acct:#{ctx[:account_id]}" }
When to reach for it: external APIs with per-tenant concurrency limits, database-heavy jobs you don't want to pile up per customer, anything where "at most N running at once for this key" matters.
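The admit / release / lease-expiry lifecycle can be sketched as a small in-memory counter. This is a hypothetical model: the gem persists counts in dispatch_policy_partition_counts, while the `InFlightCounter` class, its method names, and the use of plain seconds for time are assumptions for illustration.

```ruby
# Hypothetical in-memory model of the :concurrency lifecycle. The gem keeps
# these counts in dispatch_policy_partition_counts; this class only mimics
# admit / release / reap and is not part of DispatchPolicy.
class InFlightCounter
  Lease = Struct.new(:partition, :expires_at)

  def initialize(max:, lease_duration:)
    @max = max
    @lease_duration = lease_duration
    @counts = Hash.new(0) # partition => in-flight count
    @leases = {}          # job_id => Lease
  end

  def in_flight(partition)
    @counts[partition]
  end

  # Admit only while the partition is under its cap, and take a lease.
  def try_admit(job_id, partition, now:)
    return false if @counts[partition] >= @max
    @counts[partition] += 1
    @leases[job_id] = Lease.new(partition, now + @lease_duration)
    true
  end

  # Plays the role of the around_perform hook when a job finishes.
  def release(job_id)
    lease = @leases.delete(job_id)
    return unless lease
    @counts[lease.partition] -= 1
  end

  # Plays the role of the reaper: free slots whose lease has expired.
  def reap(now:)
    expired = @leases.select { |_id, lease| lease.expires_at <= now }.keys
    expired.each { |id| release(id) }
  end
end

counter = InFlightCounter.new(max: 2, lease_duration: 900)
counter.try_admit(:a, "acct:1", now: 0)  # → true
counter.try_admit(:b, "acct:1", now: 0)  # → true
counter.try_admit(:c, "acct:1", now: 0)  # → false (cap of 2 reached)
counter.release(:a)                      # :a finished normally
counter.try_admit(:c, "acct:1", now: 10) # → true
counter.reap(now: 1000)                  # → [:b, :c] (both leases expired)
counter.in_flight("acct:1")              # → 0
```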
:throttle — token-bucket rate limit per partition
Refills rate tokens every per interval, capped at burst (defaults
to rate). Admits jobs while tokens are available and leaves the rest
pending for the next tick.
gate :throttle,
     rate:         100,      # tokens
     per:          1.minute, # refill window
     burst:        100,      # bucket cap (optional, defaults to rate)
     partition_by: ->(ctx) { "host:#{ctx[:host]}" }
rate and burst accept lambdas, so the limit can come from
configuration stored alongside the thing being rate-limited:
gate :throttle,
     rate:         ->(ctx) { ctx[:rate_limit] },
     per:          1.minute,
     partition_by: ->(ctx) { ctx[:endpoint_id] }
Unlike :concurrency, throttle does not release tokens on job
completion; tokens refill only with elapsed time.
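A minimal token-bucket sketch in plain Ruby, assuming continuous refill (rate / per tokens per second) and a bucket that starts full; the gem's actual refill granularity and initial state may differ. The `TokenBucket` class is hypothetical, and times are plain seconds rather than ActiveSupport durations.

```ruby
# Hypothetical token bucket: rate tokens per `per` seconds, capped at burst
# (defaults to rate). Assumes continuous refill and a full starting bucket.
class TokenBucket
  def initialize(rate:, per:, burst: nil, now: 0.0)
    @rate = rate.to_f
    @per = per.to_f
    @burst = (burst || rate).to_f
    @tokens = @burst
    @updated_at = now
  end

  # Admit up to n jobs; returns how many were admitted.
  def take(n, now:)
    refill(now)
    admitted = [n, @tokens.floor].min
    @tokens -= admitted
    admitted
  end

  private

  # Tokens come back only with elapsed time, never on job completion.
  def refill(now)
    elapsed = now - @updated_at
    @tokens = [@tokens + elapsed * @rate / @per, @burst].min
    @updated_at = now
  end
end

bucket = TokenBucket.new(rate: 100, per: 60)
bucket.take(150, now: 0)  # → 100 (bucket drained, 50 stay pending)
bucket.take(150, now: 30) # → 50  (half a window refills 50 tokens)
```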
:global_cap — single cap across all partitions
A global version of :concurrency: at most max jobs admitted
simultaneously across the whole policy, regardless of partition.
Useful as a safety ceiling on top of per-partition limits.
gate :concurrency, max: 10, partition_by: ->(ctx) { ctx[:tenant] }
gate :global_cap,  max: 200
Reads: "up to 10 in flight per tenant, but never more than 200 total".
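The two-gate reading above can be sketched as two functions applied in order, each narrowing the list it receives. This is a hypothetical illustration: `concurrency_gate`, `global_cap_gate`, and the job hash shape are not part of the gem, and the small numbers stand in for 10 and 200.

```ruby
# Hypothetical sketch of :concurrency followed by :global_cap, each as a
# function over the survivor list. Not the gem's implementation.

# Keep a job only while its tenant is under the per-tenant cap.
def concurrency_gate(jobs, in_flight, max:)
  counts = Hash.new(0)
  in_flight.each { |tenant, n| counts[tenant] = n }
  jobs.select do |job|
    next false if counts[job[:tenant]] >= max
    counts[job[:tenant]] += 1
    true
  end
end

# Keep at most (max - already in flight) survivors, across all tenants.
def global_cap_gate(jobs, total_in_flight, max:)
  jobs.first([max - total_in_flight, 0].max)
end

jobs = [
  { id: 1, tenant: "a" }, { id: 2, tenant: "a" },
  { id: 3, tenant: "b" }, { id: 4, tenant: "c" }
]
in_flight = { "a" => 9, "b" => 0 }

survivors = concurrency_gate(jobs, in_flight, max: 10)                # ids 1, 3, 4
admitted  = global_cap_gate(survivors, in_flight.values.sum, max: 11) # ids 1, 3
```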
:fair_interleave — round-robin ordering across partitions
Not a filter — a reordering step. Groups the batch by its primary partition and interleaves, so no single partition can starve others even if it has many pending jobs.
gate :concurrency, max: 10, partition_by: ->(ctx) { "acct:#{ctx[:account_id]}" }
gate :fair_interleave
Place it after a gate that assigns partitions; interleaving is keyed off the first partition a row picked up.
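The reordering step can be sketched as grouping rows by their first partition and then taking one row from each group per round. A hypothetical illustration: `fair_interleave` here is a plain function and the `partitions` array on each row is an assumed shape, not the gem's internals.

```ruby
# Hypothetical round-robin interleave keyed on each row's first partition.
# Nothing is filtered out; order within a partition is preserved.
def fair_interleave(rows)
  groups = rows.group_by { |r| r[:partitions].first }.values
  longest = groups.map(&:size).max || 0
  (0...longest).flat_map { |i| groups.filter_map { |g| g[i] } }
end

rows = [
  { id: 1, partitions: ["acct:1"] },
  { id: 2, partitions: ["acct:1"] },
  { id: 3, partitions: ["acct:1"] },
  { id: 4, partitions: ["acct:2"] },
  { id: 5, partitions: ["acct:3"] }
]
fair_interleave(rows).map { |r| r[:id] } # → [1, 4, 5, 2, 3]
```

acct:1 has three pending rows, but acct:2 and acct:3 are each served before acct:1's second row.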
:adaptive_concurrency — per-partition cap that self-tunes
The cap per partition (current_max) shrinks when the adapter queue
backs up (EWMA of queue lag > target_lag_ms) or when performs raise;
grows back by +1 when lag stays under target. AIMD loop on a
per-partition stats row (dispatch_policy_adaptive_concurrency_stats).
gate :adaptive_concurrency,
     partition_by:  ->(ctx) { ctx[:account_id] },
     initial_max:   3,
     target_lag_ms: 1000, # acceptable wait in the adapter queue after admission
     min:           1     # floor so a partition can't lock out
- Feedback signal: admitted_at → perform_start (queue wait in the real adapter). This is a pure saturation signal: slow performs in the downstream service don't punish admissions as long as workers still drain the queue quickly.
- Growth: +1 per fast success. There is no hard ceiling; the algorithm self-limits via target_lag_ms. If the queue builds up, the cap shrinks multiplicatively.
- Failure: current_max *= 0.5 (halve) when perform raises.
- Slow: current_max *= 0.95 when EWMA lag > target.
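The update rules in the list above can be sketched as a small in-memory AIMD loop. The +1 step, the 0.5 and 0.95 factors, and the min floor come from this section; the `AdaptiveCap` class, the EWMA weight (alpha = 0.2), and flooring the float cap to an integer limit are assumptions. The gem itself keeps this state in the dispatch_policy_adaptive_concurrency_stats row.

```ruby
# Hypothetical AIMD sketch. Step sizes match the list above; alpha = 0.2 and
# flooring current_max to get an integer limit are assumptions.
class AdaptiveCap
  attr_reader :current_max, :ewma_lag_ms

  def initialize(initial_max:, target_lag_ms:, min: 1, alpha: 0.2)
    @current_max = initial_max.to_f
    @target_lag_ms = target_lag_ms
    @min = min.to_f
    @alpha = alpha
    @ewma_lag_ms = 0.0
  end

  # queue_wait_ms is perform_start - admitted_at for one job.
  def observe(queue_wait_ms:, failed: false)
    @ewma_lag_ms = @alpha * queue_wait_ms + (1 - @alpha) * @ewma_lag_ms
    if failed
      @current_max *= 0.5  # failure: halve
    elsif @ewma_lag_ms > @target_lag_ms
      @current_max *= 0.95 # slow: shrink a little
    else
      @current_max += 1    # fast success: grow by one
    end
    @current_max = [@current_max, @min].max # never below the floor
  end

  # Integer number of jobs the partition may have in flight.
  def limit
    @current_max.floor
  end
end

aimd = AdaptiveCap.new(initial_max: 3, target_lag_ms: 1000)
aimd.observe(queue_wait_ms: 100)               # 3.0 → 4.0
aimd.observe(queue_wait_ms: 100, failed: true) # 4.0 → 2.0
aimd.observe(queue_wait_ms: 10_000)            # EWMA > 1000, so 2.0 → 1.9
aimd.limit                                     # → 1
```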
Choosing target_lag_ms
It's the knob that trades latency for throughput. Rough guide:
- Too low (e.g. 10-50 ms). The gate reacts to every tiny bump in queue wait and shrinks the cap aggressively. Workers can end up idle with jobs still pending admission because the cap is overcorrecting — classic contention / overshoot.
- Too high (e.g. 30 s). The gate barely ever pushes back, so you get near-maximum throughput at the cost of real queue buildup; newly admitted jobs may wait seconds before a worker picks them up.
- Reasonable starting point: ≈ worker_max_threads × avg_perform_ms. If you run 5 workers at ~200 ms/perform, target_lag_ms: 1000 means "it's OK if the adapter queue stays at most ~1 second deep". You'll want to tune from there based on what your downstream tolerates and how fast you want bursts to drain.
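The rule of thumb is a single multiplication; as a hypothetical helper (not part of the gem), the worked example above is:

```ruby
# Hypothetical helper for the rule of thumb: threads × average perform time.
def starting_target_lag_ms(worker_max_threads:, avg_perform_ms:)
  worker_max_threads * avg_perform_ms
end

starting_target_lag_ms(worker_max_threads: 5, avg_perform_ms: 200) # → 1000
starting_target_lag_ms(worker_max_threads: 10, avg_perform_ms: 50) # → 500
```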
Pair it with round_robin_by for multi-tenant systems that want
automatic backpressure without hand-tuned caps per tenant:
context ->(args) { { account_id: args.first[:account_id] } }
round_robin_by ->(args) { args.first[:account_id] }
gate :adaptive_concurrency,
     partition_by:  ->(ctx) { ctx[:account_id] },
     initial_max:   3,
     target_lag_ms: 1000
Queues and partitioning
DispatchPolicy operates at the policy (class) level. A job's
ActiveJob queue and priority travel through staging into admission
and on to the real adapter — workers of each queue pick up their jobs
normally — but neither affects which staged rows the gates see. All
enqueues of the same job class share one policy, one throttle bucket,
one concurrency cap.
Two consequences to be aware of:
- Enqueuing the same job to different queues does not give one
  queue priority at admission; they share the policy's gates. If
  urgent work should jump ahead, set a lower ActiveJob
  priority (the admission SELECT is ORDER BY priority, staged_at), or split it into a subclass with its own policy.
- dedupe_key is queue-agnostic: the same key enqueued to :urgent and :low dedupes to one row.
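The admission order can be sketched in plain Ruby to show that queue_name plays no part in it. This is a hypothetical illustration: `StagedRow` and `admission_order` are not the gem's classes, and the NULL handling mirrors PostgreSQL's default of sorting NULLs last in ascending order.

```ruby
# Hypothetical sketch of ORDER BY priority, staged_at in plain Ruby.
# PostgreSQL sorts NULLs last in ascending order, so rows without a
# priority go after every prioritized row. queue_name is ignored.
StagedRow = Struct.new(:id, :queue_name, :priority, :staged_at)

def admission_order(rows)
  rows.sort_by { |r| [r.priority.nil? ? 1 : 0, r.priority.to_i, r.staged_at] }
end

rows = [
  StagedRow.new(1, "urgent", 10,  100),
  StagedRow.new(2, "low",    10,  50),
  StagedRow.new(3, "low",    0,   200),
  StagedRow.new(4, "urgent", nil, 10)
]
admission_order(rows).map(&:id) # → [3, 2, 1, 4]
```

The :urgent rows gain nothing from their queue name; only priority and staging time decide the order.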
Using queue as a partition
The context hash has queue_name and priority injected automatically
at stage time (user-supplied keys win). Use them in any partition_by:
class SendEmailJob < ApplicationJob
  include DispatchPolicy::Dispatchable
  dispatch_policy do
    context ->(args) { { account_id: args.first.account_id } }
    # Separate throttle bucket per (queue, account): urgent and default
    # don't share rate tokens.
    gate :throttle,
         rate:         100,
         per:          1.minute,
         partition_by: ->(ctx) { "#{ctx[:queue_name]}:#{ctx[:account_id]}" }
  end
end
SendEmailJob.set(queue: :urgent).perform_later(user)
SendEmailJob.set(queue: :default).perform_later(user)
# → two partitions, each with its own bucket.
If you'd rather keep the two streams fully isolated (separate policies, admin rows, and dedupe scopes), subclass:
class UrgentEmailJob < SendEmailJob
  queue_as :urgent
  dispatch_policy do
    context ->(args) { { account_id: args.first.account_id } }
    gate :throttle, rate: 500, per: 1.minute, partition_by: ->(ctx) { ctx[:account_id] }
  end
end
Dedupe
dedupe_key is enforced by a partial unique index on
(policy_name, dedupe_key) WHERE completed_at IS NULL. Semantics:
- Re-enqueuing while a previous staged row is pending or admitted → silently dropped.
- Re-enqueuing after the previous completes → fresh staged row.
- Returning nil from the lambda → no dedup for that enqueue.
Typical pattern: "<domain>:<entity>:<id>" ("monitor:42",
"event:abc123"). Keep it stable for the duration of a logical unit
of work.
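The three dedupe rules can be modeled with a hash that holds only incomplete rows, which is what the partial index effectively covers. This is a hypothetical sketch: `DedupeIndex` is not part of the gem, and the real enforcement happens in PostgreSQL (where NULL keys never collide in a unique index).

```ruby
# Hypothetical in-memory model of the partial unique index
# (policy_name, dedupe_key) WHERE completed_at IS NULL.
class DedupeIndex
  def initialize
    @open = {} # [policy_name, dedupe_key] => row id, incomplete rows only
    @next_id = 0
  end

  # Returns the new staged row id, or nil when the enqueue is dropped.
  def stage(policy_name, dedupe_key)
    key = [policy_name, dedupe_key]
    return nil if dedupe_key && @open.key?(key) # pending/admitted row exists
    id = (@next_id += 1)
    @open[key] = id if dedupe_key               # nil key: not indexed
    id
  end

  # Setting completed_at drops the row out of the partial index.
  def complete(policy_name, dedupe_key)
    @open.delete([policy_name, dedupe_key])
  end
end

index = DedupeIndex.new
index.stage("SendWebhookJob", "event:1") # → 1
index.stage("SendWebhookJob", "event:1") # → nil (still pending, dropped)
index.stage("SendWebhookJob", nil)       # → 2 (no dedup)
index.complete("SendWebhookJob", "event:1")
index.stage("SendWebhookJob", "event:1") # → 3 (fresh row)
```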
Round-robin batching (tenant fairness)
For policies where every tenant should keep making progress even
when one suddenly enqueues 100× its normal volume, neither throttle
nor concurrency is a good fit: you want maximum throughput, just with
fairness. round_robin_by solves it at the batch SELECT layer:
dispatch_policy do
  context ->(args) { { account_id: args.first.account_id } }
  round_robin_by ->(args) { args.first.account_id }
end
At stage time the lambda's result is written into the dedicated
round_robin_key column (indexed). Tick.run then uses a two-phase
fetch:
- LATERAL join: distinct keys × per-key LIMIT round_robin_quantum. Each active tenant is guaranteed up to quantum rows per tick (all of its pending rows, if it has fewer), so a tenant with 10 pending is served in the same tick as a tenant with 50k pending.
- Top-up: if the fairness floor doesn't fill batch_size, the remaining slots go to the oldest pending rows (excluding the ids already locked). This keeps single-tenant throughput at full capacity.
Cost per tick is O(quantum × active_keys), not O(backlog) — so the
admin stays snappy even with thousands of distinct tenants.
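The two phases can be mimicked on an in-memory array to show how a small tenant gets served next to a huge backlog. This is a hypothetical sketch: `Row` and `fetch_batch` are not the gem's API, row locking (SKIP LOCKED) is not modeled, and truncating phase 1 to batch_size when it overflows is an assumption. Unlike the indexed SQL, this version sorts the whole backlog, so it does not reproduce the O(quantum × active_keys) cost.

```ruby
# Hypothetical in-memory version of the two-phase fetch. Phase 1 mirrors the
# LATERAL join (up to `quantum` oldest rows per round_robin_key); phase 2
# tops up with the oldest remaining rows.
Row = Struct.new(:id, :round_robin_key, :staged_at)

def fetch_batch(pending, batch_size:, quantum:)
  oldest_first = pending.sort_by(&:staged_at)

  # Phase 1: fairness floor, at most `quantum` rows per key.
  fair = oldest_first.group_by(&:round_robin_key)
                     .values
                     .flat_map { |rows| rows.first(quantum) }
                     .first(batch_size)

  # Phase 2: fill the remaining slots with the oldest rows not already taken.
  top_up = (oldest_first - fair).first(batch_size - fair.size)

  fair + top_up
end

big   = (1..1000).map { |i| Row.new(i, "acct:big", i) }
small = [Row.new(1001, "acct:small", 2000), Row.new(1002, "acct:small", 2001)]

batch = fetch_batch(big + small, batch_size: 10, quantum: 3)
batch.map(&:id) # → [1, 2, 3, 1001, 1002, 4, 5, 6, 7, 8]
```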
Running the tick
The gem exposes DispatchPolicy::TickLoop.run(policy_name:, stop_when:)
but does not ship a tick job — concurrency semantics are
queue-adapter specific (GoodJob's total_limit, Sidekiq Enterprise
uniqueness, etc.), so you write a small job in your app that wraps
the loop with whatever dedup your adapter provides. Example for
GoodJob:
# app/jobs/dispatch_tick_loop_job.rb
class DispatchTickLoopJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency
  good_job_control_concurrency_with(
    total_limit: 1,
    key: -> { "dispatch_tick_loop:#{arguments.first || 'all'}" }
  )
  def perform(policy_name = nil)
    deadline = Time.current + DispatchPolicy.config.tick_max_duration
    DispatchPolicy::TickLoop.run(
      policy_name: policy_name,
      stop_when: -> {
        GoodJob.current_thread_shutting_down? || Time.current >= deadline
      }
    )
    # Self-chain so the next run starts about a second later; the cron below is a safety net.
    DispatchTickLoopJob.set(wait: 1.second).perform_later(policy_name)
  end
end
Schedule it (every 10s as a safety net; the self-chain keeps one alive under normal operation):
# config/application.rb
config.good_job.cron = {
  dispatch_tick_loop: {
    cron:  "*/10 * * * * *",
    class: "DispatchTickLoopJob"
  }
}
For adapters without a first-class dedup mechanism, implement it
yourself (e.g. pg_try_advisory_lock inside perform) before calling
DispatchPolicy::TickLoop.run.
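How stop_when and the tick_sleep / tick_sleep_busy settings from the configure block might interact can be sketched as a plain loop. This is hypothetical and not TickLoop's source: `run_tick_loop`, the `tick` callable that returns the number of admitted jobs, and the injectable `sleeper` are all assumptions, and checking stop_when before each tick is a guess at the semantics.

```ruby
# Hypothetical loop, not DispatchPolicy::TickLoop's source. `tick` returns how
# many jobs it admitted; `sleeper` is injectable so the loop can be tested.
def run_tick_loop(stop_when:, tick:, tick_sleep: 1, tick_sleep_busy: 0.05,
                  sleeper: ->(seconds) { sleep(seconds) })
  ticks = 0
  until stop_when.call # checked before every tick
    admitted = tick.call
    ticks += 1
    # Productive tick: come back quickly. Idle tick: back off.
    sleeper.call(admitted.positive? ? tick_sleep_busy : tick_sleep)
  end
  ticks
end

pending_results = [3, 0, 5] # admitted counts for three simulated ticks
sleeps = []
run_tick_loop(
  stop_when: -> { pending_results.empty? },
  tick:      -> { pending_results.shift },
  sleeper:   ->(seconds) { sleeps << seconds }
)       # → 3
sleeps  # → [0.05, 1, 0.05]
```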
Admin UI
DispatchPolicy::Engine ships a read-only admin mounted wherever
you like. Features:
- Policy index with pending / admitted / completed-24h totals.
- Per-policy page with a partition breakdown (watched + searchable list) showing pending-eligible / pending-scheduled / in-flight / completed / adaptive cap / EWMA latency / last enqueue / last dispatch per partition.
- Line chart of avg EWMA queue lag (last hour, per minute) with completions-per-minute bars behind it.
- Per-partition sparkline with the same overlay; click to watch / unwatch. The watched set is persisted in localStorage and synced into the URL, so reloading keeps your view.
- Opt-in auto-refresh (off / 2s / 5s / 15s), stored in localStorage. The page updates via Turbo morph, so scroll position and tooltips survive.
Testing
bundle install
bundle exec rake test
Tests require a PostgreSQL instance (uses ON CONFLICT, partial
indexes, FOR UPDATE SKIP LOCKED, jsonb). PGUSER / PGHOST /
PGPASSWORD env vars override the defaults in
test/dummy/config/database.yml.
License
MIT.