If you are, for example, downloading a large file, processing that file, and then writing the results to a database, ThreadedPipeline may be for you: download in one thread, process in another, write in a third, and so on.

ThreadedPipeline

Build status: CircleCI

Recently I have been using the following pattern a lot:

  1. Download file from list of URLs
  2. Process file
  3. Record results

Part 1 is network bound. Part 2 is CPU bound. Part 3 is service bound (database in my case). There is no reason I should not run these three in parallel, so this gem is the encapsulation of the general pattern of running parts of a pipeline in parallel.
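
To make the pattern concrete, here is a minimal sketch of what running the stages in parallel involves when done by hand with Ruby's standard library. It is not this gem's implementation: run_stages and the three toy lambdas are hypothetical names chosen for illustration, and the sketch assumes Ruby 2.3 or later (for Queue#close) and that no stage returns nil or false.

```ruby
# Hypothetical hand-rolled version of the pattern; not ThreadedPipeline's source.
# Each stage gets its own thread, and queues carry items between stages.
def run_stages(inputs, stages)
  # One queue in front of each stage, plus a final queue that collects results.
  queues = Array.new(stages.size + 1) { Queue.new }

  threads = stages.each_with_index.map do |stage, i|
    Thread.new do
      # pop blocks while the queue is open and returns nil once it is
      # closed and drained, which ends the loop.
      # Assumption: stages never return nil or false.
      while (item = queues[i].pop)
        queues[i + 1] << stage.call(item)
      end
      queues[i + 1].close # signal the next stage that no more items are coming
    end
  end

  inputs.each { |input| queues.first << input }
  queues.first.close
  threads.each(&:join)

  # The final queue is closed but still holds the results, in input order.
  results = []
  results << queues.last.pop until queues.last.empty?
  results
end

stages = [
  ->(n) { n + 1 },            # stands in for "download"
  ->(n) { n * 2 },            # stands in for "process"
  ->(n) { "recorded #{n}" }   # stands in for "record"
]
results = run_stages([1, 2, 3], stages)
puts results.inspect
```

Because each stage runs in a single thread and the queues are first-in, first-out, results come back in input order; while stage 2 works on one item, stage 1 can already be fetching the next.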

Greatly inspired by the parallel gem.

Tested with MRI and JRuby.

Installation

Add this line to your application's Gemfile:

gem 'threaded_pipeline'

And then execute:

$ bundle

Or install it yourself as:

$ gem install threaded_pipeline

Usage

threaded_pipeline = ThreadedPipeline.new
threaded_pipeline.stages << -> (url) { fetch_large_csv(url) }
threaded_pipeline.stages << -> (local_file) { process_local_file(local_file) }
threaded_pipeline.stages << -> (processed_results) { record_results_in_database(processed_results) }
results = threaded_pipeline.process([list, of, large, csv, urls])

Or, if you want to feed it yourself and discard the results:

another_pipeline = ThreadedPipeline.new(discard_results: true)
another_pipeline.stages << -> (url) { api_query(url) }
another_pipeline.stages << -> (returned_data) { process_returned_data(returned_data) }
another_pipeline.stages << -> (processed_results) { record_results_in_database(processed_results) }
while (url = web_crawl_urls)
  another_pipeline.feed(url)
end
another_pipeline.finish

This is even handy if you just want to process output from some other task in the background:

simple_pipeline = ThreadedPipeline.new(discard_results: true)
simple_pipeline.stages << -> (some_data) { process_data(some_data) }
while (some_data = some_object_that.generates_data)
  simple_pipeline.feed(some_data)
end
simple_pipeline.finish

This just encapsulates setting up the queue, setting up a completion condition/object, etc.
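
For comparison, the single-stage case written by hand might look roughly like the sketch below. BackgroundWorker and DONE are hypothetical names, not part of this gem; the feed and finish methods only mirror the calls shown above and make no claim about how the gem implements them.

```ruby
# Hypothetical hand-rolled equivalent of a one-stage pipeline that discards
# results; not ThreadedPipeline's source.
class BackgroundWorker
  # Completion object: a unique marker pushed onto the queue to end the loop.
  DONE = Object.new

  def initialize(&block)
    @queue = Queue.new
    @thread = Thread.new do
      loop do
        item = @queue.pop          # blocks until something is fed
        break if item.equal?(DONE) # stop once the completion object arrives
        block.call(item)
      end
    end
  end

  # Hand one item to the background thread.
  def feed(item)
    @queue << item
  end

  # Signal completion and wait for queued items to be processed.
  def finish
    @queue << DONE
    @thread.join
  end
end

seen = []
worker = BackgroundWorker.new { |data| seen << data.upcase }
%w[a b c].each { |data| worker.feed(data) }
worker.finish
puts seen.inspect
```

Using a dedicated completion object rather than nil lets nil and false pass through as ordinary data.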

Development

I use Docker and Guard; run make guard to start them.

After checking out the repo, run bin/setup to install dependencies. Then, run rake test to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/threaded_pipeline.

License

The gem is available as open source under the terms of the MIT License.