Project author: marcadamsge

Project description:
Fluentd filter plugin to deduplicate records for InfluxDB

Language: Ruby
Repository: git://github.com/marcadamsge/fluent-plugin-influxdb-deduplication.git
Created: 2021-02-23T20:46:38Z
Project community: https://github.com/marcadamsge/fluent-plugin-influxdb-deduplication

License: MIT License

Fluentd filter plugin to deduplicate records for InfluxDB

A filter plugin that implements the deduplication techniques described in
the InfluxDB documentation.

Installation

Using RubyGems:

  fluent-gem install fluent-plugin-influxdb-deduplication

Configuration

Deduplicate by incrementing the timestamp

Each data point is assigned a unique timestamp. The filter plugin reads the fluentd record event time, which has second
precision, and stores it in a field with nanosecond precision. Within a run of records sharing the same event time,
each successive record's timestamp is incremented by 1 nanosecond.

  <filter pattern>
    @type influxdb_deduplication
    <time>
      # field to store the deduplicated timestamp
      key my_key_field
    </time>
  </filter>

For example, the following input records:

  Fluentd Event Time | Record
  1613910640         | { "k1" => 0, "k2" => "value0" }
  1613910640         | { "k1" => 1, "k2" => "value1" }
  1613910640         | { "k1" => 2, "k2" => "value2" }
  1613910641         | { "k1" => 3, "k3" => "value3" }

Would become on output:

  Fluentd Event Time | Record
  1613910640         | { "k1" => 0, "k2" => "value0", "my_key_field" => 1613910640000000000 }
  1613910640         | { "k1" => 1, "k2" => "value1", "my_key_field" => 1613910640000000001 }
  1613910640         | { "k1" => 2, "k2" => "value2", "my_key_field" => 1613910640000000002 }
  1613910641         | { "k1" => 3, "k3" => "value3", "my_key_field" => 1613910641000000000 }
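
For illustration, here is a minimal Ruby sketch of the increment technique, assuming records arrive in non-decreasing
event-time order (the class and method names are illustrative, not the plugin's internals):

  class TimestampDeduplicator
    def initialize
      @last_time = nil # last fluentd event time seen, in seconds
      @offset = 0      # nanoseconds already assigned within that second
    end

    # Returns a unique nanosecond timestamp for a second-precision event time.
    def call(event_time_sec)
      if event_time_sec == @last_time
        @offset += 1   # same second as the previous record: bump by 1 ns
      else
        @last_time = event_time_sec
        @offset = 0    # new second: restart at .000000000
      end
      event_time_sec * 1_000_000_000 + @offset
    end
  end

  dedup = TimestampDeduplicator.new
  [1613910640, 1613910640, 1613910640, 1613910641].each do |t|
    puts dedup.call(t) # 1613910640000000000, ...001, ...002, 1613910641000000000
  end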

The time key field can then be passed as-is to
fluent-plugin-influxdb-v2. Example configuration for nginx
logs:

  <filter nginx.access>
    @type influxdb_deduplication
    <time>
      # field to store the deduplicated timestamp
      key my_key_field
    </time>
  </filter>

  <match nginx.access>
    @type influxdb2
    # set up access to your InfluxDB v2 instance
    url https://localhost:8086
    token my-token
    bucket my-bucket
    org my-org
    # the influxdb2 time_key must be set to the same value as the influxdb_deduplication time.key
    time_key my_key_field
    # the timestamp precision must be set to ns
    time_precision ns
    tag_keys ["request_method", "status"]
    field_keys ["remote_addr", "request_uri"]
  </match>

The data can then be queried as a table and viewed, in Grafana for example, with the following Flux query:

  from(bucket: "my-bucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> pivot(
      rowKey: ["_time"],
      columnKey: ["_field"],
      valueColumn: "_value"
    )
    |> keep(columns: ["_time", "request_method", "status", "remote_addr", "request_uri"])

Deduplicate by adding a sequence tag

Each record is assigned a sequence number; an output record can then be uniquely identified by the pair
(fluentd_event_time, sequence_number). The event time is left untouched, so no time precision is lost.

  <filter pattern>
    @type influxdb_deduplication
    <tag>
      # field to store the sequence number
      key my_key_field
    </tag>
  </filter>

For example, the following input records:

  Fluentd Event Time | Record
  1613910640         | { "k1" => 0, "k2" => "value0" }
  1613910640         | { "k1" => 1, "k2" => "value1" }
  1613910640         | { "k1" => 2, "k2" => "value2" }
  1613910641         | { "k1" => 3, "k3" => "value3" }

Would become on output:

  Fluentd Event Time | Record
  1613910640         | { "k1" => 0, "k2" => "value0", "my_key_field" => 0 }
  1613910640         | { "k1" => 1, "k2" => "value1", "my_key_field" => 1 }
  1613910640         | { "k1" => 2, "k2" => "value2", "my_key_field" => 2 }
  1613910641         | { "k1" => 3, "k3" => "value3", "my_key_field" => 0 }
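
A minimal Ruby sketch of the sequence-number technique, under the same non-decreasing time assumption (names are
illustrative): the counter resets whenever a new event time is seen.

  class SequenceDeduplicator
    def initialize
      @last_time = nil # last fluentd event time seen, in seconds
      @sequence = 0    # sequence number within that second
    end

    # Returns the sequence number for a record with the given event time.
    def call(event_time_sec)
      if event_time_sec == @last_time
        @sequence += 1 # same second: next number in the sequence
      else
        @last_time = event_time_sec
        @sequence = 0  # new second: restart the sequence
      end
      @sequence
    end
  end

  seq = SequenceDeduplicator.new
  [1613910640, 1613910640, 1613910640, 1613910641].each do |t|
    puts seq.call(t) # 0, 1, 2, 0
  end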

The sequence tag should be passed in the tag_keys parameter
of fluent-plugin-influxdb-v2. Example configuration for nginx
logs:

  <filter nginx.access>
    @type influxdb_deduplication
    <tag>
      # field to store the sequence number
      key my_key_field
    </tag>
  </filter>

  <match nginx.access>
    @type influxdb2
    # set up access to your InfluxDB v2 instance
    url https://localhost:8086
    token my-token
    bucket my-bucket
    org my-org
    # the influxdb2 time_key is not specified, so the fluentd event time is used
    # time_key
    # there is no requirement on the time_precision value this time
    # time_precision ns
    # "my_key_field" must be passed to influxdb2's tag_keys
    tag_keys ["request_method", "status", "my_key_field"]
    field_keys ["remote_addr", "request_uri"]
  </match>

The data can then be queried as a table and viewed, in Grafana for example, with the following Flux query:

  from(bucket: "my-bucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> pivot(
      rowKey: ["_time", "my_key_field"],
      columnKey: ["_field"],
      valueColumn: "_value"
    )
    |> keep(columns: ["_time", "request_method", "status", "remote_addr", "request_uri"])

Detecting out-of-order records

This filter plugin expects the fluentd event timestamps of incoming records to be non-decreasing. Optionally, an order
key can be added to indicate whether a record arrived in order. For example, with this config:

  <filter pattern>
    @type influxdb_deduplication
    order_key order_field
    <time>
      # field to store the deduplicated timestamp
      key my_key_field
    </time>
  </filter>

Without an order key, out-of-order records are dropped to avoid overwriting previously written data points. With an
order key, out-of-order records are still pushed, but with order_field = false. Out-of-order records are not
deduplicated, but they will be visible in InfluxDB.
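
A rough Ruby sketch of the flagging behavior (illustrative only, reusing the order_field name from the config above):
a record whose event time is older than the last one seen is marked out of order rather than moving the clock back.

  def annotate_order(record, event_time_sec, state)
    in_order = state[:last_time].nil? || event_time_sec >= state[:last_time]
    state[:last_time] = event_time_sec if in_order # out-of-order times don't move the clock back
    record["order_field"] = in_order
    record
  end

  state = {}
  p annotate_order({ "k1" => 0 }, 1613910641, state) # {"k1"=>0, "order_field"=>true}
  p annotate_order({ "k1" => 1 }, 1613910640, state) # {"k1"=>1, "order_field"=>false}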