Project

hermann

0.06
Repository is archived
No commit activity in last 3 years
No release in over 3 years
Ruby gem for talking to Kafka
 Dependencies

Runtime

~> 1.8.2
~> 0.6.0
~> 0.3.4
 Project Readme

Hermann

A Ruby gem implementing a Kafka Publisher and Consumer

On MRI (C-based Ruby), this library wraps librdkafka, a Kafka client library implemented in C.

On JRuby this library declares jar dependencies inside the .gemspec to express dependencies on the Java-based Kafka library provided by the Kafka project. Tools like jbundler will handle these declarations correctly.

Usage

Usage is modelled on the kafka-rb gem and is fairly straightforward.

  • Kafka 0.8 is supported.
  • Tested against Ruby 1.9.3, 2.1.1 and JRuby.

Producer

Zookeeper discovery

Discover Kafka brokers through Zookeeper. The discovery class looks at /brokers/ids in Zookeeper to find the list of brokers.

require 'hermann/producer'
require 'hermann/discovery/zookeeper'

broker_ids_array = Hermann::Discovery::Zookeeper.new('localhost:2181').get_brokers
producer = Hermann::Producer.new('topic', broker_ids_array)

promise = producer.push('hello world') # send message to kafka
promise.value                          # forces the Concurrent::Promise to finish executing (#value!)
promise.state                          # the state of the promise
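
To make the /brokers/ids lookup more concrete, here is a minimal sketch of how broker registrations could be turned into a broker list. It assumes the Kafka 0.8 convention that each /brokers/ids/<id> node holds a JSON document with "host" and "port" fields. The helper name broker_addresses and the sample payloads are hypothetical and are not part of Hermann; the exact return format of get_brokers is not shown here.

```ruby
require 'json'

# Hypothetical helper (not part of Hermann): converts the JSON payloads
# stored under /brokers/ids/<id> into "host:port" strings.
def broker_addresses(znode_payloads)
  znode_payloads.map do |payload|
    info = JSON.parse(payload)
    # fetch raises KeyError if a registration lacks host or port
    "#{info.fetch('host')}:#{info.fetch('port')}"
  end
end

# Sample payloads, made up for illustration only.
sample = [
  '{"jmx_port":-1,"host":"kafka1.example.com","version":1,"port":9092}',
  '{"jmx_port":-1,"host":"kafka2.example.com","version":1,"port":9093}'
]

puts broker_addresses(sample).join(',')
```

In a real deployment the payloads would be read from Zookeeper rather than hard-coded, which is the part Hermann::Discovery::Zookeeper takes care of.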

MRI only

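require 'hermann/producer'
require 'hermann/discovery/zookeeper'  # provides Hermann::Discovery::Zookeeper

broker_ids_array = Hermann::Discovery::Zookeeper.new('localhost:2181').get_brokers
p = Hermann::Producer.new('topic', broker_ids_array)  # arguments: topic, list of brokers
f = p.push('hello world from mri')
f.state          # state before the reactor has been ticked
p.tick_reactor   # let the producer process pending results
f.state          # state after the tick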
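
The state / tick_reactor / state sequence above suggests a simple polling loop on MRI. The following is a rough sketch of that idea, not Hermann's own code. FakeResult and FakeProducer are stand-ins so the example runs without Kafka, wait_for_delivery is a hypothetical helper, and the terminal states :fulfilled and :rejected are an assumption borrowed from Concurrent::Promise.

```ruby
# Stand-ins so the sketch runs without Kafka; neither is part of Hermann.
FakeResult = Struct.new(:state)

class FakeProducer
  def initialize(result, ticks_needed)
    @result = result
    @ticks_left = ticks_needed
  end

  # Mimics a reactor tick: the result settles after enough ticks.
  def tick_reactor
    @ticks_left -= 1
    @result.state = :fulfilled if @ticks_left <= 0
  end
end

# Assumption: these are the states in which a result is finished.
TERMINAL_STATES = [:fulfilled, :rejected].freeze

# Hypothetical helper: tick the producer until the result settles or
# max_ticks is used up. Returns the last observed state.
def wait_for_delivery(producer, result, max_ticks: 50, pause: 0.01)
  max_ticks.times do
    break if TERMINAL_STATES.include?(result.state)
    producer.tick_reactor
    sleep(pause)
  end
  result.state
end

result = FakeResult.new(:pending)
producer = FakeProducer.new(result, 3)
puts wait_for_delivery(producer, result)
```

With a real producer, the two stand-ins would be replaced by the objects returned from Hermann::Producer.new and push; bounding the loop with max_ticks keeps a dead broker from hanging the caller.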

Consumer

Messages can be consumed by calling the consume method and passing a block to handle the yielded messages. The consume method blocks, so run it somewhere that can afford to wait (e.g. in a Concurrent::Promise or a Thread).

(JRuby)

require 'hermann'
require 'hermann/consumer'
require 'hermann_jars'

topic     = 'topic'
new_topic = 'other_topic'

the_consumer = Hermann::Consumer.new(topic, zookeepers: "localhost:2181", group_id: "group1")

the_consumer.consume(new_topic) do |msg|   # can change topic with optional argument to .consume
  puts "Recv: #{msg}"
end

(MRI)

MRI currently has no zookeeper / client group support.

require 'hermann'
require 'hermann/consumer'

topic     = 'topic'
new_topic = 'other_topic'

the_consumer = Hermann::Consumer.new(topic, brokers: "localhost:9092", partition: 1)

the_consumer.consume(new_topic) do |msg, key, offset|   # can change topic with optional argument to .consume
  puts "Recv: #{msg}, key: #{key}, offset: #{offset}"
end
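
Because consume blocks, one option is to run it in a background Thread and hand messages to the rest of the program through a Queue. The sketch below illustrates that pattern under stated assumptions: FakeConsumer is a stand-in that yields a fixed list of messages so the example runs without Kafka, and consume_in_background is a hypothetical helper, not a Hermann API.

```ruby
require 'thread' # Queue (already loaded on modern Rubies)

# Stand-in for Hermann::Consumer so the sketch runs without Kafka.
# Unlike the real consumer, it returns once its messages are exhausted.
class FakeConsumer
  def initialize(messages)
    @messages = messages
  end

  def consume
    @messages.each { |msg| yield msg }
  end
end

# Hypothetical helper: runs the blocking consume loop in its own thread
# and pushes every message onto a thread-safe queue.
def consume_in_background(consumer, sink)
  Thread.new do
    consumer.consume { |msg| sink << msg }
  end
end

inbox = Queue.new
worker = consume_in_background(FakeConsumer.new(%w[first second]), inbox)
worker.join(1) # wait at most one second for the stand-in to finish

puts "Recv: #{inbox.pop}" until inbox.empty?
```

With a real consumer the thread would not finish on its own, so the caller would keep the Thread handle around and decide when to stop it.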

Metadata request (MRI-only)

Topic and cluster metadata may be retrieved in the MRI version by querying the Kafka brokers.

require 'hermann'
require 'hermann/discovery/metadata'

c = Hermann::Discovery::Metadata.new( "localhost:9092" )
topic = c.topic("topic")

puts topic.partitions.first

consumers = topic.partitions.map do |partition|
  partition.consumer
end

Build & Unit Test

First time (from a clean repository): bundle install && bundle exec rake

Thereafter: bundle exec rake spec

Testing

To run the integration tests:

  • start up your own instance of Zookeeper/Kafka
  • rspec spec/integration/producer_spec.rb

How to convert from using jruby-kafka

  • Gemfile
    • remove jruby-kafka
    • add gem "hermann"
    • bundle install
  • Jarfile
    • remove unnecessary jars from your Jarfile (e.g. kafka, log4j)
    • jar dependencies are automatically included with Hermann
    • jbundle install
  • Test out one of the Producer/Consumer examples above