Sbmt-KafkaProducer

This gem is used to produce Kafka messages. It is a wrapper over the waterdrop gem, and it is recommended for use as a transport with the sbmt-outbox gem.

Installation

Add this line to your app's Gemfile:

gem "sbmt-kafka_producer"

And then execute:

bundle install

Demo

Learn how to use this gem and how it works with Ruby on Rails here: https://github.com/Kuper-Tech/outbox-example-apps

Auto configuration

We recommend using the following Rails generators to create the configuration and the required files. Run any generator with the --help option to learn more about the available arguments.

Initial configuration

If you are adding the gem to your application for the first time, you can generate the initial configuration:

rails g kafka_producer:install

As a result, the config/kafka_producer.yml file will be created.

Producer class

A producer class can be generated with the following command:

rails g kafka_producer:producer MaybeNamespaced::Name sync topic

As a result, a sync producer will be created.

Outbox producer

To generate an Outbox producer for use with the sbmt-outbox gem, run the following command:

rails g kafka_producer:outbox_producer SomeOutboxItem

Manual configuration

The config/kafka_producer.yml file is the main configuration for this gem.

default: &default
  deliver: true
  # see more options at https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/config.rb
  wait_on_queue_full: true
  max_payload_size: 1000012
  max_wait_timeout: 60000
  auth:
    kind: plaintext
  kafka:
    servers: "kafka:9092" # required
    max_retries: 2 # optional, default: 2
    required_acks: -1 # optional, default: -1
    ack_timeout: 1000 # in milliseconds, optional, default: 1000
    retry_backoff: 1000 # in milliseconds, optional, default: 1000
    connect_timeout: 2000 # in milliseconds, optional, default: 2000
    message_timeout: 55000 # in milliseconds, optional, default: 55000
    # kafka_config: # optional, low-level custom Kafka options (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
    #   queue.buffering.max.messages: 100000
    #   queue.buffering.max.ms: 5

development:
  <<: *default

test:
  <<: *default
  deliver: false
  wait_on_queue_full: false

production:
  <<: *default
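
The environment sections rely on YAML anchors and merge keys. As a minimal sketch of how they resolve (plain Ruby with the standard yaml library, not the gem's own loader, and with a trimmed config used for illustration only), the test section inherits every key from default and overrides only the keys it redefines:

```ruby
require "yaml"

# Trimmed copy of config/kafka_producer.yml, for illustration only.
CONFIG_YAML = <<~YAML
  default: &default
    deliver: true
    wait_on_queue_full: true
    kafka:
      servers: "kafka:9092"
      required_acks: -1

  test:
    <<: *default
    deliver: false
    wait_on_queue_full: false

  production:
    <<: *default
YAML

# aliases: true is required so that safe_load accepts the *default alias.
config = YAML.safe_load(CONFIG_YAML, aliases: true)

test_config = config.fetch("test")
production_config = config.fetch("production")

puts test_config["deliver"]              # overridden in the test section
puts test_config.dig("kafka", "servers") # inherited from default
puts production_config["deliver"]        # inherited from default
```

Note that the merge is shallow: redefining the kafka key in an environment section would replace the whole nested hash rather than merge into it.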

auth config section

The gem supports two auth variants: plaintext (default) and SASL-plaintext.

SASL-plaintext:

auth:
  kind: sasl_plaintext
  sasl_username: user
  sasl_password: pwd
  sasl_mechanism: SCRAM-SHA-512

If you need another variant, set the low-level custom Kafka options in the kafka_config section of config/kafka_producer.yml. These options override the options in the auth section.

Example of SASL_SSL protocol auth via kafka_config:

kafka_config:
  security.protocol: SASL_SSL
  sasl.username: user
  sasl.password: pwd
  ssl.ca.pem: ca_cert
  sasl.mechanism: SCRAM-SHA-512
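
The override rule can be pictured as a hash merge in which kafka_config is applied last. This is only a sketch: the mapping from the auth section to rdkafka option names shown here is an assumption for illustration, and the gem's internal implementation may differ.

```ruby
# Hypothetical mapping of the auth section to rdkafka options.
# The real mapping is internal to the gem and may differ.
auth_options = {
  "security.protocol" => "sasl_plaintext",
  "sasl.username" => "user",
  "sasl.password" => "pwd",
  "sasl.mechanism" => "SCRAM-SHA-512"
}

# Low-level options taken from the kafka_config section.
kafka_config = {
  "security.protocol" => "SASL_SSL",
  "ssl.ca.pem" => "ca_cert"
}

# Hash#merge keeps the right-hand value on key conflicts,
# so kafka_config wins over the auth-derived options.
effective = auth_options.merge(kafka_config)

puts effective["security.protocol"]
puts effective["sasl.username"]
```

Keys that appear only in the auth section, such as sasl.username in this sketch, are left untouched by the merge.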

kafka config section

The servers key is required and must be in rdkafka format, that is, without the kafka:// prefix, for example: srv1:port1,srv2:port2,...
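
If broker addresses arrive with a kafka:// prefix (for example, from an environment variable), they can be converted before being written to the config. The helper below is hypothetical and not part of the gem; it is a minimal sketch of the conversion:

```ruby
# Hypothetical helper, not part of the gem: converts a comma-separated
# list of broker addresses to the host:port format expected by rdkafka.
def normalize_servers(servers)
  servers
    .split(",")
    .map { |server| server.strip.delete_prefix("kafka://") }
    .reject(&:empty?)
    .join(",")
end

puts normalize_servers("kafka://srv1:9092, kafka://srv2:9092")
```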

The kafka_config section may contain any rdkafka option.

Producer class

To create a producer that will be responsible for sending messages to Kafka, copy the following code:

# app/producers/some_producer.rb
class SomeProducer < Sbmt::KafkaProducer::BaseProducer
  option :topic, default: -> { "topic" }

  def publish(payload, **options)
    sync_publish(payload, options)
    # async_publish(payload, options)
  end
end

Outbox producer config

Add the following lines to your config/outbox.yml file in the transports section:

outbox_items:
  some_outbox_item:
    transports:
      sbmt/kafka_producer:
        topic: 'topic'
        kafka: # optional kafka options
          required_acks: -1

Middlewares

Middleware is code configured to run before/after a message is produced.

To add middleware, specify it in the configuration:

# config/initializers/kafka_producer.rb

Sbmt::KafkaProducer.middlewares.push(
  MyMiddleware
)

# path/to/middlewares

class MyMiddleware
  def call(payload, options)
    # Code to run before the message is produced
    result = yield
    # Code to run after the message is produced

    result
  end
end

⚠️ Note that the call method must return the result of yield.
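
To see why the return value matters, consider a minimal sketch of a middleware chain. The run_middlewares helper and both middleware classes are hypothetical and written only for illustration; the gem's internal chain implementation may differ.

```ruby
# Hypothetical chain runner, for illustration only.
# Wraps the producing step in each middleware, first middleware outermost.
def run_middlewares(middlewares, payload, options, &producer)
  chain = middlewares.reverse.reduce(producer) do |next_step, middleware|
    -> { middleware.new.call(payload, options) { next_step.call } }
  end
  chain.call
end

class LoggingMiddleware
  def call(payload, options)
    puts "Before producing #{payload.inspect}"
    result = yield
    puts "After producing"
    result # returning the result keeps it visible to the caller
  end
end

class BrokenMiddleware
  def call(payload, options)
    yield
    nil # swallows the result of the producing step
  end
end

# A stand-in for the real producing step.
produce = -> { :delivered }

good = run_middlewares([LoggingMiddleware], "payload", {}, &produce)
bad = run_middlewares([BrokenMiddleware, LoggingMiddleware], "payload", {}, &produce)

puts good.inspect
puts bad.inspect
```

In this sketch the chain that includes BrokenMiddleware returns nil even though the producing step succeeded, so the caller can no longer see the outcome.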

Usage

To send a message to a Kafka topic, call the producer's publish method:

SomeProducer.new.publish(payload, key: "123", headers: {"some-header" => "some-value"})

Metrics

The gem collects basic producing metrics using Yabeda. See YabedaConfigurer for the list of metrics.

Testing

To stub producer requests to a real Kafka broker, you can use a fake class. To do this, add require "sbmt/kafka_producer/testing" to spec/rails_helper.rb.

Development

Install dip.

And run:

dip provision
dip rspec