Guozhang, I tried your suggestion. Below are the log entries from the Serde, Serializer, and Deserializer; they confirm that KafkaAvroDeserializer.configure does get invoked.
Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In configure {num.standby.replicas=1, replication.factor=3, commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, schema.registry.url=http://10.200.184.41:8081, key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, zookeeper.connect=10.200.184.26:2181, value.serde=class kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, application.id=testing-app}

Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchemaRegistry: In configure {num.standby.replicas=1, replication.factor=3, commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, schema.registry.url=http://10.200.184.41:8081, key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, zookeeper.connect=10.200.184.26:2181, value.serde=class kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, application.id=testing-app}

Line 385: 16/09/22 15:28:46 WARN GenericAvroDeserializerWithSchemaRegistry: In configure {num.standby.replicas=1, replication.factor=3, commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, schema.registry.url=http://10.200.184.41:8081, key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, zookeeper.connect=10.200.184.26:2181, value.serde=class kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, application.id=testing-app}

Still the same exception:

16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete [StreamThread-1]
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 7
Caused by: java.lang.NullPointerException
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
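In case it helps, here is a stripped-down sketch of the pattern the serde follows: configure() on the Serde forwards the config map to the wrapped Confluent serializer/deserializer, which is what should build the schema registry client from schema.registry.url. This is illustrative only -- class and member names are my simplification, not the exact code from the Dropbox link further down the thread.

    import java.util.{Map => JMap}

    import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
    import org.apache.avro.generic.GenericRecord
    import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

    // Illustrative sketch only -- simplified, not the exact code from the Dropbox link.
    class GenericAvroSerdeWithSchemaRegistry extends Serde[GenericRecord] {

      private val innerSerializer = new KafkaAvroSerializer()
      private val innerDeserializer = new KafkaAvroDeserializer()

      // Streams should call this on the default serde (KAFKA-3639 is fixed in
      // 0.10.0.0); forwarding the config map is what lets the Confluent
      // (de)serializer build its registry client from schema.registry.url.
      override def configure(configs: JMap[String, _], isKey: Boolean): Unit = {
        innerSerializer.configure(configs, isKey)
        innerDeserializer.configure(configs, isKey)
      }

      override def close(): Unit = {
        innerSerializer.close()
        innerDeserializer.close()
      }

      override def serializer(): Serializer[GenericRecord] = new Serializer[GenericRecord] {
        override def configure(configs: JMap[String, _], isKey: Boolean): Unit =
          innerSerializer.configure(configs, isKey)
        override def serialize(topic: String, data: GenericRecord): Array[Byte] =
          innerSerializer.serialize(topic, data)
        override def close(): Unit = innerSerializer.close()
      }

      override def deserializer(): Deserializer[GenericRecord] = new Deserializer[GenericRecord] {
        override def configure(configs: JMap[String, _], isKey: Boolean): Unit =
          innerDeserializer.configure(configs, isKey)
        override def deserialize(topic: String, data: Array[Byte]): GenericRecord =
          innerDeserializer.deserialize(topic, data).asInstanceOf[GenericRecord]
        override def close(): Unit = innerDeserializer.close()
      }
    }

With configure() forwarded like that, the registry client should be created when the streams app starts up -- yet, as the trace above shows, deserialize still ends in the NPE.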
Walter

On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hello Walter,
>
> The WARN log entry should not be the cause of this issue.
>
> I double checked the 0.10.0.0 release and this issue should not really
> happen, so your observation is a bit weird to me. Could you add a log
> entry in the `configure` function which constructs the registry client, to
> make sure it is indeed triggered when the streams app starts up?
>
> Guozhang
>
> On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <walter.rak...@gmail.com> wrote:
> > Guozhang,
> >
> > Any clues on this one?
> >
> > Walter
> >
> > On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <walter.rak...@gmail.com> wrote:
> > > Guozhang,
> > >
> > > I am using 0.10.0.0. Could the below log be the cause?
> > >
> > > 16/09/14 17:24:35 WARN ConsumerConfig: The configuration schema.registry.url = http://192.168.50.6:8081 was supplied but isn't a known config.
> > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
> > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> > >
> > > Walter
> > >
> > > On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > Hello Walter,
> > > >
> > > > Which version of Kafka were you using?
> > > >
> > > > I ask this because there was a bug causing the serde passed through
> > > > config to NOT be configured when constructed:
> > > > https://issues.apache.org/jira/browse/KAFKA-3639
> > > >
> > > > It is fixed in the 0.10.0.0 release, which means you will only hit it
> > > > if you are using the tech-preview release version.
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <walter.rak...@gmail.com> wrote:
> > > > > Hello,
> > > > >
> > > > > I get the below exception when deserializing avro records
> > > > > using KafkaAvroDeserializer.
> > > > >
> > > > > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete [StreamThread-1]
> > > > > Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 4
> > > > > Caused by: java.lang.NullPointerException
> > > > >         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
> > > > >         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
> > > > >
> > > > > I can confirm that the schema registry URL is accessible and url/schemas/ids/4
> > > > > does return a valid schema. Maybe some initialization didn't happen correctly?
> > > > >
> > > > >     props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "192.168.50.6:8081")
> > > > >     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long.getClass.getName)
> > > > >     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerdeWithSchemaRegistry])
> > > > >
> > > > > GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/y471k9nj94tlxro/avro_serde.txt?dl=0
> > > > >
> > > > > Walter
> > > >
> > > > --
> > > > -- Guozhang
>
> --
> -- Guozhang
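P.S. The registry check mentioned further up in the thread (hitting /schemas/ids/<id>) can also be done programmatically. A rough sketch -- the URL and id below are taken from this run, and the client here is built only for the check, not as part of the streams app:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

    object RegistryCheck extends App {
      // URL and schema id taken from the messages above; adjust for your setup.
      val registry = new CachedSchemaRegistryClient("http://10.200.184.41:8081", 100)
      // Fetches and prints the writer schema registered under the given id.
      println(registry.getByID(7))
    }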