Ruby wrapper around java kafka high-level consumer

Jruby::Kafka

Prerequisites

About

This gem wraps most of the [Kafka 0.8.2.1 high level consumer] and [Kafka 0.8.2.1 producer] APIs for JRuby. The consumer side is largely a port of the [Kafka Consumer Group Example].

Note that the Scala-based Kafka::Producer will be deprecated in favor of the Java-based Kafka::KafkaProducer.

Installation

This package is now distributed via RubyGems.org, but you can also build it from source using the following instructions.

From the root of the project run:

$ bundle install
$ rake setup jar package

You can run the following to install the resulting package:

$ gem install jruby-kafka*.gem

Add this line to your application's Gemfile:

gem 'jruby-kafka'

Usage

To run the tests, first download Kafka 0.8.X, follow the Kafka quickstart instructions, and set KAFKA_PATH in the environment.
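A quick sanity check of KAFKA_PATH before running the tests can save a confusing failure later. The following is only a sketch: the helper name `kafka_path_problem` is hypothetical and not part of this gem, and it assumes KAFKA_PATH points at the root of an unpacked Kafka distribution (which ships `bin/kafka-server-start.sh`).

```ruby
# Hypothetical helper, not part of jruby-kafka.
# Returns a description of what is wrong with KAFKA_PATH, or nil if it looks usable.
# Assumption: KAFKA_PATH is the root of an unpacked Kafka distribution.
def kafka_path_problem(env = ENV)
  path = env['KAFKA_PATH']
  return 'KAFKA_PATH is not set' if path.nil? || path.empty?
  return "KAFKA_PATH is not a directory: #{path}" unless File.directory?(path)

  # Kafka distributions ship their start scripts under bin/.
  script = File.join(path, 'bin', 'kafka-server-start.sh')
  return "expected start script not found: #{script}" unless File.exist?(script)

  nil
end
```

Calling `kafka_path_problem` with no arguments inspects the real environment; passing a Hash makes it easy to try out without touching ENV.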

The following producer code sends 100 messages to the test topic:

require 'jruby-kafka'

producer_options = {:broker_list => "192.168.59.103:9092", "serializer.class" => "kafka.serializer.StringEncoder"}

producer = Kafka::Producer.new(producer_options)
producer.connect()
100.times { |i| producer.send_msg("test", i.to_s, i.to_s) }

The following consumer example indefinitely listens to the test topic and prints out messages as they are received from Kafka:

require 'jruby-kafka'

consumer_options = {
  :topic_id => "test",
  :zk_connect => "192.168.59.103:2181",
  :group_id => "test_group",
  :auto_commit_enable => "false",
  :auto_offset_reset => "smallest",
}

messages = Queue.new

consumer_group = Kafka::Group.new(consumer_options)
consumer_group.run(2) do |message, metadata|
  messages << [message, metadata]
  consumer_group.commit(metadata)
  sleep 0.5
  print message
end

trap('SIGINT') do
  consumer_group.shutdown()
  puts "Consumed #{messages.size} messages"
end
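The handler pattern in the consumer example (a block invoked from several threads that pushes into a thread-safe Queue) can be tried without a broker. The sketch below uses only the Ruby standard library; `FakeGroup` is a hypothetical stand-in written for illustration and is not the jruby-kafka API.

```ruby
require 'thread' # Queue; a no-op on recent Rubies where Queue is built in

# Hypothetical stand-in for a consumer group: plain Ruby threads, no Kafka involved.
class FakeGroup
  def initialize(payloads)
    @source = Queue.new
    payloads.each { |payload| @source << payload }
    @threads = []
  end

  # Start num_threads workers; each hands payloads to the block until the source is empty.
  def run(num_threads, &handler)
    @threads = Array.new(num_threads) do
      Thread.new do
        loop do
          payload = begin
            @source.pop(true) # non-blocking pop raises ThreadError when empty
          rescue ThreadError
            break
          end
          handler.call(payload)
        end
      end
    end
  end

  # Wait for all workers to finish.
  def shutdown
    @threads.each(&:join)
  end
end

messages = Queue.new
group = FakeGroup.new((0...100).map(&:to_s))
group.run(2) { |message| messages << message }
group.shutdown
puts "Consumed #{messages.size} messages"
```

Because Queue is thread-safe, the block can append from both worker threads without extra locking, which is the same reason the consumer example collects into a Queue rather than an Array.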

Using in logstash:

Check out this repo: https://github.com/joekiller/logstash-kafka

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request