Provides client and server support for RabbitMQ
Installation
gem "rabbit_messaging"$ bundle install
# --- or ---
$ gem install "rabbit_messaging"require "rabbit_messaging"Usage
- Configuration
- Client
- Server
Configuration
-
RabbitMQ connection configuration fetched from the
bunny_optionssection of/config/sneakers.yml -
Rabbit.configprovides setters for following options:-
group_id(Symbol), requiredShared identifier which used to select api. As usual, it should be same as default project_id (I.e. we have project 'support', which runs only one application in production. So on, it's group_id should be :support)
-
project_id(Symbol), requiredPersonal identifier which used to select exact service. As usual, it should be same as default project_id with optional stage_id. (I.e. we have project 'support', in production it's project_id is :support, but in staging it uses :support1 and :support2 ids for corresponding stages)
-
queue_suffix(String)Optional suffix added to the read queue name. For example, in case of
group_id = "grp",project_id = "prj"andqueue_suffix = "sfx", Rabbit will read from queue named"grp.prj.sfx". -
exception_notifier(Proc) You must provide your own notifier like this to notify about exceptions:config.exception_notifier = proc { |e| MyCoolNotifier.notify!(e) }
-
hooks(Hash):before_fork and :after_fork hooks, used same way as in unicorn / puma / que / etc
-
environment(one of:test,:development,:production), *default:-:productionInternal environment of gem.
-
:testenvironment stubs publishing and does not suppress errors -
:developmentenvironment auto-creates queues and uses default exchange -
:productionenvironment enables handlers caching and gets maximum strictness
By default gem skips publishing in test and development environments. If you want to change that then manually set
Rabbit.skip_publishing_inwith an array of environments.Rabbit.skip_publishing_in = %i[test]
-
-
receiving_job_class_callable(Proc)Custom ActiveJob subclass to work with received messages. Receives the following attributes as
kwarg-arguments:-
:arguments- information about message type (type), application id (app_id), message id (message_id); -
:delivery_info- information aboutexchange,routing_key, etc; -
:message- received RabbitMQ message (often in astringformat);
{ message: '{"hello":"world","foo":"bar"}', delivery_info: { exchange: "some exchange", routing_key: "some_key" }, arguments: { type: "some_successful_event", app_id: "some_group.some_app", message_id: "uuid", } }
-
-
publishing_job_class_callable(Proc)Custom job class (e.g. ActiveJob or Sidekiq::Job) to work with published messages.
-
default_publishing_job_queue(StringorSymbol)The name of the queue that will be used by default for publishing jobs.
defaultby default. -
before_receiving_hooks, after_receiving_hooks(Array of Procs)Before and after hooks with message processing in the middle. Where
before_receiving_hooksandafter_receiving_hooksare empty arrays by default.It's advised to NOT place procs with long execution time inside.
Setup:
config.before_receiving_hooks.append(proc { |message, arguments| do_stuff_1 }) config.before_receiving_hooks.append(proc { |message, arguments| do_stuff_2 }) config.after_receiving_hooks.append(proc { |message, arguments| do_stuff_3 }) config.after_receiving_hooks.append(proc { |message, arguments| do_stuff_4 })
-
use_backoff_handler(Boolean)If set to
true, useExponentialBackoffHandler. You will also need add the following line to your Gemfile:gem "sneakers_handlers", github: "umbrellio/sneakers_handlers"
See https://github.com/umbrellio/sneakers_handlers for more details.
-
backoff_handler_max_retries(Integer)Number of retries that
ExponentialBackoffHandlerwill use before sending job to the error queue. 5 by default. -
connection_reset_exceptions(Array)Exceptions for reset connection. Default: [
Bunny::ConnectionClosedError].
config.connection_reset_exceptions << MyInterestingException
-
connection_reset_max_retries(Integer)Maximum number of reconnection attempts after a connection loss. Default: 10.
config.connection_reset_max_retries = 20
-
connection_reset_timeout(Float)The timeout duration before attempting to reset the connection. Default: 0.2 sec.
config.connection_reset_timeout = 0.2
-
logger_message_size_limit(Integer)Maximum logger message size. Split message to parts if message is more than limit. Default: 9500.
config.logger_message_size_limit = 9500
-
Client
Rabbit.publish(
{
routing_key: :support,
event: :ping,
data: { foo: :bar }, # default is {}
exchange_name: 'fanout', # default is fine too
confirm_select: true, # setting this to false grants you great speed up and absolutelly no guarantees
headers: { "foo" => "bar" }, # custom arguments for routing, default is {}
message_id: "asdadsadsad", # A unique identifier such as a UUID that your application can use to identify the message.
},
custom_queue_name: :my_custom_queue, # The name of the queue for publishing jobs. Overrides the default queue.
)-
This code sends messages via basic_publish with following parameters:
-
routing_key:"support" -
exchange:"group_id.project_id.fanout"(default is"group_id.poject_id") -
mandatory:true(same as confirm_select)It is set to raise error if routing failed
-
persistent:true -
type:"ping" -
content_type:"application/json"(always) -
app_id:"group_id.project_id"
-
-
Messages are logged to
/log/rabbit.log
Server
-
Server is supposed to run inside a daemon via the
daemons-railsgem. Server is run withRabbit::Daemon.run.before_forkandafter_forkprocs inRabbit.configare used to teardown and setup external connections between daemon restarts, for example ORM connections -
After the server runs, received messages are handled by
Rabbit::EventHandlersubclasses in two possible ways:- a) Subclasses are selected by following code(by default):
rabbit/handler/#{group_id}/#{event}".camelize.constantize
- b) you can change default behaviour to your own logic by setting the
handler_resolver_callableconfig option with aProcthat should return the handler class:Rabbit.config.handler_resolver_callable = -> (group_id, event) { "recivers/#{group_id}/#{event}".camelize.constantize }
If you wish so, you can override
initialize(message), where message is an object with simple api (@see lib/rabbit/receiving/message.rb)Handlers can specify a queue their messages will be put in via a
queue_asclass macro (accepts a string / symbol / block with|message, arguments|params) - a) Subclasses are selected by following code(by default):
-
Received messages are logged to
/log/sneakers.log, malformed messages are logged to/log/malformed_messages.logand deleted from queue
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/umbrellio/rabbit_messaging.
License
Released under MIT License
Authors
Team Umbrellio