Thanks for the info Nagendra.
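
For the archives, one more note from the log itself: the stream thread dies
because the deserialization exception handler is set to fail. While the schema
issue gets fixed, the topology can be made to log and skip a poison record
instead. A minimal sketch (assuming the Streams Properties object is called
props; LogAndContinueExceptionHandler is the stock alternative to the default
LogAndFailExceptionHandler seen in the log):

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    // Log and skip records that fail deserialization instead of
    // shutting the stream thread down.
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
              LogAndContinueExceptionHandler.class);

Note this only hides the symptom; the bad record is dropped, so the class
path / schema naming problem still needs the real fix.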

Thanks
C Suresh

On Wednesday, May 6, 2020, Nagendra Korrapati <nkorrap...@icloud.com.invalid>
wrote:

> When specific.avro.reader is set to true, the deserializer tries to create
> an instance of the generated SpecificRecord class. The class name is formed
> by reading the writer's schema from the schema registry and concatenating
> the record's namespace and name. Here it is trying to create that instance
> and not finding it on the class path. I am not sure how it formed the name
> "XYZ-Table", though. Check the namespace and record name in the schema
> registry, and make the corresponding class available on the class path; that
> should solve it. This is my understanding. I may be wrong!!
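>
> To make that concrete, a sketch with placeholder names (not your real
> schema): if the writer's schema registered for the topic looks like
>
>     {
>       "type": "record",
>       "namespace": "com.example.poc",
>       "name": "XyzTable",
>       "fields": [ { "name": "id", "type": "long" } ]
>     }
>
> then with specific.avro.reader=true the deserializer tries to load the
> generated class com.example.poc.XyzTable, which must be on the consumer's
> class path. For reference, the consumer-side settings that trigger this
> lookup (Confluent serializer configs):
>
>     import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
>     import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
>
>     // Ask for generated SpecificRecord classes instead of GenericRecord.
>     props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>     props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>               "http://localhost:8081");
>
> One thing that stands out: "XYZ-Table" contains a hyphen, which is not a
> valid Avro record name or Java identifier, so no generated class could ever
> match it. It is worth checking how that name ended up in the registered
> schema.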
>
> Nagendra
>
> > On May 5, 2020, at 11:12 AM, Suresh Chidambaram <chida.sur...@gmail.com> wrote:
> >
> > Hi All,
> >
> > Currently, I'm working on a use case wherein I have to deserialize an Avro
> > object and convert it to a different Avro format. Below is the flow.
> >
> > DB -> Source Topic (Avro format) -> Stream Processor -> Target Topic (Avro as a nested object).
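> >
> > In Streams terms, a simplified sketch of the topology (the topic names,
> > serdes, and the toNestedRecord helper below are placeholders, not my real
> > code):
> >
> >     import org.apache.kafka.common.serialization.Serdes;
> >     import org.apache.kafka.streams.StreamsBuilder;
> >     import org.apache.kafka.streams.kstream.Consumed;
> >     import org.apache.kafka.streams.kstream.Produced;
> >
> >     StreamsBuilder builder = new StreamsBuilder();
> >     builder.stream("DEMO-poc", Consumed.with(Serdes.String(), sourceAvroSerde))
> >            // reshape the flat source record into the nested target schema
> >            .mapValues(source -> toNestedRecord(source))
> >            .to("DEMO-poc-target", Produced.with(Serdes.String(), targetAvroSerde));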
> >
> > When I deserialize the message from the Source Topic, the below exception
> > is thrown.
> >
> > Could someone help me resolve this issue?
> >
> > 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
> > 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
> > 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
> > 2020-05-05 10:29:34.219  INFO 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING
> > 2020-05-05 10:29:34.220  INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Found no committed offset for partition DEMO-poc-0
> > 2020-05-05 10:29:34.228  INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to offset 0.
> > 2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1] o.a.k.s.e.LogAndFailExceptionHandler     : Exception caught during Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0
> >
> > org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421
> >
> > *Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.*
> > 2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [confluent-kafka-poc-client-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
> >
> > org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
> >        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) ~[kafka-streams-5.3.0-ccs.jar!/:na]
> > Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421
> > Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.
> >
> > 2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
> > 2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [confluent-kafka-poc-client-StreamThread-1] Shutting down
> > 2020-05-05 10:30:12.891  INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
> > 2020-05-05 10:30:12.891  INFO 13804 --- [-StreamThread-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=confluent-kafka-poc-client-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > 2020-05-05 10:30:12.895  INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
> > 2020-05-05 10:30:12.895  INFO 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [confluent-kafka-poc-client] State transition from RUNNING to ERROR
> > 2020-05-05 10:30:12.895 ERROR 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [confluent-kafka-poc-client] All stream threads have died. The instance will be in error state and should be closed.
> > 2020-05-05 10:30:12.895  INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [confluent-kafka-poc-client-StreamThread-1] Shutdown complete
> > Exception in thread "confluent-kafka-poc-client-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
> >        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
> >        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158)
> >        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
> >        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
> >        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
> >        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
> >        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
> >        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> >        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> > Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421
> > Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.
> >
> > Thanks
> > C Suresh
>
>
