No release in over a year
ActionSubscriber is a DSL that allows a rails app to consume messages from a RabbitMQ broker.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies

Development

Runtime

 Project Readme

Build Status Code Climate Dependency Status Join the chat at https://gitter.im/mxenabled/action_subscriber

ActionSubscriber

ActionSubscriber is a DSL for for easily intergrating your Rails app with a RabbitMQ messaging server.

Requirements

I test on Ruby 2.2.1 and Jruby 9.x. MRI 1.9 and jRuby 1.7 are still supported.

If you want to use MRI 1.9 you will need to lock down the amq-protocol and bunny gems to < 2.0 since they both require ruby 2.0+.

Migrating from ActionSubscriber 3.X or earlier

If you were using the --mode=pop from the 2.X or 3.X version of ActionSubscriber you can get the same sort of behavior by drawing your routes like this:

::ActionSubscriber.draw_routes do
  # instead of creating custom threadpools you set the threadpool size of your connection here in the routes
  # you can set the threadpool size for the default connection via the `::ActionSubscriber.configuration.threadpool_size = 16`
  route UserSubscriber, :created,
    :prefetch => 1,
    :concurrency => 16,
    :acknowledgements => true

  # in user_subscriber.rb make sure to set `at_most_once!` like this
  #
  # class UserSubscriber < ::ActionSubscriber::Base
  #   at_most_once!
  # end

  # If you were previously using custom threadpools for different routes you can mimic that behavior by opening multiple connections
  connection(:slow_work, :threadpool_size => 32) do
    route UserSubscriber, :created,
      :prefetch => 1,
      :concurrency => 32,
      :acknowledgements => true
  end
end

That will give you a similar behavior to the old --mode=pop where messages polled from the server, but with reduced latency.

Supported Message Types

ActionSubscriber support JSON and plain text out of the box, but you can easily add support for any custom message type.

Example

A subscriber is set up by creating a class that inherits from ActionSubscriber::Base.

class UserSubscriber < ::ActionSubscriber::Base
  def created
    # do something when a user is created
  end
end

checkout the examples dir for more detailed examples.

Usage

In your application setup you will draw your subscription routes. In a rails app this is usually done in config/initializers/action_subscriber.rb.

::ActionSubscriber.draw_routes do
  # you can define routes one-by-one for fine-grained controled
  route UserSubscriber, :created

  # or you can setup default routes for all the public methods in a subscriber
  default_routes_for UserSubscriber
end

Now you can start your subscriber process with:

$ bundle exec action_subscriber start

This will connect your subscribers to the rabbitmq broker and allow it to push messages down to your subscribers.

Around Filters

"around" filters are responsible for running their associated actions by yielding, similar to how Rack middlewares work (and Rails around filters work)

class UserSubscriber < ::ActionSubscriber::Base
  around_filter :log_things

  def created
    # do something when a user is created
  end

  private

  def log_things
    puts "before I do some stuff"
    yield
    puts "I did some stuff"
  end
end

Warning: an around filter will only be added once to the chain, duplicate around filters are not supported

Configuration

ActionSubscriber needs to know how to connect to your rabbit server to start getting messages.

In an initializer, you can set the host and the port like this :

ActionSubscriber.configure do |config|
  config.hosts = ["rabbit1", "rabbit2", "rabbit3"]
  config.port = 5672
end

Other configuration options include :

  • config.add_decoder - add a custom decoder for a custom content type
  • config.allow_low_priority_methods - Subscribe to *_low queues in addition to normal queues.
  • config.connection_reaping_interval - Connection reaping interval when using a project ActiveRecord
  • config.connection_reaping_timeout_interval - Connection reaping timeout interval when using a project ActiveRecord
  • config.default_exchange - set the default exchange that your queues will use, using the default RabbitMQ exchange is not recommended
  • config.error_handler - handle error like you want to handle them!
  • config.heartbeat - number of seconds between hearbeats (default 5) see bunny documentation for more details
  • config.hosts - an array of hostnames in your cluster (ie ["rabbit1.myapp.com", "rabbit2.myapp.com"])
  • config.network_recovery_interval - reconnection interval for TCP connection failures (default 1)
  • config.password - RabbitMQ password (default "guest")
  • config.prefetch - number of messages to hold in the local queue in subscriber mode
  • config.resubscribe_on_consumer_cancellation - resubscribe when the consumer is cancelled (queue deleted or cluster fails, default true)
  • config.seconds_to_wait_for_graceful_shutdown - time to wait before force stopping server after shutdown signal
  • config.threadpool_size - set the number of threads available to action_subscriber
  • config.timeout - how many seconds to allow rabbit to respond before timing out
  • config.tls - true/false whether to use TLS when connecting to the server
  • config.tls_ca_certificats - a list of ca certificates to use for verifying the servers TLS certificate
  • config.tls_cert - a client certificate to use during the TLS handshake
  • config.tls_key - a key to use during the TLS handshake
  • config.username - RabbitMQ username (default "guest")
  • config.verify_peer - whether to attempt to validate the server's TLS certificate
  • config.virtual_host - RabbitMQ virtual host (default "/")

Note: TLS is not handled identically in bunny and march_hare. The configuration options we provide are passed through as provided. For details on expected behavior please check the bunny or march_hare documentation based on whether you are running in MRI or jRuby.

Message Acknowledgment

no_acknolwedgement!

This mode is the default. Rabbit is told to not expect any message acknowledgements so messages will be lost if an error occurs. This also allows the broker to send messages as quickly as it wants down to your subscriber.

Warning: If messages arrive very quickly this could cause your process to crash as your memory fills up with unprocessed message. We highly recommend you use at_least_once! mode to provide a throttle so the broker does not overwhelm your process with messages.

manual_acknowledgement!

This mode leaves it up to the subscriber to handle acknowledging or rejecting messages. In your subscriber you can just call acknowledge, reject, or nack.

at_most_once!

Rabbit is told to expect message acknowledgements, but sending the acknowledgement is left up to ActionSubscriber. We send the acknowledgement right before calling your subscriber.

at_least_once!

Rabbit is told to expect message acknowledgements, but sending the acknowledgement is left up to ActionSubscriber. We send the acknowledgement right after calling your subscriber. If an error is raised your message will be retried on a sent back to rabbitmq and retried on an exponential backoff schedule.

safe_nack

If you turn on acknowledgements and a message is not acknowledged by your code manually or using one of the filters above the ErrorHandler middleware which wraps the entire block with call nack this is a last resort so the connection does not get backed up in cases of unexpected or unhandled errors.

redeliver

A message can be sent to "redeliver" with ::ActionSubscriber::MessageRetry.redeliver_message_with_backoff or the DSL method redeliver and optionally takes a "backoff schedule" which is a hash of backoff milliseconds for each redeliver, the default:

  SCHEDULE = {
    2  =>        100,
    3  =>        500,
    4  =>      2_500,
    5  =>     12_500,
    6  =>     62_500,
    7  =>    312_500,
    8  =>  1_562_500,
    9  =>  7_812_500,
    10 => 39_062_500,
  }

when the schedule "returns" nil the message will not be retried

Warning: If you use redeliver you need to handle reject/acknowledge according how errors are handled; if an error is caught and the ack/reject is already done then you may duplicate the message in at_least_once! mode

Testing

ActionSubscriber includes support for easy unit testing with RSpec.

In your spec_helper.rb:

require 'action_subscriber/rspec'

RSpec.configure do |config|
  config.include ::ActionSubscriber::RSpec
end

In your_subscriber_spec.rb : subject { mock_subscriber }

Your test subject will be an instance of your subscriber class, and you can easily test your public methods without dependence on data from Rabbit. You can optionally pass data for your mock subscriber to consume if you wish.

subject { mock_subscriber(:header => "test_header", :payload => "payload") }

Development

If you want to work on action_subscriber you will need to have a rabbitmq instance running locally on port 5672 with a management plugin enabled on port 15672. Usually the easiest way to accomplish this is to use docker and run the command:

$ docker run --net=host --rm=true --hostname diagon --name rabbit rabbitmq:3.6.6-management

Now that rabbitmq is running you can clone this project and run:

$ cd action_subscriber
$ bundle install
$ bundle exec rspec