qswarm

No commit activity in last 3 years
No release in over 3 years
Defines a DSL to allow stream processing from various sources for output to various sinks

Qswarm - stream processing for Ruby

Qswarm is a Ruby DSL for manipulating real-time streams of messages. It defines three basic concepts: connections, sources, and sinks. Connections emit messages that sources can catch and then sink to other connections. In this way you can construct data flows between systems and transform messages in flight with Ruby.

Install with:

gem install qswarm

Agent

Use the agent command to wrap a set of DSL commands.

agent :bob do
  ...
end

Alternatively, you could save each agent in a separate file and use a process manager such as supervisord, god, or bluepill to manage the swarm.

Connections

Use connect to set up connections to services. Currently Logger, AMQP, XMPP, and Twitter are supported. You can also pass an optional block, which will be executed once the connection is set up.

Logger

  connect :mylog,
          :type            => :logger,
          :filename        => 'foo.log'

Logger is a very simple connection type that appends a stream of messages to a file. It can only sink messages (i.e. it does not emit any data), and it accepts no arguments to sink.
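Conceptually, a Logger sink just appends each message it receives to the named file. The following is a minimal plain-Ruby sketch of that behaviour, not Qswarm's implementation: the `append_messages` helper is hypothetical, and writing one message per line is an assumption about the file layout.

```ruby
require 'tmpdir'

# Hypothetical helper: append each message to the file on its own line.
# Mode 'a' preserves whatever the file already contains.
def append_messages(filename, messages)
  File.open(filename, 'a') do |f|
    messages.each { |m| f.puts(m) }
  end
end

# Example: two batches end up in the same file, in arrival order.
path = File.join(Dir.mktmpdir, 'foo.log')
append_messages(path, ['first', 'second'])
append_messages(path, ['third'])
```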

AMQP

  connect :messages,
          :type            => :amqp,
          :uri             => 'guest:guest@localhost:5672/',
          :exchange_type   => :headers,
          :exchange_name   => 'myexchange',
          :exchange_args   => { :durable => true },
          :queue_args      => { :auto_delete => true, :durable => true, :exclusive => true },
          :subscribe_args  => { :exclusive => false, :ack => false },
          :bind_args       => {},
          :prefetch        => 0,
          :bind            => 'foo.bar.#',
          :format          => :json

This sets up an AMQP connection called :messages using the credentials in :uri (user:pass@host:port/vhost) and creates the exchange if it doesn't already exist (using :exchange_args). If a routing key is passed with :bind, a queue is created whose name is the dotted concatenation of the agent name and the connection name, e.g. bob.messages, and bound to the specified exchange. You can pass :uniq => true to append a UUID to the queue name, making it unique for situations such as load balancing. At the moment you can't bind a queue to an exchange without specifying a routing key in :bind. You can pass configuration to the binding with :bind_args; similarly, :queue_args passes configuration options to queue creation. Defaults for the *_args options are as in the example.
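The queue naming rule above can be sketched in plain Ruby as follows. `queue_name` is a hypothetical helper, and the "." separator placed before the UUID is an assumption: the README only says a UUID is appended.

```ruby
require 'securerandom'

# Hypothetical helper: "<agent>.<connection>", optionally followed by a UUID
# (the separator before the UUID is assumed to be ".").
def queue_name(agent, connection, uniq: false)
  name = "#{agent}.#{connection}"
  uniq ? "#{name}.#{SecureRandom.uuid}" : name
end

puts queue_name(:bob, :messages)   # prints bob.messages
```

With `uniq: true`, two agents running the same definition get distinct queues, which is what allows them to share load.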

The agent is automatically subscribed to the created queue, and you can pass :subscribe_args to configure the subscription. If you set :ack to true, you can use :prefetch to specify how many messages to take from the queue at a time.

The :format argument determines what Qswarm does with the payloads it receives and how it transforms messages to be sent; see the Payload section.

AMQP sets the following headers for sources to use as guards:

  • :routing_key
  • Any headers from a headers exchange will be passed verbatim

AMQP supports the following arguments to sink:

  • :routing_key - the key to post the message under
  • :headers - a Hash that will be used instead of the payload.headers for posting to headers exchanges
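The rule for these two sink arguments can be sketched as a plain-Ruby helper. `publish_options` is hypothetical, not part of Qswarm; it assumes that a sink's :headers replaces payload.headers outright (rather than being merged into it), and that the routing key is simply nil when the sink doesn't pass one, which the README does not specify.

```ruby
# Hypothetical helper: choose the routing key and headers for an outgoing
# AMQP message. A sink's :headers replaces payload.headers (no merging).
def publish_options(payload_headers, sink_args)
  {
    routing_key: sink_args[:routing_key],              # nil if not given (assumption)
    headers:     sink_args[:headers] || payload_headers
  }
end
```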

XMPP

  connect :hipchat,
          :type            => :xmpp,
          :jid             => '54321_123456@chat.hipchat.com',
          :real_name       => 'My bot',
          :channel         => ['54321_lounge@conf.hipchat.com', '54321_chat@conf.hipchat.com'],
          :password        => 'foobar'

The above example connects to an XMPP service called :hipchat using the JID and password provided. The :real_name is used when joining groupchat rooms; for some services (like Hipchat) it must exactly match your registered name, including case. The script automatically joins the groupchat channel(s) specified in :channel and uses this channel list as the default for sinks that don't specify a destination channel. XMPP support is provided by the Blather library, so you can include Blather DSL in connect's block to implement bot behaviours. This block executes once the connection to the XMPP server has been established (when_ready).

Currently there is no support for sourcing messages from an XMPP connection (i.e. you can talk but not listen), so for now the Blather DSL is your only option if you want interactivity.

XMPP supports the following arguments to sink:

  • :channel - an Array or String of the groupchat channel(s) to sink the message to (will join if not already present)
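How a sink's :channel argument might be resolved against the connection's default channel list can be sketched like this. `resolve_channels` is a hypothetical helper illustrating the behaviour described above, not Qswarm's code.

```ruby
# Hypothetical helper: normalise :channel (String, Array, or nil), fall back
# to the connection's :channel list, and report rooms not yet joined.
def resolve_channels(sink_channel, default_channels, joined)
  targets = Array(sink_channel || default_channels)
  { targets: targets, to_join: targets - joined }
end

defaults = ['54321_lounge@conf.hipchat.com', '54321_chat@conf.hipchat.com']
result   = resolve_channels('54321_errors@conf.hipchat.com', defaults, defaults)
```

Here `result[:to_join]` contains only the errors room, since the default rooms were joined at connect time.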

Twitter

  connect :tweetstream,
          :type            => :twitter,
          :consumer_key    => 'YOURKEYHERE',
          :consumer_secret => 'YOURSECRETHERE',
          :oauth_token     => 'YOURTOKENHERE',
          :oauth_token_secret => 'YOURSECRETHERE',
          :track           => {
            :colours         => ['red', 'green', 'blue'],
            :feelings        => ['happy', 'sad'],
            :tech            => ['ruby', 'python'],
          },
          :follow          => {
            :tech            => [11987892]
          },
          :list            => {
            :flibbertigibbets     => { 'Scobleizer' => 'most-influential-in-tech' }
          }

A Twitter connection uses the Tweetstream and Twitter gems and requires OAuth credentials, which you can get from dev.twitter.com. The Twitter API gives you three options: you can :track keywords in the global tweet stream, you can :follow the full stream of particular users (by Twitter ID, as Tweetstream doesn't let you use handles), or you can get updates from everyone included on a :list (this uses the REST API, and the list is polled every minute).

You specify groups (:colours/:feelings/:tech/:flibbertigibbets above) to allow for easy filtering later on with guards. Twitter messages are always JSON so there is no :format option for connect.
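To show why named groups make later filtering easy, here is a plain-Ruby sketch that maps a tweet's text onto the :track groups from the example. `track_matches` is hypothetical; case-insensitive whole-word matching is an assumption, and Twitter's own track matching differs in detail. Group names are returned as Strings because guard header values are Strings.

```ruby
TRACK = {
  colours:  ['red', 'green', 'blue'],
  feelings: ['happy', 'sad'],
  tech:     ['ruby', 'python']
}

# Hypothetical helper: return { group => matched keywords } for every
# :track group whose keywords appear as whole words in the text.
def track_matches(track, text)
  words = text.downcase.scan(/\w+/)
  track.each_with_object({}) do |(group, keywords), hits|
    found = keywords & words
    hits[group.to_s] = found unless found.empty?
  end
end

hits = track_matches(TRACK, 'Happy to be writing Ruby today')
```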

Depending on the option (:track, :follow, or :list) that generated the message, Twitter sets headers like the following examples for sources to use:

  • :type => 'track', :group => 'colours|feelings|tech', :matches => [red|green|blue|happy|sad|ruby|python]
  • :type => 'follow', :group => 'tech', :user_id => 11987892
  • :type => 'list', :group => 'flibbertigibbets', :user_id => 'Scobleizer', :slug => 'most-influential-in-tech'

There is currently no support to sink messages to a Twitter connection - i.e. you cannot Tweet.

Payload

Payload is a Hash containing the following data by default:

  • payload.raw
  • payload.data
  • payload.format (set by arguments to the originating connect)
  • payload.headers

When a message is received from a connection, it is accessible in the DSL as payload. The :format option in a connect declares the format of the messages this connection emits and determines the processing applied to the raw payload. If :format is :json, payload.data is set to a Ruby Hash created by JSON.parse(payload.raw, :symbolize_names => true). If :format is :xml, payload.data is set to Nokogiri::XML(payload.raw). If :format is :raw, payload.data equals payload.raw.
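The decoding step above can be sketched with the standard library alone. The `Payload` Struct and the `decode` method are stand-ins assumed for illustration, not Qswarm's actual classes; :xml (Nokogiri::XML in Qswarm) is deliberately left out so the sketch needs no extra gems.

```ruby
require 'json'

# Stand-in for the payload object; field names follow the list above.
Payload = Struct.new(:raw, :data, :format, :headers)

# Decode a raw message according to the connection's :format.
def decode(raw, format, headers = {})
  data =
    case format
    when :json then JSON.parse(raw, symbolize_names: true)
    when :raw  then raw
    else raise ArgumentError, "unsupported format: #{format.inspect}"
    end
  Payload.new(raw, data, format, headers)
end

payload = decode('{"user":{"name":"Bob"}}', :json)
payload.data[:user][:name]   # => "Bob"
```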

Sinks can also set :format to define the reverse conversion, from Ruby objects back to the wire format, for transmission. If no :format is supplied, a sink defaults to the value given in the connection's connect.
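The reverse conversion and its fallback rule can be sketched as follows. `encode` is a hypothetical helper, and only :json and :raw are handled, to stay within the standard library.

```ruby
require 'json'

# Hypothetical inverse of decoding: the sink's :format wins; otherwise fall
# back to the :format declared in the connection's connect.
def encode(data, sink_format: nil, connect_format: :raw)
  case sink_format || connect_format
  when :json then JSON.generate(data)
  when :raw  then data.to_s
  else raise ArgumentError, 'unsupported format'
  end
end
```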

Some connection types add payload.headers which will contain a Ruby Hash of relevant data.

Filters and Guards

You can use before and after as filters, which execute on receipt of a message from a specified connection, before or after your source blocks respectively. This example creates a plain-text rendering of a tweet, expanding Twitter handles, which can then be used in all source blocks.

before :tweetstream do
  # Render "<Name/screen_name> text", then expand each @mention the same way
  @pp = "<#{payload.data[:user][:name]}/#{payload.data[:user][:screen_name]}> #{payload.data[:text]}"
  payload.data[:entities][:user_mentions].each do |u|
    @pp.gsub!(/@#{u[:screen_name]}/, "<#{u[:name]}/#{u[:screen_name]}>")
  end
end
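The filter's transformation can be tried outside Qswarm as a plain method. The sample tweet values are made up; only the field names come from the filter. `pretty_tweet` is hypothetical and essentially mirrors the filter, with `Regexp.escape` and the block form of `gsub` added as precautions.

```ruby
# Made-up sample data in the shape the before filter reads.
data = {
  user: { name: 'Bob Example', screen_name: 'bob' },
  text: 'Thanks @alice!',
  entities: { user_mentions: [{ name: 'Alice Example', screen_name: 'alice' }] }
}

# Build "<Name/screen_name> text" and expand each @mention.
def pretty_tweet(data)
  line = "<#{data[:user][:name]}/#{data[:user][:screen_name]}> #{data[:text]}"
  data[:entities][:user_mentions].each do |u|
    line = line.gsub(/@#{Regexp.escape(u[:screen_name])}/) { "<#{u[:name]}/#{u[:screen_name]}>" }
  end
  line
end

pretty_tweet(data)   # => "<Bob Example/bob> Thanks <Alice Example/alice>!"
```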

Guards (shamelessly copied from Blather) allow you to put conditional execution on before, after, and source blocks by filtering on data passed in payload.headers. Please note that header values are always Strings, not Symbols.

The types of guards are:

# Hash with any value
#   Equivalent to payload.headers[:body] == 'exit'
source :messages, :body => 'exit'

# Hash with regular expression
#   Equivalent to payload.headers[:body].match /exit/
source :messages, :body => /exit/

# Hash with array
#   Equivalent to ['gone', 'forbidden'].include?(payload.headers[:name])
source :messages, :name => ['gone', 'forbidden']

# Proc
#   Calls the proc passing in payload.headers
#   Checks that the ID is divisible by 3 (header values are Strings, so convert first)
source :messages, proc { |header| header[:id].to_i % 3 == 0 }

# Array
#   Using an array of the previous types effectively turns the guard into
#   an OR statement.
#   Equivalent to payload.headers[:body] == 'foo' || payload.headers[:body] == 'baz'
source :messages, [{:body => 'foo'}, {:body => 'baz'}]
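The guard semantics listed above can be sketched as a single recursive matcher. `guard_match?` is hypothetical, modelled on the descriptions rather than on Qswarm's source; it assumes that every key in a Hash guard must match (AND), which is how the Sources example below reads.

```ruby
# Hypothetical guard evaluator: Proc calls, Array ORs, Hash ANDs its keys.
def guard_match?(guard, headers)
  case guard
  when Proc  then guard.call(headers) ? true : false
  when Array then guard.any? { |g| guard_match?(g, headers) }
  when Hash
    guard.all? do |key, expected|
      value = headers[key]
      case expected
      when Regexp then value.is_a?(String) && expected.match?(value)
      when Array  then expected.include?(value)
      else expected == value
      end
    end
  else raise ArgumentError, "unknown guard: #{guard.inspect}"
  end
end

headers = { body: 'exit', name: 'gone', id: '9' }
```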

Sources

Sources listen for messages from connections and process them with their blocks, which are executed on receipt of a message. The headers available for guards (see Filters and Guards) depend on the connection that sent the message. All sources that match will receive the message and execute.

  source  :tweetstream, :type => 'follow', :user_id => 224662544 do
    if payload.data[:text].match(/A14/)
      ...
    end
  end

The above listens for messages from the :tweetstream connection. The guards eliminate any tweet that doesn't come from a :follow (rather than a :track or :list) or whose user doesn't match the provided ID, which happens to be the Highways Agency Twitter account for East of England travel news. The pattern match for A14 is done in the block because the tweet text isn't available in the headers.
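The rule that every matching source runs (not just the first) can be sketched with a small in-memory registry. `SourceRegistry` is hypothetical and handles only Hash guards with plain values, to keep the dispatch logic visible.

```ruby
# Hypothetical registry: dispatch runs every source registered for the
# connection whose Hash guard matches the headers, in registration order.
class SourceRegistry
  def initialize
    @sources = []
  end

  def source(connection, guard = {}, &block)
    @sources << [connection, guard, block]
  end

  def dispatch(connection, headers, data)
    @sources.each do |conn, guard, block|
      next unless conn == connection
      next unless guard.all? { |k, v| headers[k] == v }
      block.call(data)
    end
  end
end

seen = []
reg = SourceRegistry.new
reg.source(:tweetstream, type: 'follow') { |d| seen << [:follow, d] }
reg.source(:tweetstream, group: 'tech')  { |d| seen << [:tech, d] }
reg.source(:tweetstream, type: 'track')  { |d| seen << [:track, d] }
reg.dispatch(:tweetstream, { type: 'follow', group: 'tech' }, 'hello')
# seen == [[:follow, 'hello'], [:tech, 'hello']]
```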

Sinks

Sinks publish the output of their blocks to connections. Here's an example of sinking a text message generated from a connection that sends XML messages.

  sink  :hipchat,
        :format          => :xml,
        :channel         => ['12345_errors@conf.hipchat.com'] do

    # Truncate the error text to 140 characters and append the headers
    message = payload.data.at_xpath('error')['message']
    "*** ERROR: " + message[0, 140] + (message.size > 140 ? ' ... ' : ' ') + payload.headers.to_s
  end

In this case the :format argument isn't strictly needed, because the block returns the payload to send. Without a block, Qswarm would use :format to know that it must call payload.data.to_xml before sending to the connection. You can have multiple sinks in a single source block, and they will all process the same payload.
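The string-building in the sink block above can be checked in isolation. `error_line` and `LIMIT` are hypothetical names; `message[0, LIMIT]` keeps at most LIMIT characters, and the ellipsis is added only when text was actually cut.

```ruby
LIMIT = 140

# Hypothetical stand-in for the sink block's return value.
def error_line(message, headers)
  "*** ERROR: " + message[0, LIMIT] + (message.size > LIMIT ? ' ... ' : ' ') + headers.to_s
end

error_line('disk full', {})   # => "*** ERROR: disk full {}"
```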

Full Example

agent :bob do
  connect :hipchat,
          :type            => :xmpp,
          :jid             => '54321_123456@chat.hipchat.com',
          :channel         => ['54321_lounge@conf.hipchat.com', '54321_chat@conf.hipchat.com'],
          :password        => 'foobar'

  connect :tweetstream,
          :type            => :twitter,
          :consumer_key    => 'YOURKEYHERE',
          :consumer_secret => 'YOURSECRETHERE',
          :oauth_token     => 'YOURTOKENHERE',
          :oauth_token_secret => 'YOURSECRETHERE',
          :track           => {
            :colours         => ['red', 'green', 'blue'],
            :feelings        => ['happy', 'sad'],
            :tech            => ['ruby', 'python'],
          },
          :follow          => {
            :tech            => [11987892]
          },
          :list            => {
            :flibbertigibbets     => { 'Scobleizer' => 'most-influential-in-tech' }
          }

  source  :tweetstream, :type => %w( follow list ) do
    sink  :hipchat,
          :channel => '54321_influencers@conf.hipchat.com'
  end

  source  :tweetstream, :group => 'tech' do
    sink  :hipchat,
          :channel => '54321_cool_stuff@conf.hipchat.com'
  end
end

More examples can be found in these blog posts: