Ensure that your lab environment is clean by removing all Docker containers from your previous lab:
docker stop $(docker ps -q)
docker rm $(docker ps -a -q)
Make sure that the exercise environment is up and running:
docker-compose up -d
Open a shell in the schema registry container:
docker-compose exec schema-registry bash
For this exercise, the schema registry is already bundled with some schemas.
💡 Have a look at the schema registry: Cloud AKHQ or Local AKHQ
Start a producer with the schema sensor-v1.avsc:
kafka-avro-console-producer \
--topic avro-sensor \
--broker-list broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--property value.schema="$(< /schemas/avro/sensor-v1.avsc)"
Send some messages:
💡 You can ignore warnings that mention LEADER_NOT_AVAILABLE
{"sensor_id":"mySensor","datetime":1234,"value":{"long": 999}}
{"sensor_id":"myMotor","datetime":1235,"value":{"string": "starting"}}
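The messages above wrap the value as {"long": 999} or {"string": "starting"} because Avro's JSON encoding tags a non-null union value with the name of its branch. The following is a minimal sketch of that wrapping, assuming that the value field is a union of long and string (the messages suggest this, but the schema file is not shown here). The helper names to_avro_json_union and sensor_message are hypothetical.

```python
import json


def to_avro_json_union(value):
    """Wrap a Python value in Avro's JSON union encoding: {"<branch>": value}.

    Assumes the union is ["long", "string"]; adjust to the real schema.
    """
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool):
        raise TypeError("boolean is not a branch of the assumed union")
    if isinstance(value, int):
        return {"long": value}
    if isinstance(value, str):
        return {"string": value}
    raise TypeError(f"unsupported type: {type(value).__name__}")


def sensor_message(sensor_id, datetime, value):
    """Build one JSON line in the shape used by the messages above."""
    return json.dumps(
        {
            "sensor_id": sensor_id,
            "datetime": datetime,
            "value": to_avro_json_union(value),
        }
    )


if __name__ == "__main__":
    print(sensor_message("mySensor", 1234, 999))
    print(sensor_message("myMotor", 1235, "starting"))
```

Each printed line can be pasted into the running console producer.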
💡 Inspect the schema that has been registered for the topic avro-sensor: Cloud AKHQ or Local AKHQ
💡 You can also use schemas for keys
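Besides AKHQ, the registered schemas can be read from the Schema Registry REST API. With the default subject naming strategy (TopicNameStrategy), value schemas are stored under the subject <topic>-value and key schemas under <topic>-key. The sketch below builds the corresponding URLs. The function names are hypothetical, and the base URL is the one used in the commands above, so it only resolves where the registry is reachable as localhost:8081 (for example inside the schema-registry container, or on the host if the port is published).

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

REGISTRY_URL = "http://localhost:8081"  # assumption: same URL as in the commands above


def subject_name(topic, is_key=False):
    """Subject under the default TopicNameStrategy: <topic>-key or <topic>-value."""
    return f"{topic}-{'key' if is_key else 'value'}"


def versions_url(topic, is_key=False, base=REGISTRY_URL):
    """URL that lists all registered versions of the topic's subject."""
    return f"{base}/subjects/{quote(subject_name(topic, is_key), safe='')}/versions"


def latest_schema(topic, is_key=False, base=REGISTRY_URL):
    """Fetch the latest registered version (requires a reachable registry)."""
    with urlopen(f"{versions_url(topic, is_key, base)}/latest") as resp:
        return json.load(resp)


if __name__ == "__main__":
    info = latest_schema("avro-sensor")
    # The response carries the subject, version number, global schema ID and schema text.
    print(info["subject"], info["version"], info["id"])
    print(info["schema"])
```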
Start a consumer with the schema sensor-v1.avsc:
kafka-avro-console-consumer \
--topic avro-sensor \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--from-beginning
Start a producer with the schema sensor-v2.avsc:
- This schema now includes a new mandatory field, type
kafka-avro-console-producer \
--topic avro-sensor \
--broker-list broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--property value.schema="$(< /schemas/avro/sensor-v2.avsc)"
Try to send a message that includes the new mandatory field:
{"sensor_id":"mySensor","datetime":1234,"value":{"long": 999},"type":"m"}
📝 What happened? Check the log output and the AKHQ page
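The outcome depends on the compatibility level of the subject. Assuming the registry runs with its default level, BACKWARD, a new schema must be able to read data written with the latest registered schema, and a field that the old data lacks can only be filled in if the new schema declares a default for it. The sketch below checks just this one rule. The real Avro resolution rules cover more cases (type promotion, aliases, removed fields), and the schemas V1, V2 and V3 are hypothetical stand-ins, not the contents of the lab's .avsc files.

```python
def added_fields_without_default(old_schema, new_schema):
    """Names of fields that are new in new_schema and declare no default."""
    old_names = {field["name"] for field in old_schema["fields"]}
    return [
        field["name"]
        for field in new_schema["fields"]
        if field["name"] not in old_names and "default" not in field
    ]


def is_backward_compatible(old_schema, new_schema):
    """Simplified check: every added field needs a default value."""
    return not added_fields_without_default(old_schema, new_schema)


# Hypothetical stand-ins for the three schema versions.
BASE_FIELDS = [
    {"name": "sensor_id", "type": "string"},
    {"name": "datetime", "type": "long"},
    {"name": "value", "type": ["long", "string"]},
]
V1 = {"type": "record", "name": "Sensor", "fields": BASE_FIELDS}
V2 = {
    "type": "record",
    "name": "Sensor",
    "fields": BASE_FIELDS + [{"name": "type", "type": "string"}],
}
V3 = {
    "type": "record",
    "name": "Sensor",
    "fields": BASE_FIELDS
    + [{"name": "type", "type": "string", "default": "unknown"}],
}

if __name__ == "__main__":
    print("v1 -> v2:", is_backward_compatible(V1, V2))
    print("v1 -> v3:", is_backward_compatible(V1, V3))
```

Under these assumptions, a schema shaped like V2 fails the check because old records carry no type and nothing can be substituted for it, while a schema shaped like V3 passes.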
Start a producer with the schema sensor-v3.avsc:
- This schema adds a default value for type, which makes the field optional
kafka-avro-console-producer \
--topic avro-sensor \
--broker-list broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--property value.schema="$(< /schemas/avro/sensor-v3.avsc)"
Try to send some messages with the new field:
{"sensor_id":"mySensor","datetime":1234,"value":{"long": 999},"type":"m"}
{"sensor_id":"myMotor","datetime":1235,"value":{"string": "starting"},"type":"state"}
💡 Inspect the schema that has been registered for the topic avro-sensor: Cloud AKHQ or Local AKHQ
📝 What is the version number of the schema?
Start a consumer with the schema sensor-v3.avsc:
kafka-avro-console-consumer \
--topic avro-sensor \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--from-beginning
📝 Are you able to read the data? Are you able to see the new field?
Have a look at SensorProducer.java
📝 Do you have to manually register the schemas in the schema registry?
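As background for this question: Confluent's Avro serializer has a setting auto.register.schemas, which defaults to true, so an unknown schema is registered on first use unless the producer configuration turns this off. Whether SensorProducer.java changes that setting is for you to check in the source. The sketch below imitates the behaviour with an in-memory stand-in for the registry and shows the header that the serializer puts in front of each payload (a magic byte 0 followed by the 4-byte big-endian schema ID). FakeRegistry and frame are hypothetical names, and this is an illustration, not the client's actual code.

```python
import struct


class FakeRegistry:
    """In-memory stand-in for a schema registry (illustration only)."""

    def __init__(self):
        self._ids = {}       # schema text -> global schema ID
        self.subjects = {}   # subject -> list of schema IDs, one per version

    def register(self, subject, schema_str):
        """Return the ID of schema_str, registering it on first use."""
        if schema_str not in self._ids:
            self._ids[schema_str] = len(self._ids) + 1
        schema_id = self._ids[schema_str]
        versions = self.subjects.setdefault(subject, [])
        if schema_id not in versions:
            versions.append(schema_id)
        return schema_id


def frame(schema_id, avro_payload):
    """Prefix an Avro-encoded payload with magic byte 0 and the schema ID."""
    return struct.pack(">bI", 0, schema_id) + avro_payload


if __name__ == "__main__":
    registry = FakeRegistry()
    first = registry.register("avro-sensor-value", '{"type": "string"}')
    again = registry.register("avro-sensor-value", '{"type": "string"}')
    print(first == again)  # the second call reuses the existing ID
    print(frame(first, b"payload").hex())
```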