Project author: l-lin

Project description:
POC testing Kafka, Kafka Streams & Kafka Connect
Primary language: Java
Project URL: git://github.com/l-lin/poc-kafka.git
Created: 2020-04-07T08:44:50Z
Project community: https://github.com/l-lin/poc-kafka

License: MIT License


Example project that uses Kafka Streams

Java
Go

heart-monitor

Getting started

Build

    # this will build the maven projects and build the docker images.
    mvn package

Usage

Production-like

    # launch all services
    docker-compose up -d --scale heart-beat-producer=3 --scale heart-rate-computor=3
    # wait until all services are started then setup environment
    ./scripts/setup.sh
    # go to the heart-rate-consumer admin interface
    firefox http://localhost/ &

Local environment

If you want to run the apps directly from your IDE, you can't use the Kafka cluster from docker-compose.yml: the
domain names configured in Kafka are only resolvable by services inside the docker network, so an app launched
from the IDE cannot reach the brokers. A docker-compose-local.yml file is therefore provided to launch a single Kafka instance.

    # launch all services
    docker-compose -f docker-compose-dep.yml up -d
    # then use same commands as above
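
One way an IDE-launched app could switch between the two setups is to read the broker list from the environment and fall back to the local single instance. The following is only a sketch: the variable name `KAFKA_BOOTSTRAP_SERVERS` is hypothetical, and `localhost:9092` is an assumed address for the local broker, neither is taken from the project.

```java
import java.util.Map;
import java.util.Properties;

class KafkaClientConfig {

    // Assumption: the single local broker is exposed on localhost:9092.
    static final String LOCAL_BOOTSTRAP = "localhost:9092";

    // Hypothetical environment variable name, not defined by the project.
    static final String ENV_VAR = "KAFKA_BOOTSTRAP_SERVERS";

    /** Returns the configured broker list, or the local default when none is set. */
    static String bootstrapServers(Map<String, String> env) {
        String value = env.get(ENV_VAR);
        return (value == null || value.isBlank()) ? LOCAL_BOOTSTRAP : value.trim();
    }

    /** Builds client properties using the standard "bootstrap.servers" key. */
    static Properties clientProperties(Map<String, String> env) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers(env));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(clientProperties(System.getenv()).getProperty("bootstrap.servers"));
    }
}
```

Inside the docker network the variable would carry the in-network names (for example `kafka1:9092,kafka2:9092,kafka3:9092`), while an IDE run would leave it unset.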

Useful commands

Avro Schema Registry

More information is available in the schema-registry project documentation.

The following examples use HTTPie as the HTTP client to perform the HTTP requests:

    # list all subjects
    http :8081/subjects
    # list all schema versions registered under the subject "heart-beats-value"
    http :8081/subjects/heart-beats-value/versions
    # fetch version 1 of the schema registered under the subject "heart-beats-value"
    http :8081/subjects/heart-beats-value/versions/1
    # fetch the most recently registered schema under the subject "heart-beats-value"
    http :8081/subjects/heart-beats-value/versions/latest
    # create heart beat avro schema in the schema registry
    echo "{\"schema\":\"$(jq -c . < heart-models/src/main/resources/avro/HeartBeat.avsc | sed 's/"/\\"/g')\"}" | http :8081/subjects/heart-beats-value/versions "Content-Type: application/vnd.schemaregistry.v1+json"
    # create heart rate avro schema in the schema registry
    echo "{\"schema\":\"$(jq -c . < heart-models/src/main/resources/avro/HeartRate.avsc | sed 's/"/\\"/g')\"}" | http :8081/subjects/heart-rates-value/versions "Content-Type: application/vnd.schemaregistry.v1+json"
    # consume Avro messages to check what was sent to Kafka:
    docker exec -it schema-registry \
      /usr/bin/kafka-avro-console-consumer \
      --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
      --topic heart-beats \
      --from-beginning
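
The `jq`/`sed` pipeline above embeds the Avro schema, which is itself JSON, as an escaped string inside a `{"schema": "..."}` envelope. A rough Java equivalent using only the JDK might look like the sketch below. The class and method names are illustrative, and the registry URL passed in is an assumption based on the `:8081` port used in the commands.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class SchemaRegistryRequests {

    /** Escapes text so it can be placed inside a JSON string literal. */
    static String escapeJson(String text) {
        StringBuilder out = new StringBuilder();
        for (char c : text.toCharArray()) {
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the 4-digit hex form.
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /** Wraps an Avro schema as the string value of a "schema" field. */
    static String wrapSchema(String avsc) {
        return "{\"schema\":\"" + escapeJson(avsc) + "\"}";
    }

    /** Builds the POST that registers a new schema version under a subject. */
    static HttpRequest registerRequest(String registryUrl, String subject, String avsc) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/subjects/" + subject + "/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(wrapSchema(avsc)))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical minimal schema, used only to show the envelope.
        String avsc = "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[]}";
        System.out.println(wrapSchema(avsc));
    }
}
```

To actually send the request, something like `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` from `java.net.http` should work against a running registry.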

If a schema change is incompatible with an earlier version, the schema registry returns an HTTP 409 and the application gets an error like:

    io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409

Confluent's documentation has more information on schema evolution and compatibility.
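
To give a feel for what counts as a breaking change, here is a deliberately simplified sketch of one backward-compatibility rule: a field added in the new schema needs a default value, otherwise records written with the old schema cannot be read. It assumes the registry runs in its default BACKWARD mode and ignores type changes, aliases and nested records. The field name `deviceId` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

class BackwardCompatibilitySketch {

    /**
     * Returns the fields that would break backward compatibility:
     * fields present only in the new schema that declare no default.
     *
     * @param oldFields          field names of the already registered schema
     * @param newFieldHasDefault new schema field name mapped to "has a default"
     */
    static List<String> breakingFields(Set<String> oldFields, Map<String, Boolean> newFieldHasDefault) {
        List<String> breaking = new ArrayList<>();
        for (Map.Entry<String, Boolean> field : newFieldHasDefault.entrySet()) {
            boolean isNew = !oldFields.contains(field.getKey());
            if (isNew && !field.getValue()) {
                breaking.add(field.getKey());
            }
        }
        Collections.sort(breaking); // stable output regardless of map ordering
        return breaking;
    }

    public static void main(String[] args) {
        Set<String> registered = Set.of("userId", "isReset");
        // "deviceId" is added without a default, so it is reported as breaking.
        Map<String, Boolean> candidate = Map.of("userId", false, "isReset", false, "deviceId", false);
        System.out.println(breakingFields(registered, candidate));
    }
}
```

A non-empty result corresponds to the kind of change the registry would reject with the 409 shown above. Giving the added field a default makes the list empty.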

Kafka Connect

    # list kafka connectors
    http :8082/connectors
    # add a new connector
    curl -X "POST" "http://localhost:8082/connectors/" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "heart-rate-connector-sink",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url": "jdbc:postgresql://db:5432/heart_monitor?applicationName=heart-rate-connector",
          "connection.user": "postgres",
          "connection.password": "postgres",
          "auto.create": "true",
          "auto.evolve": "true",
          "pk.mode": "kafka",
          "topics": "heart-rates",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "transforms": "ExtractTimestamp,RenameField",
          "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.ExtractTimestamp.timestamp.field": "extract_ts",
          "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
          "transforms.RenameField.renames": "userId:user_id,isReset:is_reset"
        }
      }'
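
The two transforms in this config run in the order listed: `ExtractTimestamp` adds the record timestamp as an `extract_ts` field, then `RenameField` renames `userId` and `isReset` to snake case before the row is written. The sketch below simulates that effect on a plain `Map`. It is not Kafka Connect code, it represents the timestamp as epoch milliseconds for simplicity, and the class and method names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SinkTransformSketch {

    /** Parses a renames value of the form "old:new,old2:new2". */
    static Map<String, String> parseRenames(String renames) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String pair : renames.split(",")) {
            String[] parts = pair.trim().split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Bad rename entry: " + pair);
            }
            mapping.put(parts[0].trim(), parts[1].trim());
        }
        return mapping;
    }

    /** Adds the timestamp field first, then renames fields, mirroring the transform order. */
    static Map<String, Object> apply(Map<String, Object> value, long recordTimestampMs,
                                     String timestampField, String renames) {
        // Step 1: insert the record timestamp as an extra field.
        Map<String, Object> withTimestamp = new LinkedHashMap<>(value);
        withTimestamp.put(timestampField, recordTimestampMs);

        // Step 2: rename fields, leaving unmapped names untouched.
        Map<String, String> mapping = parseRenames(renames);
        Map<String, Object> renamed = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : withTimestamp.entrySet()) {
            renamed.put(mapping.getOrDefault(field.getKey(), field.getKey()), field.getValue());
        }
        return renamed;
    }

    public static void main(String[] args) {
        // Hypothetical record value; only userId and isReset come from the config above.
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("userId", 1L);
        value.put("isReset", false);
        System.out.println(apply(value, System.currentTimeMillis(),
                "extract_ts", "userId:user_id,isReset:is_reset"));
    }
}
```

With `auto.create` enabled, the sink table columns would then follow the renamed field names rather than the original camel-case ones.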
