Sunday, August 25, 2019

Kafka Avro schema registry trouble

Here is my Kafka consumer exception:

23:24:51.268 [main] DEBUG i.c.k.s.client.rest.RestService - Sending GET with input null to http://localhost:8081/schemas/ids/1
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition rss-0 at offset 4091. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class org.rssnews.producer.AvroRSSItem specified in writer's schema whilst finding reader's schema for a SpecificRecord.



When I looked at the producer code, I realized I had renamed org.rssnews.producer.AvroRSSItem to org.rssnews.producer.avro.AvroRSItem, so the consumer can no longer find the class named in the writer's schema.




I am checking whether I can fix the schema definition:

{"namespace": "org.rssnews.producer",
  "type": "record",
  "name": "AvroRSSItem",
  "fields": [
    {"name": "source", "type": "string"},
    {"name": "title",  "type": "string"},
    {"name": "url", "type": "string"},
    {"name": "description", "type": ["string","null"]},
    {"name": "publishedAt", "type": ["null",{"type":"long", "logicType":"timestamp-millis"}]}
  ]
}


I believe that if I can fix the namespace here, the consumer should get the correct deserializer.
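As a rough sketch of what "fixing the namespace" could look like, the Python below rewrites the namespace in the schema above and builds the request that would register it as a new version. The subject name `rss-value` is my assumption (the default topic-based naming for the value schema of topic `rss`), the helper names are made up for illustration, and nothing is sent until `send()` is called. The endpoint and content type are the ones I understand the Schema Registry REST API to use.

```python
import json
import urllib.request

# The schema exactly as shown above (currently registered under id 1).
OLD_SCHEMA = """
{"namespace": "org.rssnews.producer",
  "type": "record",
  "name": "AvroRSSItem",
  "fields": [
    {"name": "source", "type": "string"},
    {"name": "title",  "type": "string"},
    {"name": "url", "type": "string"},
    {"name": "description", "type": ["string","null"]},
    {"name": "publishedAt", "type": ["null",{"type":"long", "logicType":"timestamp-millis"}]}
  ]
}
"""


def with_namespace(schema_text, namespace):
    """Parse the schema and return it as a dict with the namespace replaced."""
    schema = json.loads(schema_text)
    schema["namespace"] = namespace
    return schema


def build_register_request(registry_url, subject, schema):
    """Build (but do not send) the POST that registers a new schema version."""
    # The registry expects the Avro schema as an escaped JSON string
    # stored under the "schema" key of the request body.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        f"{registry_url}/subjects/{subject}/versions",
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )


def send(req):
    """Send a prepared request and return the response body as text."""
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode("utf-8")


# Assumed values: registry address from this post, subject "rss-value".
new_schema = with_namespace(OLD_SCHEMA, "org.rssnews.producer.avro")
request = build_register_request("http://127.0.0.1:8081", "rss-value", new_schema)
```

Calling `send(request)` against a running registry would attempt the registration. Records already written to the topic still carry the old schema id, so this alone may not make the record at offset 4091 readable.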

I tried to delete the registered schema by its id, but got the following error message:

D:\apps\kafka_2.11-2.3.0>curl -X GET http://127.0.0.1:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"AvroRSSItem\",\"namespace\":\"org.rssnews.producer\",\"fields\":[{\"name\":\"source\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"title\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"url\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"description\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\"},\"null\"]},{\"name\":\"publishedAt\",\"type\":[\"null\",{\"type\":\"long\",\"logicType\":\"timestamp-millis\"}]}]}"}

D:\apps\kafka_2.11-2.3.0>curl -X DELETE http://127.0.0.1:8081/schemas/ids/1
{"error_code":405,"message":"HTTP 405 Method Not Allowed"}

D:\apps\kafka_2.11-2.3.0>
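The 405 fits with /schemas/ids/{id} being a read-only endpoint. As far as I can tell from the Schema Registry REST API, deletion goes through the subject and version instead. Here is a minimal Python sketch of that request. The subject `rss-value` and version `1` are assumptions on my part (I have not confirmed which subject holds schema id 1), and the function names are only illustrative.

```python
import urllib.request


def build_delete_version_request(registry_url, subject, version):
    """Build (but do not send) a DELETE for one version of a subject."""
    # Deletion is addressed by subject and version, not by global schema id.
    return urllib.request.Request(
        f"{registry_url}/subjects/{subject}/versions/{version}",
        method="DELETE",
    )


def send_delete(req):
    """Send the prepared request and return the response body as text."""
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode("utf-8")


# Assumed values: registry address from this post, subject "rss-value", version 1.
delete_request = build_delete_version_request("http://127.0.0.1:8081", "rss-value", 1)
```

Running `send_delete(delete_request)` against the live registry would issue the delete. A GET on http://127.0.0.1:8081/subjects first would show which subject names actually exist.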

1 comment:

Alex said...

To fix it, just redo the schema registration.