0.0
No release in over a year
This tool will read files from S3 by a prefix, and combine the files into batch-like files.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies

Development

~> 1.16
>= 0
~> 5.0
~> 10.0

Runtime

 Project Readme

S3DataPacker

This small packer will read a large amount of individual files on an S3 location that represent single items in JSON format, and pack them into larget batches with the option of compressing the final batch, decreasing the total storage size of the data (if compressed), and also reducing the total number of files.

The idea is to prepare data dumped on S3 in this way to a more optimal layout for AWS Athena to setup a querying system on top of it.

For now, S3DataPacker supports JSON items, with a 1 item per file layout, GZip compression if enabled, and only from S3 to S3, though the source and target bucket can be different buckets or even on different accounts if the proper credentials are provided.

Installation

Add this line to your application's Gemfile:

gem 's3_data_packer'

Or use the main branch from repo:

gem 's3_data_packer', git: 'https://github.com/rayko/s3_data_packer.git', branch: 'main'

And then execute:

$ bundle

Or install it yourself as:

$ gem install s3_data_packer

Configurations

There's a good number of options that can alter how the data is consumed. Below is the list of all defaults out of the box:

S3DataPacker.configure do |config|
  config.logger = Logger.new('log/s3_data_packer.log')     # Standard logger for information
  config.thread_count = 2                                  # How many threads to run
  config.thread_sleep_time = 1                             # How long to wait when there's no work in queue
  config.thread_lock_wait_time = 1                         # How long to wait when a lock error happens before retrying
  config.max_queue_size = 10000                            # How big can the queue get during processing
  config.max_queue_wait = 5                                # How long to wait when the queue reached max_queue_size before continuing
  config.workdir = 'tmp/s3_data_packer'                    # Where to keep output files until pushing to target location
  config.cleanup_batch = true                              # Whether to remove the pushed batches or not
  config.compress_batch = true                             # Whether to compresss with GZip or not
  config.batch_size = 100000                               # How many items to fit in a batch
  config.s3_api_key = nil                                  # Default API Key for an Aws account
  config.s3_api_secret = nil                               # Default API Secret for an AWS account
  config.s3_region = nil                                   # Default region for the buckets to use
  config.output_filename_prefix = nil                      # Static prefix to append on output filenames
  config.output_filename_suffix = 'batch'                  # Static suffix to insert on output filenames
  config.output_filename_pattern = %i[timecode_int suffix] # Simple pattern to construct output filenames (more on that below)
  config.output_filename_splitter = '_'                    # Character to join elements into a string that'll be a final filename
end

S3 Credentials

There are 2 main ways to do this depending on the context. Buckets can be configured in place with user provided credentials for both source and target locations.

If the source and target locations are on the same account, region and use the same credentials, the options above can be set to always set those credentials.

AWS credentials in the configuration here are optional, and just a shortcut to setting credentials for each run.

Thread options

Various thread options are available to moderate how the process run. Depending on the hardware available the thread counts can be adjusted to speed up the process. However, it there are enough threads, the queue might run empty too soon, in which case threads will sleep the given ammount of time to wait to gather some items to work on.

All timming settings should be adjusted depending on where this is going to run and the resources available.

Output filename options

There are a couple parameters that can be configured generally to generate filenames consistently. The simplest options :output_filename_prefix, :output_filename_suffix and :output_filename_splitter are straight forward. The :output_filename_pattern option is a bit more involved. It basically dictates order and what values to use when generating a filename. When a new name needs to be generated, each item in the pattern will be translated to a value of some kind, and merged toghether with the :output_filename_splitter character. The contents of the pattern array must be Symbol names and can only be one of the following:

  • :timecode_int -> current standard time in seconds (Time.now.to_i)
  • :timecode_dec -> current standard time with milliseconds (Time.now.to_f)
  • :number -> a simple number that grows as new names are generated
  • :timestamp -> simple time stamp with format YYYYMMDDhhmmss
  • :datestamp -> simple date stamp with format YYYYMMDD
  • :prefix -> given static string to use as prefix on the name
  • :suffix -> given static string to use as suffix on the name

Different patterns will generate different names with same structuring. The important part here is to always include a variable element so final files do not override previous data.

A few examples of different patterns, setting prefix as 'data' and suffix as 'batch':

  • [:timecode_int, :suffix] -> 1111111111_batch 1111111112_batch 1111111113_batch ...
  • [:datestamp, :number] -> 20200101_1 20200101_2 20200101_3 ...
  • [:prefix, :number, :suffix] -> data_1_batch data_2_batch data_3_batch ...

Usage

The simplest setup for this simple file processor is to set the AWS credentials and region through the configuration as shown above. Be sure that config.workdir is set and the location exists in the local machine.

To launch the packer, the only thing needed out of the box, is to instantiate 2 S3DataPacker::Bucket objects that will act as source and destination:

source_bucket = S3DataPacker::Sources::S3Bucket.new name: 'my-bucket', path: 'some/location'
target_bucket = S3DataPacker::Sources::S3Bucket.new name: 'other-bucket', path: 'my/destination'

You can override the configured AWS credentials with the :credentials option, as well as :region. :credentials needs to be an instance of Aws::Credentials. Having it setup this way should allow for more complex role invoking, since the instance passed :credentials option is fed direclty to Aws::S3::Resource and Aws::S3::Client to interface with the S3 buckets.

Once the buckets are instantiated you can call the packer:

packer = S3DataPacker::Packer.new source: source_bucket, target: target_bucket
packer.pack!

How it works?

Based on the sample above, what will happen once that #pack! is called, is that a set of threads will boot up, a new file will be opened in config.workdir that, without further configuration it will be named 123123123_batch.json (in general), and then the packer will start to iterate over all keys under the source path some/location.

Each key listed will enter the queue for the threads, and the threads will then take each key in queue, download the data in memory (it does not create a file for it), append the data into the currently opened batch, and continue with the next key.

As items are appended, if the target size config.batch_size is reached, the current batch is closed, compressed with GZip, and uploaded to target bucket in the location specificed my/destination. Once the file is pushed, the local copy is deleted, and a new batch is opened to continue appending items.

When all the keys have been listed, the packer will wait for the threads to finish any remaining items in the queue, and the last opened batch that likely hasn't reached target size, is then closed and pushed like the others.

And that's basically it. There are a few places in where additional processing may be introduced, but that's a feature for later.

There are no specialties regarding source and target buckets, they can be the same, on different accounts or region. However it is not recommended to setup source and target on the same bucket and path.

Custom Sources/Targets

It is possible to define a custom source and target for the packer to read data from some different place that is not an S3 bucket, as well as put the resultant batch into somewhere else. The S3DataPacker::Packer can take :source and :target parameters to use other things. At the moment, there are 2 source classes provided:

  • S3DataPacker::Sources::S3Bucket
  • S3DataPacker::Sources::Object

And 2 pre-defined target:

  • S3DataPacker::Targets::S3Bucket
  • S3DataPacker::Targets::Object

Both bucket related classes operate in the same way, you need to define the name and path of the buckets to read and write the data, as in the main example above. Be sure to configure credentials to use these.

The object source is pretty much a wrapper you can use with some other custom object, passing down which methods to call on it for the packer. Any object you pass down in the object source needs to respond to:

  • #name: which is mostly used for logging
  • #each: with a block to iterate over items
  • #fetch: with an identifier to find the actual data of the item

The #each and #fetch methods are like that mainly because the packer is threaded and it expects to iterate over keys, or IDs or some minor piece of information in one thread, and use that information to retrive the full object data on other threads. This keeps the queue small in byte size.

By default the object source expects those method names to be defined in the object provided. If there are other methods that do that already on the object but with different name, the method names can be passed like so:

S3DataPacker::Sources::Object.new object: my_object, 
                                  each_method: :iterate, 
                                  fetch_method: :find, 
                                  name_method: :display_name

As long as #each yields items (strings, IDs, whatever), and #fetch returns JSON data for an item, this should work.

For targets, there's also a S3DataPacker::Targets::Object that can be used in the a similar way, the only 2 methods for it are:

  • #name: for the same purposes as sources #name method
  • #save_file: with a path parameter

It can also be configured with other method names if needed:

S3DataPacker::Targets::Object.new object: my_object,
                                  name_method: :custom_name,
                                  save_file_method: :save!

It is also possible to construct a custom source/target class outside of the pre-defined ones that can do anything needed, and passed down to the packer instance to use. As long as the few needed methods are there, it should work just fine.

In some cases it might be useful to unify the get/fetch mechanics. This can be easily done by just bypassing the #fetch method and returning the data received. If for some reason the iterator for #each needs to output the actual data right there, by writing a #fetch method that returns whatever was passed in the parameter, effectively makes the packer's queue hold actual data. This might be useful in some cases, though it might need a smaller max size configuration to prevent having too much data in the queue.

I believe that with these tools, the packer can pretty much do the JSON packing in most cases, including:

  • Reading database records and serializing them into JSON
  • Reading S3 buckets (as originally intended)
  • Reading NoSQL items
  • Reading one file or a set of files
  • Writing batches into S3 buckets (as originally intended)
  • Writing batches into filesystem on some custom location
  • Writing batches into some other custom location

At least, it does cover the cases in where I intend to use it.

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake test to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/rayko/s3_data_packer. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.

License

The gem is available as open source under the terms of the MIT License.

Code of Conduct

Everyone interacting in the S3DataPacker project’s codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.