GOV.UK Message Queue Consumer
govuk_message_queue_consumer is a wrapper around the Bunny gem for communicating with RabbitMQ. The user of govuk_message_queue_consumer supplies some configuration and a class that processes messages.
RabbitMQ is a multi-producer, multi-consumer message queue that allows applications to subscribe to notifications published by other applications.
GOV.UK publishing-api publishes a message to RabbitMQ when a ContentItem is added or changed. Other applications (consumers) subscribe to these messages so that they can perform actions such as emailing users or updating a search index.
Several GOV.UK applications use govuk_message_queue_consumer:
Technical documentation
You can browse the API documentation on rubydoc.info.
Release a new version
To release a new version, increment the version number and raise a pull request.
The CI GitHub Actions workflow should automatically build and push the new release when you merge the PR.
Usage
Add a rake task like the following example:
# lib/tasks/message_queue.rake
namespace :message_queue do
desc "Run worker to consume messages from rabbitmq"
task consumer: :environment do
GovukMessageQueueConsumer::Consumer.new(
queue_name: "some-queue",
processor: MyProcessor.new,
).run
end
endSee the API documentation for the full list of parameters.
GovukMessageQueueConsumer::Consumer expects the RABBITMQ_URL environment
variable
to be set to an AMQP connection string, for example:
RABBITMQ_URL=amqp://mrbean:hunter2@rabbitmq.example.com:5672The GOV.UK-specific environment variables RABBITMQ_HOSTS, RABBITMQ_VHOST,
RABBITMQ_USER and RABBITMQ_PASSWORD are deprecated. Support for these will
be removed in a future version of govuk_message_queue_consumer.
Define a class that will process the messages:
# eg. app/queue_consumers/my_processor.rb
class MyProcessor
def process(message)
# do something cool
end
endYou can start the worker by running the message_queue:consumer Rake task.
bundle exec rake message_queue:consumerProcess a message
Once you receive a message, you must tell RabbitMQ once you've processed it. This is called acking. You can also discard the message, or retry it.
class MyProcessor
def process(message)
result = do_something_with(message)
if result.ok?
# Ack the message when it has been processed correctly.
message.ack
elsif result.failed_temporarily?
# Retry the message to make RabbitMQ send the message again later.
message.retry
elsif result.failed_permanently?
# Discard the message when it can't be processed.
message.discard
end
end
endTest your processor
govuk_message_queue_consumer provides a test helper for your processor.
# e.g. spec/queue_consumers/my_processor_spec.rb
require 'test_helper'
require 'govuk_message_queue_consumer/test_helpers'
describe MyProcessor do
it_behaves_like "a message queue processor"
endThis will verify that your processor class implements the correct methods. You should add your own tests to verify its behaviour.
You can use GovukMessageQueueConsumer::MockMessage to test the processor
behaviour. When using the mock, you can verify it acknowledged, retried or
discarded. For example, with MyProcessor above:
it "acks incoming messages" do
message = GovukMessageQueueConsumer::MockMessage.new
MyProcessor.new.process(message)
expect(message).to be_acked
# or if you use minitest:
assert message.acked?
endFor more test cases see the spec for the mock itself.
Run the test suite
bundle exec rake specFurther reading
- Bunny is the RabbitMQ client we use.
- The Bunny Guides explain AMQP concepts.
Licence
Versioning policy
We follow Semantic versioning. Check the CHANGELOG for changes.