No commit activity in last 3 years
No release in over 3 years
Filter each record by ruby proc
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
 Dependencies

Development

>= 1.10.6
>= 0.8.1
>= 10.0
 Project Readme

Ruby Proc filter plugin for Embulk

This plugin is inspired by mgi166/embulk-filter-eval: Eval ruby code on filtering

This plugin apply ruby proc to each record.

Overview

  • Plugin type: filter

Configuration

  • columns: filter definition (hash, required)
  • requires: pre required libraries (array, default: [])

Example

input

id,account,time,purchase,comment,data
1,32864,2015-01-27 19:23:49,20150127,embulk,"{\"foo\": \"bar\", \"events\": [{\"id\": 1, \"name\": \"Name1\"}, {\"id\": 2, \"name\": \"Name2\"}]}"
2,14824,2015-01-27 19:01:23,20150127,embulk jruby,NULL
3,27559,2015-01-28 02:20:02,20150128,"Embulk ""csv"" parser plugin",NULL
4,11270,2015-01-29 11:54:36,20150129,NULL,NULL

config

# ...

filters:
  - type: ruby_proc
    requires:
      - cgi
    variables:
      multiply: 3
    before:
      - proc: |
          -> do
            puts "before proc"
            @started_at = Time.now
          end
    after:
      - proc: |
          -> do
            puts "after proc"
            p Time.now - @started_at
          end
    rows:
      - proc: |
          ->(record) do
            [record.dup, record.dup.tap { |r| r["id"] += 10 }]
          end
    skip_rows:
      - proc: |
          ->(record) do
            record["id"].odd?
          end
    columns:
      - name: data
        proc: |
          ->(data) do
            data["events"] = data["events"].map.with_index do |e, idx|
              e.tap { |e_| e_["idx"] = idx }
            end
            data
          end
      - name: id
        proc: |
          ->(id) do
            id * variables["multiply"]
          end
        type: string
      - name: comment
        proc_file: comment_upcase.rb
        skip_nil: false
        type: json
    pages:
      - proc: |
          ->(records) do
            records.map do |record|
              record.tap { |r| r["id"] += 1 }
            end
          end

# ...

If you want to skip record in "rows proc" or "columns proc", use throw :skip_record.

# comment_upcase.rb

->(comment, record) do
  return [record["account"].to_s].to_json unless comment
  comment.upcase.split(" ").map { |s| CGI.escape(s) }
end
  • before and after is executed at once
  • procs is evaluated on same binding (instance of Evaluator class)
    • instance variable is shared
  • rows proc must return record hash or array of record hash.
    • user must take care of object identity. Otherwise, error may be occurred when plugin applys column procs.
  • pages proc must return array of record hash.
    • use page_size option to increase size of processing record (ex. -X page_size=64KB)

proc execution order

  1. before procs
  2. per record
    1. all row procs
    2. per record applied row procs
      1. all skip_row procs
      2. column procs
  3. per page procs
  4. after procs

preview

+-----------+--------------+-------------------------+-------------------------+------------------------------------------+------------------------------------------------------------------------------------------+
| id:string | account:long |          time:timestamp |      purchase:timestamp |                             comment:json |                                                                                data:json |
+-----------+--------------+-------------------------+-------------------------+------------------------------------------+------------------------------------------------------------------------------------------+
|         3 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                               ["EMBULK"] | {"events":[{"id":1,"name":"Name1","idx":0},{"id":2,"name":"Name2","idx":1}],"foo":"bar"} |
|        34 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                               ["EMBULK"] | {"events":[{"id":1,"name":"Name1","idx":0},{"id":2,"name":"Name2","idx":1}],"foo":"bar"} |
|         6 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |                       ["EMBULK","JRUBY"] |                                                                                          |
|        37 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |                       ["EMBULK","JRUBY"] |                                                                                          |
|         9 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | ["EMBULK","%22CSV%22","PARSER","PLUGIN"] |                                                                                          |
|        40 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | ["EMBULK","%22CSV%22","PARSER","PLUGIN"] |                                                                                          |
|        12 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                                ["11270"] |                                                                                          |
|        43 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                                ["11270"] |                                                                                          |
+-----------+--------------+-------------------------+-------------------------+------------------------------------------+------------------------------------------------------------------------------------------+

Build

$ rake