Thanks for the info, Nagendra.

Thanks,
C Suresh

On Wednesday, May 6, 2020, Nagendra Korrapati <nkorrap...@icloud.com.invalid> wrote:

> When specific.avro.reader is set to true, the deserializer tries to create
> an instance of the class. The class name is formed by reading the schema
> (the writer schema) from Schema Registry and concatenating the namespace
> and record name. It tries to create that instance, and the class is not
> found on the class path. I am not sure how it formed the name XYZ-Table,
> though. Checking the namespace and name of the record in Schema Registry
> and making the class available on the class path should solve it. This is
> my understanding. I may be wrong!
>
> Nagendra
>
> > On May 5, 2020, at 11:12 AM, Suresh Chidambaram <chida.sur...@gmail.com> wrote:
> >
> > Hi All,
> >
> > Currently, I'm working on a use case wherein I have to deserialize an Avro
> > object and convert it to some other format of Avro. Below is the flow.
> >
> > DB -> Source Topic (Avro format) -> Stream Processor -> Target Topic (Avro
> > as nested object).
> >
> > When I deserialize the message from the Source Topic, the below exception
> > is thrown.
> >
> > Could someone help me resolve this issue?
> >
> > 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
> > 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
> > 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
> > 2020-05-05 10:29:34.219 INFO 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING
> > 2020-05-05 10:29:34.220 INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Found no committed offset for partition DEMO-poc-0
> > 2020-05-05 10:29:34.228 INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to offset 0.
> > 2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1] o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0
> >
> > org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421
> >
> > *Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.*
> > 2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
> >
> > org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
> >     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >     at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> > Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421
> > Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.
> >
> > 2020-05-05 10:30:12.888 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
> > 2020-05-05 10:30:12.888 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Shutting down
> > 2020-05-05 10:30:12.891 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
> > 2020-05-05 10:30:12.891 INFO 13804 --- [-StreamThread-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=confluent-kafka-poc-client-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > 2020-05-05 10:30:12.895 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
> > 2020-05-05 10:30:12.895 INFO 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [confluent-kafka-poc-client] State transition from RUNNING to ERROR
> > 2020-05-05 10:30:12.895 ERROR 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [confluent-kafka-poc-client] All stream threads have died. The instance will be in error state and should be closed.
> > 2020-05-05 10:30:12.895 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Shutdown complete
> > Exception in thread "confluent-kafka-poc-client-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
> >     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
> >     at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158)
> >     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
> >     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> > Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421
> > Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.
> >
> > Thanks
> > C Suresh
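
For anyone who finds this thread later, here is a rough sketch of the lookup Nagendra describes: join the record's namespace and name from the writer schema, then try to load a class with that name from the class path. This is plain Java with no Avro or Confluent dependency, and it is not the deserializer's actual code. The class ReaderClassLookup, its methods, and the com.example.poc.XyzTable name are made up for illustration. Only "XYZ-Table" comes from the log, and treating its namespace as empty is an assumption.

```java
import java.util.Optional;

/** Illustrative only: mimics the namespace + name lookup, not the Confluent implementation. */
final class ReaderClassLookup {

    /** Joins the schema namespace and record name into the class name to look up. */
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    /**
     * True when every dot-separated segment uses only Java identifier characters.
     * This checks characters only; it does not reject reserved words.
     */
    static boolean isLegalJavaClassName(String fullName) {
        for (String part : fullName.split("\\.", -1)) {
            if (part.isEmpty() || !Character.isJavaIdentifierStart(part.charAt(0))) {
                return false;
            }
            for (int i = 1; i < part.length(); i++) {
                if (!Character.isJavaIdentifierPart(part.charAt(i))) {
                    return false;
                }
            }
        }
        return true;
    }

    /** Tries to load the class from the class path; empty when it is not there. */
    static Optional<Class<?>> findOnClassPath(String fullName) {
        try {
            return Optional.of(Class.forName(fullName));
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Name taken from the exception in the log; the empty namespace is an assumption.
        String fromLog = fullName("", "XYZ-Table");
        System.out.println(fromLog + " legal Java name: " + isLegalJavaClassName(fromLog));
        System.out.println(fromLog + " on class path: " + findOnClassPath(fromLog).isPresent());

        // Hypothetical namespace and record name, shown only to illustrate the join.
        String hypothetical = fullName("com.example.poc", "XyzTable");
        System.out.println(hypothetical + " legal Java name: " + isLegalJavaClassName(hypothetical));
    }
}
```

If the record name in Schema Registry really is XYZ-Table, the hyphen means no Java source class can carry that name, so putting a generated class on the class path would not be enough by itself. In that case the options would presumably be to have the producer register the schema under a name that is a valid identifier, or to leave specific.avro.reader unset and read the value as a GenericRecord. I have not verified either against this setup.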