Google Cloud Pub/Sub input/output plugin for Fluentd event collector
Dependencies

Runtime: >= 0.14.15, < 2

fluent-plugin-gcloud-pubsub-custom

This plugin is forked from https://github.com/mdoi/fluent-plugin-gcloud-pubsub

Overview

Google Cloud Pub/Sub input/output (BufferedOutput) plugin for Fluentd, built on the google-cloud gem.

  • Publish messages to Google Cloud Pub/Sub
  • Pull messages from Google Cloud Pub/Sub

Preparation

  • Create a project on Google Developer Console
  • Add a Cloud Pub/Sub topic to the project
  • Add a pull-style subscription to the topic
  • Download your credentials (JSON), or set the access scope on the GCE instance

When using the output plugin, you need to grant the Pub/Sub Publisher and Pub/Sub Viewer roles in IAM.

When using the input plugin, you need to grant the Pub/Sub Subscriber and Pub/Sub Viewer roles in IAM.

Requirements

fluent-plugin-gcloud-pubsub-custom   fluentd       ruby
>= 1.0.0                             >= v0.14.0    >= 2.1
< 1.0.0                              >= v0.12.0    >= 1.9

Installation

Install by gem:

$ gem install fluent-plugin-gcloud-pubsub-custom

Caution

This plugin doesn't work with td-agent.

Please use it with Fluentd installed by gem.

Configuration

Publish messages

Use the gcloud_pubsub output plugin.

<match example.publish>
  @type gcloud_pubsub
  project <YOUR PROJECT>
  key <YOUR KEY>
  topic <YOUR TOPIC>
  autocreate_topic false
  max_messages 1000
  max_total_size 9800000
  max_message_size 4000000
  endpoint <Endpoint URL>
  emulator_host <PUBSUB_EMULATOR_HOST>
  timeout 60
  <buffer>
    @type memory
    flush_interval 1s
  </buffer>
  <format>
    @type json
  </format>
</match>
  • project (optional)
    • Set your GCP project.
    • When running Fluentd on GCP, you don't have to specify this.
    • You can also use an environment variable such as GCLOUD_PROJECT.
  • key (optional)
    • Set the path to your credential file.
    • When running Fluentd on GCP, you can use the instance scope instead of specifying this.
    • You can also use an environment variable such as GCLOUD_KEYFILE.
  • topic (required)
  • dest_project (optional, default: nil)
    • Set the destination GCP project if you publish messages across projects.
  • autocreate_topic (optional, default: false)
    • If set to true, the specified topic is created when it doesn't exist.
  • max_messages (optional, default: 1000)
  • max_total_size (optional, default: 9800000 = 9.8MB)
  • max_message_size (optional, default: 4000000 = 4MB)
    • Messages exceeding max_message_size are not published, because Pub/Sub clients cannot receive them.
  • attribute_keys (optional, default: [])
    • Publish the listed fields of the input message as message attributes.
  • attribute_key_values (optional, default: {})
    • Publish the configured key-value pairs as message attributes.
  • endpoint (optional)
  • compression (optional, default: nil)
    • If set to gzip, messages will be compressed with gzip.
  • emulator_host (optional)
  • timeout (optional)
    • Set the default timeout for publish requests.
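
The three size limits above (max_messages, max_total_size, max_message_size) interact when buffered records are split into publish requests. The Python sketch below shows one plausible reading of that behaviour. It is not the plugin's actual Ruby implementation: the helper name split_into_batches is hypothetical, and silently skipping oversized messages is an assumption made for illustration.

```python
from typing import Iterable, List


def split_into_batches(
    messages: Iterable[bytes],
    max_messages: int = 1000,
    max_total_size: int = 9_800_000,
    max_message_size: int = 4_000_000,
) -> List[List[bytes]]:
    """Group messages into batches that respect the three limits (hypothetical helper)."""
    batches: List[List[bytes]] = []
    current: List[bytes] = []
    current_size = 0
    for msg in messages:
        size = len(msg)
        if size > max_message_size:
            # Assumption: an oversized message is skipped rather than published.
            continue
        # Close the current batch if adding this message would exceed
        # either the message-count limit or the total-size limit.
        if current and (
            len(current) >= max_messages or current_size + size > max_total_size
        ):
            batches.append(current)
            current, current_size = [], 0
        current.append(msg)
        current_size += size
    if current:
        batches.append(current)
    return batches
```

With max_messages=2, max_total_size=8, and max_message_size=5, the input [b"aaa", b"bbb", 10 bytes of "c", b"ddd"] would yield two batches: the first two messages together, then b"ddd" alone, with the 10-byte message dropped.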

Pull messages

Use the gcloud_pubsub input plugin.

<source>
  @type gcloud_pubsub
  tag example.pull
  project <YOUR PROJECT>
  key <YOUR KEY>
  topic <YOUR TOPIC>
  subscription <YOUR SUBSCRIPTION>
  max_messages 1000
  return_immediately true
  pull_interval 0.5
  pull_threads 2
  parse_error_action exception
  enable_rpc true
  rpc_bind 0.0.0.0
  rpc_port 24680
  <parse>
    @type json
  </parse>
</source>
  • tag (required)
    • Set the tag of messages.
    • If tag_key is specified, tag is used as a fallback when the record doesn't have the specified key.
  • tag_key (optional)
    • Set the record key whose value is used as the tag.
  • project (optional)
    • Set your GCP project.
    • When running Fluentd on GCP, you don't have to specify this.
    • You can also use an environment variable such as GCLOUD_PROJECT.
  • key (optional)
    • Set the path to your credential file.
    • When running Fluentd on GCP, you can use the instance scope instead of specifying this.
    • You can also use an environment variable such as GCLOUD_KEYFILE.
  • topic (optional)
    • Set the name of the topic that the subscription belongs to.
  • subscription (required)
    • Set the name of the subscription to pull from.
  • max_messages (optional, default: 100)
  • return_immediately (optional, default: true)
  • pull_interval (optional, default: 5.0)
    • Pull messages at intervals of the specified number of seconds.
  • pull_threads (optional, default: 1)
    • Set the number of threads used to pull messages.
  • attribute_keys (optional, default: [])
    • Specify the attribute keys to be emitted as fields of the record.
  • parse_error_action (optional, default: exception)
    • Set the action to take when parsing a message fails.
      • exception: Raise an exception. Messages are not acknowledged.
      • warning: Only log a warning.
  • enable_rpc (optional, default: false)
    • If set to true, an HTTP RPC for stopping or starting message pulling is enabled.
  • rpc_bind (optional, default: 0.0.0.0)
    • Bind IP address for HTTP RPC.
  • rpc_port (optional, default: 24680)
    • Port for HTTP RPC.
  • decompression (optional, default: nil)
    • If set to gzip, messages will be decompressed with gzip.
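
The tag, tag_key, attribute_keys, and parse_error_action options above can be pictured together in a small Python sketch of how one pulled message might become a Fluentd event. This is an illustration under stated assumptions, not the plugin's Ruby code: to_event is a hypothetical helper, the payload is assumed to be a JSON object, and the record is assumed to keep the tag_key field after the tag is read from it.

```python
import json
import logging
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger("pull_sketch")


def to_event(
    data: bytes,
    attributes: Dict[str, str],
    tag: str,
    tag_key: Optional[str] = None,
    attribute_keys: Optional[List[str]] = None,
    parse_error_action: str = "exception",
) -> Optional[Tuple[str, dict]]:
    """Turn one pulled message into a (tag, record) pair (hypothetical helper)."""
    try:
        record = json.loads(data)
    except ValueError:
        if parse_error_action == "exception":
            # Re-raise so the caller can leave the message unacknowledged.
            raise
        # "warning": log the failure and produce no event for this message.
        logger.warning("failed to parse message: %r", data)
        return None
    # Copy the selected message attributes into the record as fields.
    for key in attribute_keys or []:
        if key in attributes:
            record[key] = attributes[key]
    # Use the record's tag_key value as the tag, falling back to the fixed tag.
    event_tag = tag
    if tag_key and tag_key in record:
        event_tag = str(record[tag_key])
    return event_tag, record
```

For example, a message with payload {"a": 1} and no tag_key field keeps the configured tag example.pull, while a payload that is not valid JSON either raises (exception) or is logged and skipped (warning).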

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

TODO

  • Add a tag attribute in the output plugin and use that attribute as the tag in the input plugin.
  • Send the ack only after the other output plugin has committed (if possible).

Authors