No commit activity in last 3 years
No release in over 3 years
Amazon DynamoDB Streams input plugin for Fluentd
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
 Dependencies

Development

~> 1.7
~> 10.0
>= 3.1.0

Runtime

< 2, >= 0.10.58
 Project Readme

fluent-plugin-dynamodb-streams

Gem Version Build Status Test Coverage Code Climate Codacy Badge

Fluentd input plugin for AWS DynamoDB Streams.

Preparation

Create IAM user with a policy like the following.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:DescribeStream",
        "dynamodb:ListStreams"
      ],
      "Resource": "*"
    }
  ]
}

Or define aws_key_id and aws_sec_key in your config file.

Installation

Add this line to your application's Gemfile:

gem 'fluent-plugin-dynamodb-streams'

And then execute:

$ bundle

Or install it yourself as:

$ gem install fluent-plugin-dynamodb-streams

Configuration

<source>
  type dynamodb_streams
  #aws_key_id  AWS_ACCESS_KEY_ID
  #aws_sec_key AWS_SECRET_ACCESS_KEY
  #aws_region  AWS_DEFAULT_REGION
  stream_arn arn:aws:dynamodb:ap-northeast-1:000000000000:table/table_name/stream/2015-01-01T00:00:00.000
  pos_file /var/lib/fluent/dynamodb_streams_table_name
  fetch_interval 1
  fetch_size 1
</source>
  • tag: Fluentd tag.
  • stream_arn: DynamoDB Streams ARN.
  • pos_file: File to store last sequence number.
  • fetch_interval: The interval to fetch records in seconds. Default is 1 sec.
  • fetch_size: The maximum number of records fetches in each iteration. Default is 1.

Output

{
  "aws_region": "ap-northeast-1",
  "event_source": "aws:dynamodb",
  "event_version": "1.0",
  "event_id": "dfbdf4fe-6f2b-4b34-9b17-4b8caae561fa",
  "event_name": "INSERT",
  "dynamodb": {
    "stream_view_type": "NEW_AND_OLD_IMAGES",
    "sequence_number": "000000000000000000001",
    "size_bytes": 14,
    "keys": {
      "key": "value2"
    },
    "old_image": {
      "key": "value1"
    },
    "new_image": {
      "key": "value2"
    }
  }
}

TODO

  • store sequence number to DynamoDB
  • fetch records from each shards concurrently