POC testing Kafka, Kafka Streams & Kafka Connect
# build the Maven projects and the Docker images
mvn package
Production-like environment
# launch all services
docker-compose up -d --scale heart-beat-producer=3 --scale heart-rate-computor=3
# wait until all services are started, then set up the environment
./scripts/setup.sh
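The "wait until all services are started" step can also be scripted; here is a minimal sketch (not part of the repository's scripts) that blocks until every compose service reports as running:
# block until every service is running
until [ "$(docker-compose ps --services --filter "status=running" | wc -l)" -eq "$(docker-compose ps --services | wc -l)" ]; do
  sleep 2
done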
# go to the heart-rate-consumer admin interface
firefox http://localhost/ &
Local environment
If you want to run the apps directly from your IDE, you cannot use the Kafka cluster from docker-compose.yml: the host names the brokers advertise are only resolvable by services inside the Docker network, so an app launched from the IDE cannot reach them. A docker-compose-local.yml
file is therefore provided to launch a single Kafka instance.
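To see which host names the brokers advertise (and hence why they are unreachable from the host), you can inspect the resolved compose configuration; this assumes the brokers are configured through the usual Confluent KAFKA_ADVERTISED_LISTENERS variable:
# show the advertised listeners configured for the brokers
docker-compose config | grep -i advertised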
# launch all services
docker-compose -f docker-compose-dep.yml up -d
# then use the same commands as above
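An IDE-launched app then has to point at the local broker instead of the in-network host names. Assuming the services are plain Spring Boot apps (Spring is part of the stack below) built with the Boot Maven plugin, and that the single broker listens on localhost:9092, an override via Spring Kafka's relaxed binding could look like:
# run one module against the local broker; the port and module name are assumptions
SPRING_KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
  mvn -pl heart-beat-producer spring-boot:run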
More information is available in the schema-registry project documentation.
The following examples use HTTPie as the HTTP client:
# list all subjects
http :8081/subjects
# list all schema versions registered under the subject "heart-beats-value"
http :8081/subjects/heart-beats-value/versions
# fetch version 1 of the schema registered under the subject "heart-beats-value"
http :8081/subjects/heart-beats-value/versions/1
# fetch the most recently registered schema registered under the subject "heart-beats-value"
http :8081/subjects/heart-beats-value/versions/latest
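The registry's compatibility settings can be inspected through the same REST API:
# show the global compatibility level
http :8081/config
# show the compatibility level overridden for a given subject, if any
http :8081/config/heart-beats-value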
# create the heart beat Avro schema in the schema registry
echo "{\"schema\":\"$(jq -c . < heart-models/src/main/resources/avro/HeartBeat.avsc | sed 's/"/\\"/g')\"}" | http :8081/subjects/heart-beats-value/versions "Content-Type: application/vnd.schemaregistry.v1+json"
# create the heart rate Avro schema in the schema registry
echo "{\"schema\":\"$(jq -c . < heart-models/src/main/resources/avro/HeartRate.avsc | sed 's/"/\\"/g')\"}" | http :8081/subjects/heart-rates-value/versions "Content-Type: application/vnd.schemaregistry.v1+json"
# consume Avro messages to check what was sent to Kafka:
docker exec -it schema-registry \
  /usr/bin/kafka-avro-console-consumer \
    --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
    --topic heart-beats \
    --from-beginning
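The same image also ships kafka-avro-console-producer for producing test messages by hand; this sketch assumes HeartBeat.avsc is the value schema of the heart-beats topic:
# produce Avro messages by hand, typing one JSON record per line matching the schema
docker exec -it schema-registry \
  /usr/bin/kafka-avro-console-producer \
    --broker-list kafka1:9092,kafka2:9092,kafka3:9092 \
    --topic heart-beats \
    --property schema.registry.url=http://localhost:8081 \
    --property value.schema="$(jq -c . < heart-models/src/main/resources/avro/HeartBeat.avsc)"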
If a schema change breaks compatibility, the schema registry returns an HTTP 409 and the application gets an error like:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
More information is available in Confluent's documentation on schema evolution and compatibility.
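A schema can also be tested against the latest registered version before attempting to register it, via the registry's compatibility endpoint; this reuses the escaping trick from the registration commands above:
# check whether a schema is compatible with the latest version of a subject
echo "{\"schema\":\"$(jq -c . < heart-models/src/main/resources/avro/HeartRate.avsc | sed 's/"/\\"/g')\"}" | http :8081/compatibility/subjects/heart-rates-value/versions/latest "Content-Type: application/vnd.schemaregistry.v1+json"
# the response indicates whether registration would succeed, e.g. {"is_compatible": true}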
# list kafka connectors
http :8082/connectors
# add a new connector
curl -X "POST" "http://localhost:8082/connectors/" \
-H "Content-Type: application/json" \
-d '{
"name": "heart-rate-connector-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://db:5432/heart_monitor?applicationName=heart-rate-connector",
"connection.user": "postgres",
"connection.password": "postgres",
"auto.create":"true",
"auto.evolve":"true",
"pk.mode": "kafka",
"topics": "heart-rates",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "ExtractTimestamp,RenameField",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field" : "extract_ts",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames" : "userId:user_id,isReset:is_reset"
}
}'
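Once created, a connector can be monitored and removed through the same REST API, and the sink output can be checked directly in Postgres (the table name assumes the JDBC sink's default table.name.format, i.e. a table named after the topic):
# check the connector and its tasks
http :8082/connectors/heart-rate-connector-sink/status
# delete the connector
http DELETE :8082/connectors/heart-rate-connector-sink
# look at the rows written by the sink
docker exec -it db psql -U postgres -d heart_monitor -c 'SELECT * FROM "heart-rates" LIMIT 5;'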
Kafka
Avro
Spring
Reactor
KSQL
Front