Wukong-Load
This Wukong plugin makes it easy to load data from the command-line into and out of various data stores.
It is assumed that you will independently deploy and configure each data store yourself (but see Ironfan).
## Installation & Setup
Wukong-Load can be installed as a RubyGem:
$ sudo gem install wukong-load
wu-load
Wukong-Load provides a command-line program wu-load you can use to
load data fed in over STDIN into a data store. It's designed to work
effectively with wu-local and wu-dump as part of UNIX pipelines.
Get help on wu-load by running:
$ wu-load --help
All input to wu-load should consist of newline-separated records
over STDIN. For some data stores, JSON-formatted, Hash-like input is
expected. Keys in the record may be interpreted as metadata about the
record or about how to route the record within the data store.
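As a sketch of preparing input in this shape, the Python snippet below writes Hash-like records as newline-separated JSON, one record per line, suitable for piping into wu-load. The to_ndjson helper and the sample records are hypothetical. Which keys (such as _id) carry routing or ID meaning depends on the target data store.

```python
import json
import sys
from typing import Iterable, Mapping


def to_ndjson(records: Iterable[Mapping]) -> str:
    """Serialize each record as one compact JSON object per line."""
    # json.dumps escapes newlines inside string values, so every record
    # is guaranteed to occupy exactly one line of output.
    return "".join(json.dumps(dict(r), separators=(",", ":")) + "\n" for r in records)


if __name__ == "__main__":
    records = [
        {"this": "record", "will": {"be": "indexed"}},
        {"this": "record", "will": {"be": "updated"}, "_id": "existing_id"},
    ]
    sys.stdout.write(to_ndjson(records))
```

Saved as a file (make_records.py is a hypothetical name), it could feed a pipeline such as python make_records.py | wu-load elasticsearch --index=foo --es_type=bar.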
Here are some quick examples which do what you think they do:
$ echo 'an arbitrary line of text' | wu-load kafka --topic=foo
$ echo '{"this": "record", "will": { "be": "indexed"}}' | wu-load elasticsearch --index=foo --es_type=bar
$ echo '{"this": "record", "will": { "be": "updated"}, "_id": "existing_id"}' | wu-load elasticsearch --index=foo --es_type=bar
$ echo '{"this": "record", "will": { "be": "indexed"}}' | wu-load mongodb --database=foo --collection=bar
$ echo '{"this": "record", "will": { "be": "upserted"}, "_id": "existing_id"}' | wu-load mongodb --database=foo --collection=bar
$ echo '{"this": "record", "is": "indexed"}' | wu-load sql --database=foo --table=bar
$ echo '{"this": "record", "is": "upserted", "_id": "existing_id"}' | wu-load sql --database=foo --table=bar
Further details and options will depend on the data store you're writing to. See more information on each specific data store below. You can also get help for a specific data store with:
$ wu-load STORE_TYPE --help
Note: The wu-load program is not designed to handle any
significant scale of data. It is only intended as a convenience
tool for modeling how Wukong dataflows (which can scale) will
interact with data stores.
wu-dump
Wukong-Load provides a command-line program wu-dump you can use to
dump data from a data store to STDOUT. It's designed to work
effectively with wu-local and wu-load in UNIX pipelines.
Get help on wu-dump by running:
$ wu-dump --help
All output from wu-dump will be newline-separated records over
STDOUT. For some data stores, JSON-formatted, Hash-like output is
produced.
Here are some quick examples which do what you think they do:
$ wu-dump kafka --topic=foo
line 1
line 2
line 3
...
$ wu-dump file --input=/data/foo.tsv
foo a
foo b
foo c
...
$ wu-dump file --input=/data/bar.tsv.gz
bar x
bar y
bar z
...
$ wu-dump file --input=data.zip
data.zip 1 foo a
data.zip 2 foo b
...
data.zip 1 bar x
data.zip 2 bar y
...
$ wu-dump directory --input=/data
/data/foo.tsv 1 foo a
/data/foo.tsv 2 foo b
...
/data/bar.tsv.gz 1 bar x
/data/bar.tsv.gz 2 bar y
...
Further details and options will depend on the data store you're reading from. See more information on each specific data store below. You can also get help for a specific data store with:
$ wu-dump STORE_TYPE --help
Note: The wu-dump program is not designed to handle any
significant scale of data. It is only intended as a convenience
tool for modeling how Wukong dataflows (which can scale) will
interact with data stores.
wu-sync
Wukong-Load also provides a program wu-sync which can be used to
sync data between filesystem-like data stores.
wu-sync itself provides several types of syncs that can be chained
together to create transactional, scalable processing for batch files.
Here's an example which syncs data from an FTP server to S3, providing a per-file transactional guarantee throughout:
$ wu-sync ftp --host=ftp.example.com --path=/remote/data --output=/data/incoming
$ wu-sync prepare --input=/data/incoming --output=/data/received
$ wu-sync prepare --input=/data/incoming --output=/data/received
$ wu-sync s3 --input=/data/received --bucket=s3://example.com/data
Further details and options will depend on the type of sync being performed. See more information on each specific sync below. You can also get help for a specific sync with:
$ wu-sync SYNC_TYPE --help
Kafka
Kafka uses the concept of "topics" which contain numbered "partitions", each of which holds a sequence of events concatenated together. Producers (writers) append events to Kafka, and consumers (readers) request data between particular byte offsets on particular partitions of particular topics. Kafka's data model is very simple (everything is a sequence of bytes), so it can store any kind of data.
Get options for loading data from or dumping data to Kafka:
$ wu-load kafka --help
$ wu-dump kafka --help
Connecting
wu-load and wu-dump try to connect to a Kafka broker at a default
host (localhost) and port (9092). You can change these:
$ cat data.txt | wu-load kafka --host=10.122.123.124 --port=1234
$ wu-dump kafka --host=10.122.123.124 --port=1234
Routing
wu-load and wu-dump both assume a default topic (wukong) and
partition (0), but you can change these:
$ cat data.txt | wu-load kafka --topic=messages --partition=6
$ wu-dump kafka --topic=messages --partition=6
Producing
Writing data to Kafka is simple: pick a topic and partition and push in some bytes:
$ cat data.txt | wu-load kafka --topic=messages
To achieve higher throughput (though, remember, wu-dump and
wu-load are not designed for significant load), increase the
--batch_size option a bit:
$ cat data.txt | wu-load kafka --topic=messages --batch_size=1000
Consuming
Consuming data from Kafka requires picking a topic, partition, and a byte-offset.
It's rare to want to choose a specific byte-offset (this typically only happens in delicate situations that no one wants to be in), but you can do it easily:
$ wu-dump kafka --topic=foo --offset=65536
The usual approach is to consume either from the beginning or end of
available data on a topic & partition. wu-dump's default behavior
is to start from the end of a topic:
$ wu-dump kafka --topic=foo
but you can also start from the beginning:
$ wu-dump kafka --topic=foo --from_beginning
Another common pattern among Kafka consumers is to choose an offset
that is "wherever you last left off". Kafka does not provide a
mechanism to remember per-consumer last-offsets, so it's up to
consumers to do this for themselves. Since wu-dump is not designed
to run continuously, it does not store "wherever you last left off".
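Since wu-dump remembers nothing between runs, a caller wanting "wherever you last left off" has to keep the offset itself. A minimal Python sketch, assuming the caller learns the next offset to read by some other means (wu-dump is not documented here as reporting it): store the offset in a small checkpoint file and turn it into either --offset or --from_beginning. The checkpoint file and the helper names are hypothetical; only the wu-dump flags come from this document.

```python
import json
import os
import tempfile
from typing import List, Optional


def load_offset(checkpoint: str) -> Optional[int]:
    """Return the saved offset, or None if no checkpoint exists yet."""
    if not os.path.exists(checkpoint):
        return None
    with open(checkpoint) as f:
        return json.load(f)["offset"]


def save_offset(checkpoint: str, offset: int) -> None:
    """Write the offset via a temp file + rename so a crash never leaves a half-written checkpoint."""
    directory = os.path.dirname(os.path.abspath(checkpoint))
    fd, tmp = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"offset": offset}, f)
    os.replace(tmp, checkpoint)


def wu_dump_kafka_args(topic: str, partition: int, offset: Optional[int]) -> List[str]:
    """Build a wu-dump command line that resumes from offset when one is known."""
    args = ["wu-dump", "kafka", f"--topic={topic}", f"--partition={partition}"]
    # Without a saved offset, read everything available rather than
    # wu-dump's default of starting at the end of the topic.
    args.append(f"--offset={offset}" if offset is not None else "--from_beginning")
    return args
```

The resulting list could be handed to a process runner such as Python's subprocess module.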
Elasticsearch
Note: ElasticSearch support for wu-dump is not currently
implemented.
ElasticSearch uses the concept of "indices" which contain "types" to store "documents", each of which is a schema-less, Hash-like structure. Every document must also have an ID which can be generated by ElasticSearch at index (write) time.
wu-load, like many tools that interact with ElasticSearch, uses
JSON-serialization. See full options with
$ wu-load elasticsearch --help
Connecting
wu-load tries to connect to an Elasticsearch server at a default
host (localhost) and port (9200). You can change these:
$ cat data.json | wu-load elasticsearch --host=10.122.123.124 --port=80
Routing
wu-load loads each document into a default index (wukong) and type
(streaming_record), but you can change these:
$ cat data.json | wu-load elasticsearch --index=publication --es_type=book
A record with an _index or _type field will override these default
settings. You can change the names of the fields used with the
--index_field and --es_type_field options:
$ cat data.json | wu-load elasticsearch --index=industry --es_type=publication --index_field=publisher --es_type_field=classification
Creates vs. Updates
If an input record contains a value for the field _id, then that
value will be used as the ID of the document when written, possibly
overwriting a document that already exists -- an update.
You can change the field you use for the Elasticsearch ID property:
$ cat data.json | wu-load elasticsearch --index=media --es_type=books --id_field="ISBN"
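The routing and ID rules above amount to a small lookup: a value found in the record wins over the command-line default, and the names of the fields consulted are themselves configurable. The Python sketch below is a hypothetical model of that documented behavior for illustration, not wu-load's actual implementation. Whether wu-load strips these fields from the stored document is not covered here.

```python
from typing import Any, Dict, Optional, Tuple


def es_route(
    record: Dict[str, Any],
    index: str = "wukong",
    es_type: str = "streaming_record",
    index_field: str = "_index",
    es_type_field: str = "_type",
    id_field: str = "_id",
) -> Tuple[str, str, Optional[Any]]:
    """Return (index, type, id) for one record under the rules described above."""
    # A non-empty value in the record overrides the command-line default.
    target_index = record.get(index_field) or index
    target_type = record.get(es_type_field) or es_type
    # No ID: Elasticsearch generates one (a create). An ID may overwrite (an update).
    doc_id = record.get(id_field)
    return target_index, target_type, doc_id
```

With index_field="publisher", es_type_field="classification", and id_field="ISBN", a record {"publisher": "acme", "classification": "novel", "ISBN": "978"} would route to index acme, type novel, ID 978.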
MongoDB
Note: MongoDB support for wu-dump is not currently implemented.
MongoDB uses the concept of "databases" which contain "collections" to store "documents", each of which is a schema-less, Hash-like structure. Every document must also have an ID which can be generated by MongoDB at insert (write) time.
wu-load, like many tools that interact with MongoDB, uses
JSON-serialization. See full options with
$ wu-load mongodb --help
Connecting
wu-load tries to connect to a MongoDB server at a default host
(localhost) and port (27017). You can change these:
$ cat data.json | wu-load mongodb --host=10.122.123.124 --port=1234
Routing
wu-load loads each document into a default database (wukong) and
collection (streaming_record), but you can change these:
$ cat data.json | wu-load mongodb --database=publication --collection=book
A record with a _database or _collection field will override these
default settings. You can change the names of the fields used with
the --database_field and --collection_field options:
$ cat data.json | wu-load mongodb --database=industry --collection=publication --database_field=publisher --collection_field=classification
Creates vs. Updates
If an input record contains a value for the field _id, then that
value will be used as the ID of the document when written, possibly
overwriting a document that already exists -- an update.
You can change the field you use for the MongoDB ID property:
$ cat data.json | wu-load mongodb --database=publication --collection=books --id_field="ISBN"
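To make the create-versus-update distinction concrete, here is a toy in-memory model in Python. A record without an ID gets a generated one (a create), and a record whose ID is already present replaces the stored document (an update). It is a hypothetical illustration of the semantics described above, not how wu-load or MongoDB work internally; MongoDB itself generates ObjectIds rather than the UUID strings used here.

```python
import uuid
from typing import Any, Dict


class ToyCollection:
    """In-memory stand-in for a collection, keyed by document ID."""

    def __init__(self, id_field: str = "_id") -> None:
        self.id_field = id_field
        self.docs: Dict[Any, Dict[str, Any]] = {}

    def load(self, record: Dict[str, Any]) -> str:
        """Store one record and report whether it was a create or an update."""
        doc_id = record.get(self.id_field)
        if doc_id is None:
            # Stand-in for the ID the data store would generate at insert time.
            doc_id = uuid.uuid4().hex
        action = "update" if doc_id in self.docs else "create"
        self.docs[doc_id] = dict(record)
        return action
```

Loading {"ISBN": "978", "title": "A"} and then {"ISBN": "978", "title": "B"} into ToyCollection(id_field="ISBN") yields a create followed by an update, leaving a single document titled B.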
Syncing
Syncing data between filesystems is always challenging to do well, though there are many good tools available.
In a Big Data setting, syncs are even more challenging because of the size of the files involved, which exacerbates problems of blocking, atomicity, transactionality, throughput, memory, &c. The best approach is to split the process up into several steps, each of which does one simple operation robustly, correctly, and in a way that can be tracked.
The wu-sync program provides several types of syncs that can be used
together to create transactional, scalable processing for batch files.
Each wu-sync command is meant to be run on a regular schedule every
few minutes, perhaps via cron.
All of the wu-sync sync types accept the --dry_run option which
will go through the sync showing what would be done but not doing it.
From FTP
The ftp sync will sync data from an FTP server to a local disk. It
supports several different flavors of FTP, including FTPS and SFTP.
On each invocation, wu-sync ftp will download any files (or parts of
files) present under --path on the remote server but not present in
the local --output directory. Files present on the local filesystem
but not on the remote server will be ignored.
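The selection rule above boils down to comparing two listings of relative paths and sizes. A minimal Python sketch, assuming both listings are already available as dictionaries (how wu-sync ftp and lftp actually obtain and compare them is not shown here): a remote file is fetched in full if it is missing locally, or resumed if the local copy is shorter, and local-only files are left alone. The files_to_fetch helper is hypothetical.

```python
from typing import Dict, List, Tuple


def files_to_fetch(remote: Dict[str, int], local: Dict[str, int]) -> List[Tuple[str, int]]:
    """Return (path, start_byte) pairs for remote files that still need downloading."""
    todo = []
    for path, remote_size in sorted(remote.items()):
        local_size = local.get(path)
        if local_size is None:
            todo.append((path, 0))  # not present locally: fetch the whole file
        elif local_size < remote_size:
            todo.append((path, local_size))  # partial copy: fetch the remaining bytes
    # Paths that exist only in local are never examined, matching the rule above.
    return todo
```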
The lftp program is required for wu-sync ftp to
function.
Get general help on wu-sync ftp with
$ wu-sync ftp --help
Note: Since FTP is conceptually "single-threaded", no throughput
gains are achieved by having multiple processes try to read the same
file on the same FTP server. A single wu-sync ftp process per file is
therefore already as fast as FTP allows, so wu-sync ftp can be used
in a production setting: it acts as the fundamental building block of
a more distributed system in which each wu-sync ftp process is
responsible for some files on some FTP servers.
Specifying FTP protocol, host, and credentials
By default, wu-sync ftp will try to connect anonymously to an FTP
server running on the local machine. The --protocol, --host,
--port, --username, and --password options can be used to
configure this behavior. Here's an example of connecting to a remote
FTP server using a secure FTPS connection:
$ wu-sync ftp --host=ftp.example.com --protocol=ftps --username=bob --password=<password> --output=/data/ftp
You can use the FTP_PASSWORD environment variable if you don't want
to pass the password on the command-line.
The port is determined automatically from the protocol (e.g. 21 for
ftp, 22 for sftp, 443 for ftps) but can be explicitly given with
the --port flag.
Preparing Files for Downstream Consumption
One of the immediate issues that arises when syncing files is that a client may not be able to tell when a file on the remote server has finished uploading. Nor may it be clear when that file has finished downloading to the local filesystem.
The approach taken by Wukong is to run a separate "archival" step
which syncs data in a local --input directory with data in a local
--output directory. Files will only show up in the --output
directory when they are complete, as measured by them having stopped
growing in the --input directory.
wu-sync is not a continuously running process, however, and so the
only way for it to reliably know when files have stopped growing is
for it to store file sizes from one invocation in order to compare
them to file sizes on the next invocation. For this reason, creating
files in the --output directory requires at least two
invocations of wu-sync (with the same set of parameters).
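The two-invocation rule can be sketched as follows: each run records every file's size in a state file, and a file counts as complete only when its size matches the size recorded by the previous run. This Python sketch assumes a JSON state file whose name and location are hypothetical; the real wu-sync prepare bookkeeping may differ.

```python
import json
import os
import tempfile
from typing import Dict, List


def scan_sizes(input_dir: str) -> Dict[str, int]:
    """Map each file's path (relative to input_dir) to its current size in bytes."""
    sizes: Dict[str, int] = {}
    for root, _dirs, files in os.walk(input_dir):
        for name in files:
            full = os.path.join(root, name)
            sizes[os.path.relpath(full, input_dir)] = os.path.getsize(full)
    return sizes


def completed_files(input_dir: str, state_path: str) -> List[str]:
    """Return files whose size is unchanged since the previous run, then record current sizes.

    state_path should live outside input_dir so the state file is not scanned itself.
    """
    previous: Dict[str, int] = {}
    if os.path.exists(state_path):
        with open(state_path) as f:
            previous = json.load(f)
    current = scan_sizes(input_dir)
    with open(state_path, "w") as f:
        json.dump(current, f)
    # On the first run previous is empty, so no file can be complete yet.
    return sorted(path for path, size in current.items() if previous.get(path) == size)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as work:
        inbox = os.path.join(work, "ftp")
        os.makedirs(inbox)
        with open(os.path.join(inbox, "README"), "w") as f:
            f.write("hello\n")
        state = os.path.join(work, "sizes.json")
        print(completed_files(inbox, state))  # [] on the first run
        print(completed_files(inbox, state))  # ['README'] on the second run
```

A file that is still growing between runs keeps resetting its recorded size, so it is only reported once it has held still for one full interval between invocations.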
Here's an example of the --input directory /data/ftp:
/data/ftp
├── alice
│   └── project_1
│       └── file_1
├── bob
│   ├── project_1
│   │   ├── file_1
│   │   ├── file_2
│   │   └── file_3
│   └── project_2
│       ├── file_1
│       └── file_2
└── README
After running wu-sync prepare twice:
$ wu-sync prepare --input=/data/ftp --output=/data/clean
$ wu-sync prepare --input=/data/ftp --output=/data/clean
the --output directory /data/clean should look exactly the same
as the input directory:
/data/clean
├── alice
│   └── project_1
│       └── file_1
├── bob
│   ├── project_1
│   │   ├── file_1
│   │   ├── file_2
│   │   └── file_3
│   └── project_2
│       ├── file_1
│       └── file_2
└── README
Nothing here seems magical, but the log output of wu-sync prepare
reveals how much is going on behind the scenes to ensure that only
complete files are processed.
Note: The files created in the --output directory are
hardlinks pointing at the
original files in the --input directory.
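Publishing a completed file as a hardlink is cheap: no data is copied, and the --output entry shares storage with the --input file. A minimal Python sketch using os.link, assuming both directories live on the same filesystem (hardlinks cannot cross devices). The publish helper is hypothetical.

```python
import os
import tempfile


def publish(input_dir: str, output_dir: str, rel_path: str) -> str:
    """Expose input_dir/rel_path at output_dir/rel_path as a hardlink (no data copied)."""
    src = os.path.join(input_dir, rel_path)
    dst = os.path.join(output_dir, rel_path)
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    if not os.path.exists(dst):
        # os.link raises OSError if src and dst are on different filesystems.
        os.link(src, dst)
    return dst


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as work:
        ftp_dir = os.path.join(work, "ftp")
        clean_dir = os.path.join(work, "clean")
        rel = os.path.join("alice", "project_1", "file_1")
        os.makedirs(os.path.dirname(os.path.join(ftp_dir, rel)))
        with open(os.path.join(ftp_dir, rel), "w") as f:
            f.write("data\n")
        out = publish(ftp_dir, clean_dir, rel)
        # Both names refer to the same file on disk.
        print(os.path.samefile(out, os.path.join(ftp_dir, rel)))  # True
```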
Splitting input files
The --split option will make wu-sync prepare split large files in
the --input directory into many manageable, smaller files in the
--output directory.
Assume that Alice has been busy and that the file
/data/ftp/alice/project_1/file_1 is big. After running wu-sync prepare twice:
$ wu-sync prepare --input=/data/ftp --output=/data/clean --split
$ wu-sync prepare --input=/data/ftp --output=/data/clean --split
the --output directory /data/clean will look like this:
/data/clean
├── alice
│   └── project_1
│       ├── file_1.part-0000
│       ├── file_1.part-0001
│       └── file_1.part-0002
├── bob
│   ├── project_1
│   │   ├── file_1.part-0000
│   │   ├── file_2.part-0000
│   │   └── file_3.part-0000
│   └── project_2
│       ├── file_1.part-0000
│       └── file_2.part-0000
└── README.part-0000
Alice's big --input file has been split into three --output files.
Bob's --input files were smaller than the split size so they each
resulted in a single --output file.
You can change the number of lines in each file split with the
--lines option and switch to splitting by bytes with the --bytes
option.
Note: When splitting, the files in the --output directory are
real files instead of hardlinks as they are by default.
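Line-based splitting can be sketched in a few lines of Python. The sketch assumes a fixed number of lines per part and the .part-NNNN naming shown above. The split_by_lines helper and its lines_per_part parameter are hypothetical, and wu-sync prepare's own default split size is not stated here.

```python
import os
import tempfile
from typing import List


def split_by_lines(src: str, dst_prefix: str, lines_per_part: int) -> List[str]:
    """Copy src into dst_prefix.part-0000, .part-0001, ... with at most lines_per_part lines each."""
    parts: List[str] = []
    out = None
    try:
        with open(src, "rb") as f:
            for i, line in enumerate(f):
                if i % lines_per_part == 0:
                    # Start a new part every lines_per_part lines.
                    if out is not None:
                        out.close()
                    parts.append(f"{dst_prefix}.part-{len(parts):04d}")
                    out = open(parts[-1], "wb")
                out.write(line)
    finally:
        if out is not None:
            out.close()
    return parts


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as work:
        src = os.path.join(work, "file_1")
        with open(src, "w") as f:
            f.writelines(f"line {n}\n" for n in range(5))
        for part in split_by_lines(src, src, lines_per_part=2):
            print(os.path.basename(part))  # file_1.part-0000 through file_1.part-0002
```

Reading in binary mode keeps each line's bytes intact, which matters when the input's encoding is unknown.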
Ordering output files
Some tools require their input to be ordered and for these tools
wu-sync prepare provides an --ordered option which will reorganize
files in the --output directory so that they are totally ordered:
$ wu-sync prepare --input=/data/ftp --output=/data/clean --ordered
$ wu-sync prepare --input=/data/ftp --output=/data/clean --ordered
This would result in the following structure for the --output
directory:
/data/clean
├── alice
│   └── 2013
│       └── 09
│           └── 20
│               └── 20130920-071142-1-alice-project_1-file_1
├── bob
│   └── 2013
│       └── 09
│           └── 20
│               ├── 20130920-071142-2-bob-project_1-file_1
│               ├── 20130920-071142-3-bob-project_1-file_2
│               ├── 20130920-071142-4-bob-project_1-file_3
│               ├── 20130920-071142-5-bob-project_2-file_1
│               └── 20130920-071142-6-bob-project_2-file_2
└── root
    └── 2013
        └── 09
            └── 20
                └── 20130920-071142-0-README
The ordering is built up from:
- Each top-level subdirectory of the --input directory appears in the root of the --output directory. (Files that were in the --input directory itself are put in the root subdirectory of the --output directory.)
- Within each top-level subdirectory, a daily directory is created based on the time the corresponding --input file was recognized as completed.
- All files within the top-level subdirectory of the --input directory are placed within this daily directory with basenames constructed from:
  a. the time the corresponding --input file was recognized as completed
  b. an incrementing counter
  c. the path of the --input file relative to the --input directory
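The naming rules above can be sketched as a small Python function. It assumes "/"-separated relative paths, that path separators become "-" in the basename, and a timestamp format inferred from the example names in the tree above (YYYYMMDD-HHMMSS). The ordered_path helper is hypothetical, not wu-sync's own code.

```python
from datetime import datetime


def ordered_path(rel_path: str, completed_at: datetime, counter: int) -> str:
    """Build the --ordered output path for a file at rel_path within --input."""
    parts = rel_path.split("/")
    # Files directly inside the --input directory go under "root".
    top = parts[0] if len(parts) > 1 else "root"
    day_dir = completed_at.strftime("%Y/%m/%d")
    stamp = completed_at.strftime("%Y%m%d-%H%M%S")
    basename = f"{stamp}-{counter}-{'-'.join(parts)}"
    return f"{top}/{day_dir}/{basename}"


if __name__ == "__main__":
    t = datetime(2013, 9, 20, 7, 11, 42)
    print(ordered_path("alice/project_1/file_1", t, 1))
    # alice/2013/09/20/20130920-071142-1-alice-project_1-file_1
    print(ordered_path("README", t, 0))
    # root/2013/09/20/20130920-071142-0-README
```

Leading each basename with the timestamp and counter is what makes a plain lexicographic listing of a daily directory come out in completion order.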
The --ordered option can be combined with the --split option:
$ wu-sync prepare --input=/data/ftp --output=/data/clean --ordered --split
$ wu-sync prepare --input=/data/ftp --output=/data/clean --ordered --split
to get
/data/clean
├── alice
│   └── 2013
│       └── 09
│           └── 20
│               ├── 20130920-071643-1-alice-project_1-file_1.part-0000
│               ├── 20130920-071643-1-alice-project_1-file_1.part-0001
│               └── 20130920-071643-1-alice-project_1-file_1.part-0002
├── bob
│   └── 2013
│       └── 09
│           └── 20
│               ├── 20130920-071643-2-bob-project_1-file_1.part-0000
│               ├── 20130920-071643-3-bob-project_1-file_2.part-0000
│               ├── 20130920-071643-4-bob-project_1-file_3.part-0000
│               ├── 20130920-071643-5-bob-project_2-file_1.part-0000
│               └── 20130920-071643-6-bob-project_2-file_2.part-0000
└── root
    └── 2013
        └── 09
            └── 20
                └── 20130920-071643-0-README.part-0000
Including metadata
The --metadata option produces a JSON-formatted metadata file for
each data file produced in the --output directory.
Metadata files are stored in a separate hierarchy in the --output
directory from the data files. Within this separate hierarchy they
have the same relative path as their corresponding data files, but
with an extra suffix of .meta. Here's an example:
$ wu-sync prepare --input=/data/ftp --output=/data/clean --metadata
$ wu-sync prepare --input=/data/ftp --output=/data/clean --metadata
which produces
/data/clean
├── alice
│   └── 2013
│       └── 09
│           └── 20
│               └── 20130920-072539-1-alice-project_1-file_1
├── alice_meta
│   └── 2013
│       └── 09
│           └── 20
│               └── 20130920-072539-1-alice-project_1-file_1.meta
├── bob
│   └── 2013
│       └── 09
│           └── 20
│               ├── 20130920-072539-2-bob-project_1-file_1
│               ├── 20130920-072539-3-bob-project_1-file_2
│               ├── 20130920-072539-4-bob-project_1-file_3
│               ├── 20130920-072539-5-bob-project_2-file_1
│               └── 20130920-072539-6-bob-project_2-file_2
├── bob_meta
│   └── 2013
│       └── 09
│           └── 20
│               ├── 20130920-072539-2-bob-project_1-file_1.meta
│               ├── 20130920-072539-3-bob-project_1-file_2.meta
│               ├── 20130920-072539-4-bob-project_1-file_3.meta
│               ├── 20130920-072539-5-bob-project_2-file_1.meta
│               └── 20130920-072539-6-bob-project_2-file_2.meta
├── root
│   └── 2013
│       └── 09
│           └── 20
│               └── 20130920-072539-0-README
└── root_meta
    └── 2013
        └── 09
            └── 20
                └── 20130920-072539-0-README.meta
Notice that the --metadata option implies the --ordered option.
This is so metadata files are guaranteed to come later than their
corresponding data file in any lexicographically ordered walking of
the --output directory.
As a result, syncing tools will transfer a metadata file after they
transfer its corresponding data file. Since metadata files are also
small, downstream tools can use the presence or absence of a metadata
file to know for sure whether a data file has already been
transferred completely -- they won't have to do the dance that
wu-sync prepare is doing.
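The ordering guarantee rests on a detail of string comparison. A tool that compares directory names sees "alice" as a prefix of "alice_meta", and a tool that compares full paths sees "/" (0x2F) sort before "_" (0x5F), so either way every path under alice/ precedes every path under alice_meta/. A quick Python check using paths from the tree above:

```python
paths = [
    "alice_meta/2013/09/20/20130920-072539-1-alice-project_1-file_1.meta",
    "bob/2013/09/20/20130920-072539-2-bob-project_1-file_1",
    "alice/2013/09/20/20130920-072539-1-alice-project_1-file_1",
    "bob_meta/2013/09/20/20130920-072539-2-bob-project_1-file_1.meta",
]

# sorted() compares strings code point by code point, like a byte-wise walk of ASCII names.
for path in sorted(paths):
    print(path)
```

The printed order is alice/, alice_meta/, bob/, bob_meta/: each data file comes before its metadata file.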
The content of a metadata file is very simple. Here's
/data/clean/alice_meta/2013/09/20/20130920-072539-1-alice-project_1-file_1.meta:
{
"path": "alice/2013/09/20/20130920-072539-1-alice-project_1-file_1",
"meta_path": "alice_meta/2013/09/20/20130920-072539-1-alice-project_1-file_1.meta",
"size": 168894,
"md5": "0a61f0919f546ce04fc119b028b88a2e"
}
Both path and meta_path are relative to the --output directory.
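Computing such a record can be sketched in Python with hashlib. The sketch assumes the layout shown in the example above (the top-level directory gains a _meta suffix and the file gains a .meta suffix) and an MD5 over the file's raw bytes. The helper names are hypothetical, and wu-sync's own computation is not shown in this document.

```python
import hashlib
import json
import os
import tempfile
from typing import Dict, Union


def meta_rel_path(rel_path: str) -> str:
    """Map alice/2013/... to alice_meta/2013/....meta (requires a top-level directory)."""
    top, rest = rel_path.split("/", 1)
    return f"{top}_meta/{rest}.meta"


def build_metadata(output_dir: str, rel_path: str) -> Dict[str, Union[str, int]]:
    """Compute the metadata record for one data file under output_dir."""
    full = os.path.join(output_dir, rel_path)
    md5 = hashlib.md5()
    with open(full, "rb") as f:
        # Hash in 1 MiB chunks so large files are never read into memory at once.
        for chunk in iter(lambda: f.read(1 << 20), b""):
            md5.update(chunk)
    return {
        "path": rel_path,
        "meta_path": meta_rel_path(rel_path),
        "size": os.path.getsize(full),
        "md5": md5.hexdigest(),
    }


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out:
        rel = "alice/2013/09/20/20130920-072539-1-alice-project_1-file_1"
        os.makedirs(os.path.join(out, os.path.dirname(rel)))
        with open(os.path.join(out, rel), "wb") as f:
            f.write(b"hello\n")
        print(json.dumps(build_metadata(out, rel), indent=2))
```

Because --metadata implies --ordered, every data file sits under some top-level directory (root for files that were directly in --input), which is why meta_rel_path can always split off a first path component.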
The --metadata option can also be combined with the --split
option.
Multiple Output Directories
Using multiple --output directories mounted on different devices can
greatly speed up operation as large files are read & written. When
using multiple --output directories, each consecutively processed
file in the --input directory will be assigned one of the output
directories in a round-robin fashion:
$ wu-sync prepare --input=/data/ftp --output=/data/clean_1,/data/clean_2
$ wu-sync prepare --input=/data/ftp --output=/data/clean_1,/data/clean_2
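Round-robin assignment itself is simple to sketch in Python with itertools.cycle. The assign_outputs helper is hypothetical, and whether wu-sync prepare continues the rotation across separate invocations is not stated in this document.

```python
from itertools import cycle
from typing import Dict, List


def assign_outputs(input_files: List[str], output_dirs: List[str]) -> Dict[str, str]:
    """Assign consecutive input files to output directories in round-robin order."""
    targets = cycle(output_dirs)
    return {path: next(targets) for path in input_files}
```

For instance, assign_outputs(["a", "b", "c"], "/data/clean_1,/data/clean_2".split(",")) sends a and c to /data/clean_1 and b to /data/clean_2, splitting the comma-separated --output value the same way the command line lists it.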
To S3
The s3 sync will sync data from a local --input directory to an
S3 --bucket and path. It requires the s3cmd program.
Here's an example:
$ wu-sync s3 --input=/data/clean --bucket=s3://example.com/archive
This example assumes that the underlying s3cmd has been installed
with appropriate credentials to write to the s3://example.com bucket
(or that the bucket s3://example.com is world-writable). If not, an
s3cmd configuration file can be passed in with the --s3cmd_config
option:
$ wu-sync s3 --input=/data/clean --bucket=s3://example.com/archive --s3cmd_config=config/s3cfg
The wu-sync s3 command also works with multiple --input
directories, so it pairs naturally with a wu-sync prepare
command that uses multiple --output directories:
$ wu-sync s3 --input=/data/clean_1,/data/clean_2 --bucket=s3://example.com/archive --s3cmd_config=config/s3cfg
wu-sync-all
Note: This functionality is designed to work only in the context of a deploy pack.
Syncing several FTP sources is possible with wu-sync, but it's a lot
to type. The wu-sync-all command can read a configuration file with
pre-defined data sources and run each of the sync types on each of the
sources (or just some of them).
Within one of the configuration files in your deploy pack (either
config/settings.yml or an environment-specific
config/environments/ENVIRONMENT.yml) create a listener for each of
your sources as follows:
---
listeners:
  nasa:
    ftp:
      host: ftp.nasa.gov
      username: narmstrong
      password: first!
      path: /data/latest
    prepare:
      ordered: true
      metadata: true
    s3:
      bucket: s3://archive.example.com/nasa
  usaf:
    ftp:
      host: ftp.usaf.gov
      username: bobross
      password: xxx
      path: /data/latest
    prepare:
      split: true
      lines: 1_000_000
    s3:
      bucket: s3://archive.example.com/usaf
...
The top-level keys in the listeners Hash (nasa, usaf, &c.) are
each the name of a data source. The next-level keys (ftp,
prepare, s3) each name a sync type and give the options for that
type. These options are exactly the same as the usual options for
that sync-type. Options not supplied via the configuration file
(typically "system" parameters like --input and --output
directories) are expected to be supplied on the command-line at
runtime.
With a proper configuration with valid listeners, the following commands will perform a sync from FTP to S3 of all data sources:
$ wu-sync-all ftp --output=/data/incoming
$ wu-sync-all prepare --input=/data/incoming --output=/data/received
$ wu-sync-all prepare --input=/data/incoming --output=/data/received
$ wu-sync-all s3 --input=/data/received
The --only and --except options can be used to limit
wu-sync-all's action to a desired subset of sources:
$ wu-sync-all ftp --output=/data/incoming --only=nasa,usaf
$ wu-sync-all ftp --output=/data/incoming --except=wto
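To make the interplay of configured options, runtime options, and --only/--except concrete, here is a Python sketch that turns the listeners shown above (written out as the dictionary a YAML parser would produce) into one wu-sync command line per selected source. It is a hypothetical model: this document does not say that wu-sync-all shells out to wu-sync, and letting runtime options win over configured ones when both set the same key is an assumption.

```python
from typing import Any, Dict, Iterable, List, Optional

# The listeners from the configuration example above, as a parsed dictionary.
LISTENERS: Dict[str, Dict[str, Dict[str, Any]]] = {
    "nasa": {
        "ftp": {"host": "ftp.nasa.gov", "username": "narmstrong", "password": "first!", "path": "/data/latest"},
        "prepare": {"ordered": True, "metadata": True},
        "s3": {"bucket": "s3://archive.example.com/nasa"},
    },
    "usaf": {
        "ftp": {"host": "ftp.usaf.gov", "username": "bobross", "password": "xxx", "path": "/data/latest"},
        "prepare": {"split": True, "lines": 1_000_000},
        "s3": {"bucket": "s3://archive.example.com/usaf"},
    },
}


def to_flags(options: Dict[str, Any]) -> List[str]:
    """Turn {"ordered": True, "lines": 5} into ["--ordered", "--lines=5"]."""
    flags = []
    for key, value in options.items():
        if value is True:
            flags.append(f"--{key}")
        elif value is not False and value is not None:
            flags.append(f"--{key}={value}")
    return flags


def sync_all_commands(
    listeners: Dict[str, Dict[str, Dict[str, Any]]],
    sync_type: str,
    runtime: Dict[str, Any],
    only: Optional[Iterable[str]] = None,
    except_: Optional[Iterable[str]] = None,
) -> List[List[str]]:
    """Build one wu-sync command per selected source for the given sync type."""
    only_set = set(only) if only is not None else None
    except_set = set(except_ or ())
    commands = []
    for name, syncs in listeners.items():
        if (only_set is not None and name not in only_set) or name in except_set:
            continue
        # Configured options first; runtime ("system") options such as --output
        # are added after them and, by assumption, override on a clash.
        options = {**syncs.get(sync_type, {}), **runtime}
        commands.append(["wu-sync", sync_type] + to_flags(options))
    return commands
```

Under these assumptions, sync_all_commands(LISTENERS, "prepare", {"input": "/data/incoming", "output": "/data/received"}, only=["usaf"]) yields a single command: wu-sync prepare --split --lines=1000000 --input=/data/incoming --output=/data/received.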