RubyKafkaRetry
The ruby_kafka_retry gem provides a mechanism to handle message retries and dead-letter queue (DLQ) functionality in Ruby applications using Kafka. It ensures messages are retried with an increasing delay before being sent to a DLQ.
Installation
Add this line to your application's Gemfile:
gem 'ruby_kafka_retry', '~> 0.1.0'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install ruby_kafka_retry
Usage
Retrying Messages
To handle message retries, use the RubyKafkaRetry::RetryFailedEvent class. This class allows you to specify the retry topic, DLQ topic, and the message to be retried, along with an optional maximum retry attempt count.
retry_topic = 'my_retry_topic'
dlq_topic = 'my_dlq_topic'
topic_message = { key: 'value' } # The message to be processed
max_retry_attempt = 5 # Optional parameter, default is 3 if not provided
retry_event = RubyKafkaRetry::RetryFailedEvent.new(retry_topic, dlq_topic, topic_message, max_retry_attempt)
retry_event.retry
Detailed Description
- First Retry Attempt: If the topic_message does not include the current_retry_attempt key, the gem considers it the first retry attempt, and current_retry_attempt will be added to the topic_message with a value of 1. The modified topic_message will then be published to the retry_topic.
- Message Format: The topic_message must be a hash. If a non-hash object is passed, the gem will raise an error: raise TypeError, 'topic_message must be a Hash'
- Retry Logic:
  - If the current_retry_attempt value in the topic_message reaches the max_retry_attempt count, the message will be published to the DLQ topic.
  - If the current_retry_attempt value is less than max_retry_attempt, the current_retry_attempt value will be incremented, and the message will be republished to the retry_topic after a delay.
  - The delay before republishing is calculated as 2 ** current_retry_attempt minutes.
  - The max_retry_attempt parameter is optional. If it is not provided, the default value is 3.
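The rules above can be sketched as a small pure function. This is only an illustration, not the gem's code: next_retry_step is a hypothetical name, the counter is assumed to live under a symbol key, "reaches" is read as greater than or equal to, and the delay is assumed to be computed from the updated counter (which matches the 2 and 4 minute figures in the example workflow).

```ruby
# Hypothetical helper, not part of the gem's API. It only mirrors the
# documented rules so the branching is easy to follow.
def next_retry_step(topic_message, max_retry_attempt = 3)
  raise TypeError, 'topic_message must be a Hash' unless topic_message.is_a?(Hash)

  # Assumption: the counter is stored under a symbol key.
  current = topic_message[:current_retry_attempt]

  if current.nil?
    # First retry attempt: the counter starts at 1.
    attempt = 1
  elsif current >= max_retry_attempt
    # Retry budget used up: route the unchanged message to the DLQ.
    return { target: :dlq, message: topic_message, delay_minutes: nil }
  else
    attempt = current + 1
  end

  {
    target: :retry,
    message: topic_message.merge(current_retry_attempt: attempt),
    # Assumption: the delay uses the updated counter value.
    delay_minutes: 2**attempt
  }
end

first_step = next_retry_step({ key: 'value' })
second_step = next_retry_step(first_step[:message])
```

Under these assumptions and the default limit of 3, a message would be scheduled with delays of 2, 4 and 8 minutes before the next failure sends it to the DLQ.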
Example Workflow
Here's a step-by-step example workflow:
- A message topic_message = { key: 'value' } is received and processed.
- If processing fails, it triggers a retry:
  - The current_retry_attempt key is added to the message if not present.
  - The message becomes { key: 'value', current_retry_attempt: 1 }.
- The message is published to the retry_topic after a delay of 2 ** 1 (2 minutes).
- If processing fails again, current_retry_attempt is incremented, and the message is republished after a delay of 2 ** 2 (4 minutes).
- This continues until current_retry_attempt reaches max_retry_attempt.
- Once max_retry_attempt is reached, the message is published to the DLQ topic.
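One way to wire this workflow into a message handler is to wrap processing in a rescue and hand failed messages to the retry class. The sketch below is an assumption about how an application might do this, not something the gem prescribes: process_or_retry and RecordingRetrier are hypothetical names, and the retrier class is injected so the example runs without Kafka. In an application the injected class would be RubyKafkaRetry::RetryFailedEvent, called with the same positional arguments as in the Usage section.

```ruby
# Hypothetical wrapper: run the handler block and, if it raises, ask the
# retrier to schedule a retry for the same message.
def process_or_retry(topic_message, retrier_class:, retry_topic:, dlq_topic:, max_retry_attempt: 3)
  yield topic_message
  :processed
rescue StandardError
  # Same argument order as RubyKafkaRetry::RetryFailedEvent.new in Usage.
  retrier_class.new(retry_topic, dlq_topic, topic_message, max_retry_attempt).retry
  :retry_requested
end

# Stand-in for the gem's class, for illustration only: it records the
# arguments it was given instead of publishing anything.
class RecordingRetrier
  CALLS = []

  def initialize(*args)
    @args = args
  end

  def retry
    CALLS << @args
  end
end

result = process_or_retry({ key: 'value' },
                          retrier_class: RecordingRetrier,
                          retry_topic: 'my_retry_topic',
                          dlq_topic: 'my_dlq_topic') do |_message|
  raise 'processing failed' # simulate a handler that fails
end
```

Because the retried message comes back through the retry_topic, the same wrapper can serve both the original consumer and the retry consumer, provided the message is passed along with its current_retry_attempt key intact.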
Configuration
You need to configure the gem by creating a YAML configuration file at config/ruby_kafka_retry.yml. This file should contain the following settings:
development:
  client_id: "my_kafka_client_id"
  brokers:
    - "localhost:9092"
  ssl_ca_certs_from_system: false
  redis_host: "127.0.0.1"
  redis_db: "10"
  redis_port: "6379"
  sidekiq_queue: "test_retry_queue"
stage:
  client_id: "my_kafka_client_id"
  brokers:
    - "localhost:9092"
  ssl_ca_certs_from_system: false
  redis_host: "127.0.0.1"
  redis_db: "10"
  redis_port: "6379"
  sidekiq_queue: "test_retry_queue"
production:
  client_id: "my_kafka_client_id"
  brokers:
    - "localhost:9092"
  ssl_ca_certs_from_system: false
  redis_host: "127.0.0.1"
  redis_db: "10"
  redis_port: "6379"
  sidekiq_queue: "test_retry_queue"
Add the same sidekiq_queue to your sidekiq.yml file as well.
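Before starting the application, it can help to check that the file parses and that the section for the current environment has every expected key. The following is a minimal sketch using Ruby's standard YAML library. retry_settings is a hypothetical helper, not something the gem provides, and the list of required keys is simply taken from the sample above. The YAML is inlined here so the example is self-contained.

```ruby
require 'yaml'

# Hypothetical helper: return the settings for one environment and raise
# KeyError if the environment section or an expected key is missing.
def retry_settings(yaml_text, env)
  # Key names copied from the sample configuration in this README.
  required = %w[client_id brokers redis_host redis_db redis_port sidekiq_queue]

  settings = YAML.safe_load(yaml_text).fetch(env) do
    raise KeyError, "no '#{env}' section in configuration"
  end

  missing = required - settings.keys
  raise KeyError, "missing keys: #{missing.join(', ')}" unless missing.empty?

  settings
end

sample_yaml = <<~CONFIG
  development:
    client_id: "my_kafka_client_id"
    brokers:
      - "localhost:9092"
    ssl_ca_certs_from_system: false
    redis_host: "127.0.0.1"
    redis_db: "10"
    redis_port: "6379"
    sidekiq_queue: "test_retry_queue"
CONFIG

settings = retry_settings(sample_yaml, 'development')
queue_name = settings['sidekiq_queue']
```

In a real project the text would come from File.read('config/ruby_kafka_retry.yml'), and queue_name is the value that should also appear in sidekiq.yml.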
Dependencies
The ruby_kafka_retry gem depends on the following gems:
- ruby-kafka
- sidekiq
Running Services
To use this gem, ensure the following services are running in the background:
- Kafka Server: Ensure your Kafka server is up and running.
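- Sidekiq Server: Start your Sidekiq server to handle background job processing.
- Redis Server: Sidekiq stores its jobs in Redis, so the Redis instance named in the configuration file must be reachable.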
Development
After checking out the repo, run bin/setup to install dependencies. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and the created tag, and push the .gem file to rubygems.org.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/veeraveeraveera/ruby_kafka_retry.