项目作者: tqlihuiqi

项目描述 :
导出指定时间范围的Kafka Topic数据
高级语言: Python
项目地址: git://github.com/tqlihuiqi/kafka_extract.git
创建时间: 2017-07-20T07:11:48Z
项目社区:https://github.com/tqlihuiqi/kafka_extract

开源协议:MIT License

关键词:
etl extract kafka kafka-extract

下载


导出指定时间范围的Kafka Topic数据

系统定期对Topic进行采样,用户可使用命令指定时间范围进行数据导出,导出的数据可以输出到文本文件或指定的Kafka Topic中。支持对AVRO数据导出。

安装配置

  1. cd kafka_extract; pip install -r requirements.txt
  • etc/config.conf 采集数据相关配置

管理收集进程

  1. cd kafka_extract/sbin; ./collectd start | stop | status

列出可用topic

  1. ./list --cluster testCluster
  2. Cluster: testCluster
  3. Topic: testTopic1
  4. Topic: testTopic2

模糊查询可导出的数据时间范围

  1. ./query -c testCluster -t testTopic1 -s "2017-07-01 00:00:01" -e "2017-07-30 00:00:01"
  2. 2017-07-19 17:25:42
  3. 2017-07-19 17:27:15
  4. 2017-07-19 17:37:19
  5. 2017-07-19 17:47:23

导出数据到本地文件

  1. ./extract -c testCluster -t testTopic1 -s "2017-07-19 17:37:19" -e "2017-07-19 17:47:23" \
  2. --output disk \
  3. --disk_dir ./
  4. Partition 0: 1973162837/1973162837 Offsets.
  5. Partition 1: 1973094402/1973094402 Offsets.
  6. Partition 2: 1973095771/1973095771 Offsets.
  7. Partition 3: 1973163811/1973163811 Offsets.
  8. Partition 4: 1973174725/1973174725 Offsets.
  9. Partition 5: 1973183975/1973183975 Offsets.
  10. Partition 6: 1973269887/1973269887 Offsets.
  11. Partition 7: 1973125716/1973125716 Offsets.

导出数据到指定Kafka Topic

  1. ./extract -c testCluster -t testTopic1 -s "2017-07-19 17:37:19" -e "2017-07-19 17:47:23" \
  2. --output kafka \
  3. --target_brokers 10.0.0.1:9092,10.0.0.2:9092 \
  4. --target_topic targetTopic \
  5. --compression_type snappy
  6. Partition 0: 1973162837/1973162837 Offsets.
  7. Partition 1: 1973094402/1973094402 Offsets.
  8. Partition 2: 1973095771/1973095771 Offsets.
  9. Partition 3: 1973163811/1973163811 Offsets.
  10. Partition 4: 1973174725/1973174725 Offsets.
  11. Partition 5: 1973183975/1973183975 Offsets.
  12. Partition 6: 1973269887/1973269887 Offsets.
  13. Partition 7: 1973125716/1973125716 Offsets.

导出AVRO数据到磁盘

  1. ./extract -c testCluster -t testTopic2 -s "2017-07-19 17:38:19" -e "2017-07-19 17:48:23" \
  2. --output disk \
  3. --disk_dir ./ \
  4. --avro_schema "{'doc': 'A weather reading.', 'name': 'Weather', 'namespace': 'test', 'type': 'record', 'fields': [{'name': 'station', 'type': 'string'}, {'name': 'time', 'type': 'long'}, {'name': 'temp', 'type': 'int'}]}"
  5. Partition 0: 1837/1837 Offsets.
  6. Partition 1: 1402/1402 Offsets.
  7. Partition 2: 1971/1971 Offsets.
  8. Partition 3: 1790/1790 Offsets.