Ah, that was it. I was passing the same Serde while creating the topology.
It works after I removed it.

Thanks!

Walter

On Mon, Sep 26, 2016 at 1:16 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Walter,
>
> One thing I can think of is that, if you pass the serde object as part of
> your topology definition, instead of passing the serde class in the config,
> then these serde objects will not be auto configured and hence for your
> case the schema registry client will not be constructed and initialized.
>
> https://issues.apache.org/jira/browse/KAFKA-3729
>
> So in case your application's topology does overwrite serdes with direct
> serde object passing, you need to configure them manually for now.
>
>
> Guozhang
>
> On Thu, Sep 22, 2016 at 5:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Walter,
> >
> > I downloaded the 0.10.0 jar and verified that the configure() function is
> > auto-triggered when you get the serde classes from `context.keySerde /
> > valueSerde`, which is auto-triggered if you use the DSL. And your Scala
> > code is the same as to our examples code:
> >
> > https://github.com/confluentinc/examples/blob/
> > 030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/
> > main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java
> >
> >
> > Which demo example were you running? And are there any other jars
> > co-located with the 0.10.0.0 jar that could cause another class be
> loaded?
> >
> >
> > Guozhang
> >
> >
> > On Thu, Sep 22, 2016 at 8:41 AM, Walter rakoff <walter.rak...@gmail.com>
> > wrote:
> >
> >> Guozhang,
> >>
> >> I tried your suggestion. Below is the log from Serde, Serializer
> >> & Deserializer.
> >> Confirmed that KafkaAvroDeserializer.configure does get invoked.
> >>
> >> Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In
> >> > configure {num.standby.replicas=1, replication.factor=3,
> >> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> >> > schema.registry.url=http://10.200.184.41:8081,
> >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> >> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> >> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> >> =KStreams-Test,
> >> > application.id=testing-app}
> >> > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchem
> >> aRegistry:
> >> > In configure{num.standby.replicas=1, replication.factor=3,
> >> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> >> > schema.registry.url=http://10.200.184.41:8081,
> >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> >> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> >> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> >> =KStreams-Test,
> >> > application.id=testing-app}
> >> > Line 385: 16/09/22 15:28:46 WARN
> >> > GenericAvroDeserializerWithSchemaRegistry: In
> >> > configure{num.standby.replicas=1, replication.factor=3,
> >> commit.interval.ms=125000,
> >> > bootstrap.servers=10.200.184.29:9092, schema.registry.url=
> >> > http://10.200.184.41:8081,
> >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> >> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> >> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> >> =KStreams-Test,
> >> > application.id=testing-app}
> >>
> >>
> >> Still the same exception
> >>
> >> 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete
> >> > [StreamThread-1]
> >> > Exception in thread "StreamThread-1"
> >> > org.apache.kafka.common.errors.SerializationException: Error
> >> deserializing
> >> > Avro message for id 7
> >> > Caused by: java.lang.NullPointerException
> >> >         at
> >> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
> >> .deserialize(AbstractKafkaAvroDeserializer.java:120)
> >> >         at
> >> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
> >> .deserialize(AbstractKafkaAvroDeserializer.java:92)
> >>
> >>
> >>
> >> Walter
> >>
> >> On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>
> >> > Hello Walter,
> >> >
> >> > The WARN log entry should not be the cause of this issue.
> >> >
> >> > I double checked the 0.10.0.0 release and this issue should not really
> >> > happen, so your observation is a bit weird to me. Could your add a log
> >> > entry in the `configure` function which constructs the registry client
> >> to
> >> > make sure it is indeed triggered when the streams app start up?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> >
> >> > On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <
> walter.rak...@gmail.com
> >> >
> >> > wrote:
> >> >
> >> > > Guozhang,
> >> > >
> >> > > Any clues on this one?
> >> > >
> >> > > Walter
> >> > >
> >> > > On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <
> >> walter.rak...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Guozhang,
> >> > > >
> >> > > > I am using 0.10.0.0. Could the below log be the cause?
> >> > > >
> >> > > > 16/09/14 17:24:35 WARN ConsumerConfig: The configuration
> >> > > > schema.registry.url = http://192.168.50.6: <
> >> http://192.168.50.6:8081/>
> >> > > 8081
> >> > > > was supplied but isn't a known config.
> >> > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
> >> > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId :
> >> b8642491e78c5a13
> >> > > >
> >> > > > Walter
> >> > > >
> >> > > >
> >> > > > On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <
> wangg...@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > >> Hello Walter,
> >> > > >>
> >> > > >> Which version of Kafka were you using?
> >> > > >>
> >> > > >> I ask this because there was a bug causing the serde passed
> through
> >> > > config
> >> > > >> to NOT being configured when constructed:
> >> > > >> https://issues.apache.org/jira/browse/KAFKA-3639
> >> > > >>
> >> > > >>
> >> > > >> Which is fixed in the 0.10.0.0 release, which means you will only
> >> hit
> >> > it
> >> > > >> if
> >> > > >> you are using the tech-preview release version.
> >> > > >>
> >> > > >>
> >> > > >> Guozhang
> >> > > >>
> >> > > >>
> >> > > >> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <
> >> > > walter.rak...@gmail.com>
> >> > > >> wrote:
> >> > > >>
> >> > > >> > Hello,
> >> > > >> >
> >> > > >> > I get the below exception when deserilaizing avro records
> >> > > >> > using KafkaAvroDeserializer.
> >> > > >> >
> >> > > >> > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown
> >> complete
> >> > > >> > [StreamThread-1]
> >> > > >> > Exception in thread "StreamThread-1"
> >> > > >> > org.apache.kafka.common.errors.SerializationException:
> >> > > >> > Error deserializing Avro message for id 4
> >> > > >> > Caused by: java.lang.NullPointerException
> >> > > >> >         at io.confluent.kafka.serializers.
> >> > > AbstractKafkaAvroDeserializer
> >> > > >> .
> >> > > >> > deserialize(AbstractKafkaAvroDeserializer.java:120)
> >> > > >> >         at io.confluent.kafka.serializers.
> >> > > AbstractKafkaAvroDeserializer
> >> > > >> .
> >> > > >> > deserialize(AbstractKafkaAvroDeserializer.java:92)
> >> > > >> >
> >> > > >> > I can confirm that schema registry URL is accessible and
> >> > > >> url/schemas/ids/4
> >> > > >> > does return valid schema.
> >> > > >> > May be some initialization didn't happen correctly?
> >> > > >> >
> >> > > >> >     props.put(AbstractKafkaAvroSerDeConfig.
> >> > > SCHEMA_REGISTRY_URL_CONFIG,
> >> > > >> "
> >> > > >> > 192.168.50.6:8081")
> >> > > >> >     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >> > > >> > Serdes.Long.getClass.getName)
> >> > > >> >     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[
> >> > > >> > GenericAvroSerdeWithSchemaRegistry])
> >> > > >> >
> >> > > >> > GenericAvroSerdeWithSchemaRegistry code -->
> >> > > https://www.dropbox.com/s/
> >> > > >> > y471k9nj94tlxro/avro_serde.txt?dl=0
> >> > > >> >
> >> > > >> > Walter
> >> > > >> >
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >> --
> >> > > >> -- Guozhang
> >> > > >>
> >> > > >
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to