Hi All, Currently, I'm working on a usecase wherein I have to deserialie an Avro object and convert to some other format of Avro. Below is the flow.
DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro as nested object). When I deserialize the message from the Source Topic, the below exception is thrown. Could someone help me resolving this issue? 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING 2020-05-05 10:29:34.219 INFO 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING 2020-05-05 10:29:34.220 INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Found no committed offset for partition DEMO-poc-0 2020-05-05 10:29:34.228 INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to offset 0. 2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1] o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0 org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421 *Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.* 2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately. at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) ~[kafka-streams-5.3.0-ccs.jar!/:na] Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421 Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord. 2020-05-05 10:30:12.888 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN 2020-05-05 10:30:12.888 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Shutting down 2020-05-05 10:30:12.891 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions 2020-05-05 10:30:12.891 INFO 13804 --- [-StreamThread-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=confluent-kafka-poc-client-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 2020-05-05 10:30:12.895 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD 2020-05-05 10:30:12.895 INFO 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [confluent-kafka-poc-client] State transition from RUNNING to ERROR 2020-05-05 10:30:12.895 ERROR 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [confluent-kafka-poc-client] All stream threads have died. The instance will be in error state and should be closed. 2020-05-05 10:30:12.895 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Shutdown complete Exception in thread "confluent-kafka-poc-client-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately. at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421 Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord. Thanks C Suresh