A long-lived project that still receives updates
Avoid writing boilerplate code in order to consume messages from an AMQP message queue. Plug in queue configuration, and how to process each message.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies

Development

>= 0
>= 0
~> 3.11

Runtime

~> 2.17
 Project Readme

GOV.UK Message Queue Consumer

Gem Version

govuk_message_queue_consumer is a wrapper around the Bunny gem for communicating with RabbitMQ. The user of govuk_message_queue_consumer supplies some configuration and a class that processes messages.

RabbitMQ is a multi-producer, multi-consumer message queue that allows applications to subscribe to notifications published by other applications.

GOV.UK publishing-api publishes a message to RabbitMQ when a ContentItem is added or changed. Other applications (consumers) subscribe to these messages so that they can perform actions such as emailing users or updating a search index.

Several GOV.UK applications use govuk_message_queue_consumer:

Technical documentation

You can browse the API documentation on rubydoc.info.

Release a new version

To release a new version, increment the version number and raise a pull request.

The CI GitHub Actions workflow should automatically build and push the new release when you merge the PR.

Usage

Add the gem to your Gemfile.

Add a rake task like the following example:

# lib/tasks/message_queue.rake
namespace :message_queue do
  desc "Run worker to consume messages from rabbitmq"
  task consumer: :environment do
    GovukMessageQueueConsumer::Consumer.new(
      queue_name: "some-queue",
      processor: MyProcessor.new,
    ).run
  end
end

See the API documentation for the full list of parameters.

GovukMessageQueueConsumer::Consumer expects the RABBITMQ_URL environment variable to be set to an AMQP connection string, for example:

RABBITMQ_URL=amqp://mrbean:hunter2@rabbitmq.example.com:5672

The GOV.UK-specific environment variables RABBITMQ_HOSTS, RABBITMQ_VHOST, RABBITMQ_USER and RABBITMQ_PASSWORD are deprecated. Support for these will be removed in a future version of govuk_message_queue_consumer.

Define a class that will process the messages:

# eg. app/queue_consumers/my_processor.rb
class MyProcessor
  def process(message)
    # do something cool
  end
end

You can start the worker by running the message_queue:consumer Rake task.

bundle exec rake message_queue:consumer

Process a message

Once you receive a message, you must tell RabbitMQ once you've processed it. This is called acking. You can also discard the message, or retry it.

class MyProcessor
  def process(message)
    result = do_something_with(message)

    if result.ok?
      # Ack the message when it has been processed correctly.
      message.ack
    elsif result.failed_temporarily?
      # Retry the message to make RabbitMQ send the message again later.
      message.retry
    elsif result.failed_permanently?
      # Discard the message when it can't be processed.
      message.discard
    end
  end
end

Test your processor

govuk_message_queue_consumer provides a test helper for your processor.

# e.g. spec/queue_consumers/my_processor_spec.rb
require 'test_helper'
require 'govuk_message_queue_consumer/test_helpers'

describe MyProcessor do
  it_behaves_like "a message queue processor"
end

This will verify that your processor class implements the correct methods. You should add your own tests to verify its behaviour.

You can use GovukMessageQueueConsumer::MockMessage to test the processor behaviour. When using the mock, you can verify it acknowledged, retried or discarded. For example, with MyProcessor above:

it "acks incoming messages" do
  message = GovukMessageQueueConsumer::MockMessage.new

  MyProcessor.new.process(message)

  expect(message).to be_acked

  # or if you use minitest:
  assert message.acked?
end

For more test cases see the spec for the mock itself.

Run the test suite

bundle exec rake spec

Further reading

Licence

MIT License

Versioning policy

We follow Semantic versioning. Check the CHANGELOG for changes.