Hi Walter,

I downloaded the 0.10.0 jar and verified that the configure() function is auto-triggered when you get the serde classes from `context.keySerde / valueSerde`, which happens automatically if you use the DSL. And your Scala code is the same as our example code:
https://github.com/confluentinc/examples/blob/030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java

Which demo example were you running? And are there any other jars co-located with the 0.10.0.0 jar that could cause another class to be loaded?

Guozhang

On Thu, Sep 22, 2016 at 8:41 AM, Walter rakoff <walter.rak...@gmail.com> wrote:

> Guozhang,
>
> I tried your suggestion. Below is the log from Serde, Serializer
> & Deserializer.
> Confirmed that KafkaAvroDeserializer.configure does get invoked.
>
> > Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In
> > configure {num.standby.replicas=1, replication.factor=3,
> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> > schema.registry.url=http://10.200.184.41:8081,
> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> > auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test,
> > application.id=testing-app}
> >
> > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchemaRegistry:
> > In configure{num.standby.replicas=1, replication.factor=3,
> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> > schema.registry.url=http://10.200.184.41:8081,
> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> > auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test,
> > application.id=testing-app}
> >
> > Line 385: 16/09/22 15:28:46 WARN
> > GenericAvroDeserializerWithSchemaRegistry: In
> > configure{num.standby.replicas=1, replication.factor=3,
> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> > schema.registry.url=http://10.200.184.41:8081,
> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> > auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test,
> > application.id=testing-app}
>
> Still the same exception
>
> > 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete
> > [StreamThread-1]
> > Exception in thread "StreamThread-1"
> > org.apache.kafka.common.errors.SerializationException: Error deserializing
> > Avro message for id 7
> > Caused by: java.lang.NullPointerException
> >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
> >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
>
> Walter
>
> On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Walter,
> >
> > The WARN log entry should not be the cause of this issue.
> >
> > I double checked the 0.10.0.0 release and this issue should not really
> > happen, so your observation is a bit weird to me. Could you add a log
> > entry in the `configure` function which constructs the registry client to
> > make sure it is indeed triggered when the streams app starts up?
> >
> > Guozhang
> >
> > On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <walter.rak...@gmail.com>
> > wrote:
> >
> > > Guozhang,
> > >
> > > Any clues on this one?
> > >
> > > Walter
> > >
> > > On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <walter.rak...@gmail.com>
> > > wrote:
> > >
> > > > Guozhang,
> > > >
> > > > I am using 0.10.0.0. Could the below log be the cause?
> > > >
> > > > 16/09/14 17:24:35 WARN ConsumerConfig: The configuration
> > > > schema.registry.url = http://192.168.50.6:8081
> > > > was supplied but isn't a known config.
> > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
> > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> > > >
> > > > Walter
> > > >
> > > > On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Walter,
> > > > >
> > > > > Which version of Kafka were you using?
> > > > >
> > > > > I ask this because there was a bug causing the serde passed through
> > > > > config to NOT be configured when constructed:
> > > > > https://issues.apache.org/jira/browse/KAFKA-3639
> > > > >
> > > > > Which is fixed in the 0.10.0.0 release, which means you will only hit
> > > > > it if you are using the tech-preview release version.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff
> > > > > <walter.rak...@gmail.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > I get the below exception when deserializing avro records
> > > > > > using KafkaAvroDeserializer.
> > > > > >
> > > > > > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete
> > > > > > [StreamThread-1]
> > > > > > Exception in thread "StreamThread-1"
> > > > > > org.apache.kafka.common.errors.SerializationException:
> > > > > > Error deserializing Avro message for id 4
> > > > > > Caused by: java.lang.NullPointerException
> > > > > >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
> > > > > >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
> > > > > >
> > > > > > I can confirm that schema registry URL is accessible and
> > > > > > url/schemas/ids/4 does return valid schema.
> > > > > > Maybe some initialization didn't happen correctly?
> > > > > >
> > > > > > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "192.168.50.6:8081")
> > > > > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long.getClass.getName)
> > > > > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerdeWithSchemaRegistry])
> > > > > >
> > > > > > GenericAvroSerdeWithSchemaRegistry code -->
> > > > > > https://www.dropbox.com/s/y471k9nj94tlxro/avro_serde.txt?dl=0
> > > > > >
> > > > > > Walter
> > > > >
> > > > > --
> > > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
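
For readers following the thread: the failure mode asked about on Sep 14 (KAFKA-3639, a serde that gets constructed but never configured) can be illustrated with a small, Kafka-free Java sketch. Every class and method name below is a hypothetical stand-in and not a Kafka or Confluent API; only the `schema.registry.url` key and the `/schemas/ids/` path come from the messages above. Walter's Sep 22 logs show that configure() was in fact invoked in his setup, so this only shows the mechanism and is not a diagnosis of his NullPointerException.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigureSketch {

    /** Hypothetical stand-in for a deserializer that needs configure() before use. */
    static class RegistryBackedDeserializer {
        private String registryUrl; // stays null until configure() runs

        void configure(Map<String, ?> configs) {
            Object url = configs.get("schema.registry.url");
            registryUrl = (url == null) ? null : url.toString();
        }

        /** Builds the lookup location for a schema id; needs a configured URL. */
        String schemaLocation(int schemaId) {
            // Throws NullPointerException when registryUrl was never set.
            return registryUrl.concat("/schemas/ids/" + schemaId);
        }
    }

    /** Hypothetical wrapper serde that forwards configure() to its inner deserializer. */
    static class WrapperSerde {
        private final RegistryBackedDeserializer inner = new RegistryBackedDeserializer();

        void configure(Map<String, ?> configs) {
            inner.configure(configs);
        }

        RegistryBackedDeserializer deserializer() {
            return inner;
        }
    }

    /** Returns true if using the serde without configure() ends in an NPE. */
    static boolean failsWithoutConfigure() {
        WrapperSerde serde = new WrapperSerde();
        try {
            serde.deserializer().schemaLocation(7);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Configures the serde first, then resolves the schema location. */
    static String locationAfterConfigure(String url, int schemaId) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("schema.registry.url", url);
        WrapperSerde serde = new WrapperSerde();
        serde.configure(configs);
        return serde.deserializer().schemaLocation(schemaId);
    }

    public static void main(String[] args) {
        System.out.println("unconfigured fails: " + failsWithoutConfigure());
        System.out.println(locationAfterConfigure("http://localhost:8081", 7));
    }
}
```

The wrapper forwards the whole config map unchanged, which mirrors what the log lines above show: the serde, serializer and deserializer each received the same set of properties in configure().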