No commit activity in last 3 years
No release in over 3 years
Fork of jruby-kafka that uses lockjar instead of ruby-maven
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies

Development

~> 10.4

Runtime

 Project Readme

Jruby::Kafka

Prerequisites

About

This gem is primarily used to wrap most of the [Kafka 0.8.2.1 high level consumer] and [Kafka 0.8.2.1 producer] API into jruby. The [Kafka Consumer Group Example] is pretty much ported to this library.

Note that the Scala Kafka::Producer will deprecate and Java Kafka::KafkaProducer is taking over.

Installation

This package is now distributed via RubyGems.org but you can build it using the following instructions.

From the root of the project run:

$ bundle install
$ rake setup jar package

You can run the following to install the resulting package:

$ gem install jruby-kafka*.gem

Add this line to your application's Gemfile:

gem 'jruby-kafka'

Usage

If you want to run the tests, make sure you already have downloaded Kafka 0.8.X, followed the kafka quickstart instructions and have KAFKA_PATH set in the environment.

Usage

The following producer code sends a message to a test topic

require 'jruby-kafka'

producer_options = {:broker_list => "localhost:9092", "serializer.class" => "kafka.serializer.StringEncoder"}

producer = Kafka::Producer.new(producer_options)
producer.connect()
producer.send_msg("test", nil, "here's a test message")    

The following consumer example indefinitely listens to the test topic and prints out messages as they are received from Kafka:

require 'jruby-kafka'

consumer_options = {
  :topic_id => "test", 
  :zk_connect => "localhost:2181", 
  :group_id => "my_consumer_group", 
  :reset_beginning => "from-beginning", 
  :auto_offset_reset => "smallest"
}

consumer_group = Kafka::Group.new(consumer_options)
queue = SizedQueue.new(20)
consumer_group.run(1,queue)

count = 0

trap('SIGINT') do
  consumer_group.shutdown()
  puts "Consumed #{count} messages"
  exit
end

loop do
  if !queue.empty?
    puts "#{count}\t#{queue.pop.message.to_s}"
    count += 1
  end
end

Using in logstash:

Check out this repo: https://github.com/joekiller/logstash-kafka

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request