Ah, that was it. I was passing the same Serde while creating the topology. It works after I removed it.
Thanks!

Walter

On Mon, Sep 26, 2016 at 1:16 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Walter,
>
> One thing I can think of is that, if you pass the serde object as part of
> your topology definition, instead of passing the serde class in the config,
> then these serde objects will not be auto-configured, and hence in your
> case the schema registry client will not be constructed and initialized.
>
> https://issues.apache.org/jira/browse/KAFKA-3729
>
> So in case your application's topology does overwrite serdes with direct
> serde object passing, you need to configure them manually for now.
>
> Guozhang
>
> On Thu, Sep 22, 2016 at 5:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Walter,
> >
> > I downloaded the 0.10.0 jar and verified that the configure() function is
> > auto-triggered when you get the serde classes from `context.keySerde /
> > valueSerde`, which is auto-triggered if you use the DSL. And your Scala
> > code is the same as our example code:
> >
> > https://github.com/confluentinc/examples/blob/030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java
> >
> > Which demo example were you running? And are there any other jars
> > co-located with the 0.10.0.0 jar that could cause another class to be
> > loaded?
> >
> > Guozhang
> >
> > On Thu, Sep 22, 2016 at 8:41 AM, Walter rakoff <walter.rak...@gmail.com> wrote:
> >
> > > Guozhang,
> > >
> > > I tried your suggestion. Below is the log from Serde, Serializer
> > > & Deserializer.
> > > Confirmed that KafkaAvroDeserializer.configure does get invoked.
> > >
> > > > Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In configure {num.standby.replicas=1, replication.factor=3, commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, schema.registry.url=http://10.200.184.41:8081, key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, zookeeper.connect=10.200.184.26:2181, value.serde=class kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, application.id=testing-app}
> > > > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchemaRegistry: In configure{num.standby.replicas=1, replication.factor=3, commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, schema.registry.url=http://10.200.184.41:8081, key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, zookeeper.connect=10.200.184.26:2181, value.serde=class kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, application.id=testing-app}
> > > > Line 385: 16/09/22 15:28:46 WARN GenericAvroDeserializerWithSchemaRegistry: In configure{num.standby.replicas=1, replication.factor=3, commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, schema.registry.url=http://10.200.184.41:8081, key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, zookeeper.connect=10.200.184.26:2181, value.serde=class kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, application.id=testing-app}
> > >
> > > Still the same exception:
> > >
> > > > 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete [StreamThread-1]
> > > > Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 7
> > > > Caused by: java.lang.NullPointerException
> > > >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
> > > >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
> > >
> > > Walter
> > >
> > > On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Walter,
> > > >
> > > > The WARN log entry should not be the cause of this issue.
> > > >
> > > > I double-checked the 0.10.0.0 release and this issue should not really
> > > > happen, so your observation is a bit weird to me. Could you add a log
> > > > entry in the `configure` function which constructs the registry client
> > > > to make sure it is indeed triggered when the streams app starts up?
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <walter.rak...@gmail.com> wrote:
> > > >
> > > > > Guozhang,
> > > > >
> > > > > Any clues on this one?
> > > > >
> > > > > Walter
> > > > >
> > > > > On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <walter.rak...@gmail.com> wrote:
> > > > >
> > > > > > Guozhang,
> > > > > >
> > > > > > I am using 0.10.0.0. Could the below log be the cause?
> > > > > >
> > > > > > 16/09/14 17:24:35 WARN ConsumerConfig: The configuration schema.registry.url = http://192.168.50.6:8081 was supplied but isn't a known config.
> > > > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
> > > > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> > > > > >
> > > > > > Walter
> > > > > >
> > > > > > On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hello Walter,
> > > > > > >
> > > > > > > Which version of Kafka were you using?
> > > > > > >
> > > > > > > I ask this because there was a bug causing the serde passed through
> > > > > > > config to NOT be configured when constructed:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-3639
> > > > > > >
> > > > > > > It is fixed in the 0.10.0.0 release, which means you will only hit it
> > > > > > > if you are using the tech-preview release version.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <walter.rak...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello,
> > > > > > > >
> > > > > > > > I get the below exception when deserializing Avro records
> > > > > > > > using KafkaAvroDeserializer.
> > > > > > > >
> > > > > > > > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete [StreamThread-1]
> > > > > > > > Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 4
> > > > > > > > Caused by: java.lang.NullPointerException
> > > > > > > >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
> > > > > > > >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
> > > > > > > >
> > > > > > > > I can confirm that the schema registry URL is accessible and
> > > > > > > > url/schemas/ids/4 does return a valid schema.
> > > > > > > > Maybe some initialization didn't happen correctly?
> > > > > > > >
> > > > > > > > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "192.168.50.6:8081")
> > > > > > > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long.getClass.getName)
> > > > > > > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerdeWithSchemaRegistry])
> > > > > > > >
> > > > > > > > GenericAvroSerdeWithSchemaRegistry code -->
> > > > > > > > https://www.dropbox.com/s/y471k9nj94tlxro/avro_serde.txt?dl=0
> > > > > > > >
> > > > > > > > Walter
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang
>
> --
> -- Guozhang
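For readers who want to keep passing a serde instance directly to the topology instead of removing it, the manual configuration Guozhang mentions can be modeled roughly as below. This is only a toy sketch with stand-in types: `ConfigurableSerde`, `RegistryBackedSerde`, and `ManualSerdeConfig` are hypothetical names that imitate the `configure(configs, isKey)` contract of Kafka's `Serde` interface, and no real Kafka or Confluent classes are used. With the real classes, the equivalent step would presumably be to call `configure` on your own serde instance with a map containing `schema.registry.url` before handing it to the builder.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Stand-in for the configure(configs, isKey) contract of Kafka's Serde;
// this is NOT the real interface, only a model of its shape.
trait ConfigurableSerde {
  def configure(configs: JMap[String, _], isKey: Boolean): Unit
  def deserialize(bytes: Array[Byte]): String
}

// Hypothetical serde that cannot deserialize until configure() has run,
// mirroring a deserializer whose registry client is built in configure().
class RegistryBackedSerde extends ConfigurableSerde {
  private var registryUrl: Option[String] = None

  override def configure(configs: JMap[String, _], isKey: Boolean): Unit =
    registryUrl = Option(configs.get("schema.registry.url")).map(_.toString)

  override def deserialize(bytes: Array[Byte]): String = registryUrl match {
    case Some(url) => s"decoded ${bytes.length} bytes via $url"
    case None      => throw new IllegalStateException("serde was never configured")
  }
}

object ManualSerdeConfig {
  // The same settings that would otherwise reach the serde through the streams config.
  def streamsProps(): JMap[String, AnyRef] = {
    val props = new JHashMap[String, AnyRef]()
    props.put("schema.registry.url", "http://192.168.50.6:8081")
    props
  }

  // A serde instance passed directly to the topology is not auto-configured,
  // so configure it explicitly before use (false = it is a value serde).
  def configuredSerde(): RegistryBackedSerde = {
    val serde = new RegistryBackedSerde
    serde.configure(streamsProps(), false)
    serde
  }
}
```

In this model, a `RegistryBackedSerde` created with `new` and used as-is fails on the first `deserialize` call, which is analogous to the NullPointerException in the thread, while the instance returned by `ManualSerdeConfig.configuredSerde()` works.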