Avoid writing boilerplate code to consume messages from an AMQP message queue. Plug in the queue configuration and the logic for processing each message.

GOV.UK Message Queue Consumer

Standardises the way GOV.UK consumes messages from RabbitMQ. RabbitMQ is a message broker that allows applications to broadcast messages that can be picked up by other applications.

On GOV.UK, publishing-api publishes the content items it receives, so that applications such as email-alert-service can be notified of changes in content.

For detailed documentation, check out the gem documentation on rubydoc.info.

This gem is used by:

Overview of RabbitMQ

To see an overview of RabbitMQ and how we use it, see here.

Technical documentation

This is a Ruby gem that handles the boilerplate code for communicating with RabbitMQ. You supply the configuration and a class that processes messages.

The gem is automatically released by Jenkins. To release a new version, raise a pull request with the version number incremented.

Dependencies

  • The Bunny gem: to interact with RabbitMQ.

Usage

Add the gem to your Gemfile.

Add a rake task like the following example:

# lib/tasks/message_queue.rake
namespace :message_queue do
  desc "Run worker to consume messages from rabbitmq"
  task consumer: :environment do
    GovukMessageQueueConsumer::Consumer.new(
      queue_name: "some-queue",
      processor: MyProcessor.new,
    ).run
  end
end

More options are documented here.

The consumer expects a number of environment variables to be present. On GOV.UK, these should be set up in Puppet.

RABBITMQ_HOSTS=rabbitmq1.example.com,rabbitmq2.example.com
RABBITMQ_VHOST=/
RABBITMQ_USER=a_user
RABBITMQ_PASSWORD=a_super_secret
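
To make the meaning of each variable concrete, here is a minimal sketch of how they could be turned into connection options of the kind `Bunny.new` accepts. The helper name `rabbitmq_options_from` is hypothetical and not part of this gem. The gem reads the environment for you, so this is only an illustration.

```ruby
# Hypothetical helper, not part of the gem: maps the environment variables
# above onto an options hash in the shape Bunny.new accepts.
def rabbitmq_options_from(env = ENV)
  {
    # RABBITMQ_HOSTS is a comma-separated list, so split it into an array.
    hosts: env.fetch("RABBITMQ_HOSTS").split(",").map(&:strip),
    vhost: env.fetch("RABBITMQ_VHOST"),
    user: env.fetch("RABBITMQ_USER"),
    pass: env.fetch("RABBITMQ_PASSWORD"),
  }
end

# A plain Hash stands in for ENV here so the sketch runs anywhere.
example_env = {
  "RABBITMQ_HOSTS" => "rabbitmq1.example.com,rabbitmq2.example.com",
  "RABBITMQ_VHOST" => "/",
  "RABBITMQ_USER" => "a_user",
  "RABBITMQ_PASSWORD" => "a_super_secret",
}

options = rabbitmq_options_from(example_env)
puts options[:hosts].inspect
```

Using `fetch` rather than `[]` means a missing variable fails immediately with a `KeyError` instead of surfacing later as a confusing connection error.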

Define a class that will process the messages:

# e.g. app/queue_consumers/my_processor.rb
class MyProcessor
  def process(message)
    # do something cool
  end
end

The worker should also be added to the Procfile to run in production:

# Procfile
worker: bundle exec rake message_queue:consumer

Because you need the environment variables when running the consumer, you should use govuk_setenv to run your app in development:

$ govuk_setenv app-name bundle exec rake message_queue:consumer

Processing a message

After you receive a message, you must tell RabbitMQ when you have finished processing it. This is called acking. You can also discard the message, or retry it.

class MyProcessor
  def process(message)
    result = do_something_with(message)

    if result.ok?
      # Ack the message when it has been processed correctly.
      message.ack
    elsif result.failed_temporarily?
      # Retry the message to make RabbitMQ send the message again later.
      message.retry
    elsif result.failed_permanently?
      # Discard the message when it can't be processed.
      message.discard
    end
  end
end
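
The same three outcomes can also be driven by exceptions rather than a result object. The following is a sketch under stated assumptions: `StandInMessage`, `TemporaryFailure`, the injected `handler` and the `"title"` key are all hypothetical names used for illustration, and the stand-in only records which of `ack`, `retry` or `discard` was called so that the example runs without RabbitMQ.

```ruby
# Hypothetical stand-in for the message passed to #process. It records the
# outcome instead of talking to RabbitMQ.
class StandInMessage
  attr_reader :payload, :status

  def initialize(payload)
    @payload = payload
    @status = :pending
  end

  def ack
    @status = :acked
  end

  def retry
    @status = :retried
  end

  def discard
    @status = :discarded
  end
end

# Hypothetical error for a failure that may succeed on a later attempt.
class TemporaryFailure < StandardError; end

class MyProcessor
  def initialize(handler)
    @handler = handler # any object responding to #call(payload)
  end

  def process(message)
    # A payload without a title can never succeed, so retrying is pointless.
    return message.discard unless message.payload.key?("title")

    @handler.call(message.payload)
    message.ack
  rescue TemporaryFailure
    # The failure may clear up, so ask for the message to be sent again.
    message.retry
  end
end

working = ->(payload) { payload }
flaky = ->(_payload) { raise TemporaryFailure }

ok = StandInMessage.new({ "title" => "A page" })
MyProcessor.new(working).process(ok)
puts ok.status # acked

later = StandInMessage.new({ "title" => "A page" })
MyProcessor.new(flaky).process(later)
puts later.status # retried

bad = StandInMessage.new({})
MyProcessor.new(working).process(bad)
puts bad.status # discarded
```

Whichever style you choose, make sure every code path ends in exactly one of the three calls, so that no message is left unacknowledged.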

Statsd integration

You can pass a statsd_client to the GovukMessageQueueConsumer::Consumer initializer. The consumer will emit counters to statsd with these keys:

  • your_queue_name.started - message picked up from your_queue_name
  • your_queue_name.retried - message has been retried
  • your_queue_name.acked - message has been processed and acked
  • your_queue_name.discarded - message has been discarded
  • your_queue_name.uncaught_exception - an uncaught exception occurred during processing

Remember to use a namespace for the Statsd client:

statsd_client = Statsd.new("localhost")
statsd_client.namespace = "govuk.app.my_app_name"

GovukMessageQueueConsumer::Consumer.new(
  statsd_client: statsd_client
  # ... other setup code omitted
).run
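
To see how the namespace combines with the counter keys listed above, here is a small sketch that uses a recording stand-in instead of a real Statsd client. `RecordingStatsd` is a hypothetical class. The sketch assumes the consumer only calls `increment` on the client, which this README does not state explicitly.

```ruby
# Hypothetical stand-in for a statsd client: it counts increments in memory
# instead of sending them over the network.
class RecordingStatsd
  attr_accessor :namespace
  attr_reader :counters

  def initialize
    @counters = Hash.new(0)
  end

  def increment(key)
    # Prefix the key with the namespace, as a namespaced client would.
    full_key = [namespace, key].compact.join(".")
    @counters[full_key] += 1
  end
end

statsd_client = RecordingStatsd.new
statsd_client.namespace = "govuk.app.my_app_name"

# Roughly what would be emitted for one message that is picked up from a
# queue called "some-queue" and then acked.
statsd_client.increment("some-queue.started")
statsd_client.increment("some-queue.acked")

puts statsd_client.counters.keys
```

A stand-in like this can also be handy in tests, since it lets you assert on the emitted keys without running a Statsd server.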

Testing your processor

This gem provides a test helper for your processor.

# e.g. spec/queue_consumers/my_processor_spec.rb
require 'test_helper'
require 'govuk_message_queue_consumer/test_helpers'

describe MyProcessor do
  it_behaves_like "a message queue processor"
end

This will verify that your processor class implements the correct methods. You should add your own tests to verify its behaviour.
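
As an illustration of what such an interface check might look like, the sketch below tests for a `process` method that takes a single argument. This is an assumption about the kind of check involved, not the gem's actual shared example, and `message_queue_processor?` and `NotAProcessor` are hypothetical names.

```ruby
# Hypothetical helper: a plain-Ruby approximation of an interface check.
# It passes when the object has a #process method taking exactly one argument.
def message_queue_processor?(processor)
  processor.respond_to?(:process) &&
    processor.method(:process).arity == 1
end

class MyProcessor
  def process(message)
    message.ack
  end
end

# Has no #process method, so it fails the check.
class NotAProcessor
  def handle(message); end
end

puts message_queue_processor?(MyProcessor.new)   # true
puts message_queue_processor?(NotAProcessor.new) # false
```

A check like this only confirms the shape of the class, which is why behavioural tests are still needed.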

You can use GovukMessageQueueConsumer::MockMessage to test the processor's behaviour. When using the mock, you can verify that the message was acknowledged, retried or discarded. For example, with MyProcessor above:

it "acks incoming messages" do
  message = GovukMessageQueueConsumer::MockMessage.new

  MyProcessor.new.process(message)

  expect(message).to be_acked

  # or if you use minitest:
  assert message.acked?
end

For more test cases see the spec for the mock itself.

Running the test suite

bundle exec rake spec

Further reading

  • Bunny is the RabbitMQ client we use.
  • The Bunny Guides explain all AMQP concepts really well.
  • The Developer Docs document the usage of "heartbeat" messages, which this gem also supports.

Licence

MIT License

Versioning policy

We follow Semantic Versioning. Check the CHANGELOG for changes.