Hi Walter,

I downloaded the 0.10.0 jar and verified that the configure() function is
auto-triggered when you get the serde classes from `context.keySerde /
valueSerde`, which is auto-triggered if you use the DSL. And your Scala
code is the same as to our examples code:

https://github.com/confluentinc/examples/blob/030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java


Which demo example were you running? And are there any other jars
co-located with the 0.10.0.0 jar that could cause another class be loaded?


Guozhang


On Thu, Sep 22, 2016 at 8:41 AM, Walter rakoff <walter.rak...@gmail.com>
wrote:

> Guozhang,
>
> I tried your suggestion. Below is the log from Serde, Serializer
> & Deserializer.
> Confirmed that KafkaAvroDeserializer.configure does get invoked.
>
> Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In
> > configure {num.standby.replicas=1, replication.factor=3,
> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> > schema.registry.url=http://10.200.184.41:8081,
> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> =KStreams-Test,
> > application.id=testing-app}
> > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchem
> aRegistry:
> > In configure{num.standby.replicas=1, replication.factor=3,
> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> > schema.registry.url=http://10.200.184.41:8081,
> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> =KStreams-Test,
> > application.id=testing-app}
> > Line 385: 16/09/22 15:28:46 WARN
> > GenericAvroDeserializerWithSchemaRegistry: In
> > configure{num.standby.replicas=1, replication.factor=3,
> commit.interval.ms=125000,
> > bootstrap.servers=10.200.184.29:9092, schema.registry.url=
> > http://10.200.184.41:8081,
> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> =KStreams-Test,
> > application.id=testing-app}
>
>
> Still the same exception
>
> 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete
> > [StreamThread-1]
> > Exception in thread "StreamThread-1"
> > org.apache.kafka.common.errors.SerializationException: Error
> deserializing
> > Avro message for id 7
> > Caused by: java.lang.NullPointerException
> >         at
> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
> deserialize(AbstractKafkaAvroDeserializer.java:120)
> >         at
> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
> deserialize(AbstractKafkaAvroDeserializer.java:92)
>
>
>
> Walter
>
> On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Walter,
> >
> > The WARN log entry should not be the cause of this issue.
> >
> > I double checked the 0.10.0.0 release and this issue should not really
> > happen, so your observation is a bit weird to me. Could your add a log
> > entry in the `configure` function which constructs the registry client to
> > make sure it is indeed triggered when the streams app start up?
> >
> >
> > Guozhang
> >
> >
> >
> > On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <walter.rak...@gmail.com>
> > wrote:
> >
> > > Guozhang,
> > >
> > > Any clues on this one?
> > >
> > > Walter
> > >
> > > On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <
> walter.rak...@gmail.com>
> > > wrote:
> > >
> > > > Guozhang,
> > > >
> > > > I am using 0.10.0.0. Could the below log be the cause?
> > > >
> > > > 16/09/14 17:24:35 WARN ConsumerConfig: The configuration
> > > > schema.registry.url = http://192.168.50.6: <
> http://192.168.50.6:8081/>
> > > 8081
> > > > was supplied but isn't a known config.
> > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
> > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId :
> b8642491e78c5a13
> > > >
> > > > Walter
> > > >
> > > >
> > > > On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > >> Hello Walter,
> > > >>
> > > >> Which version of Kafka were you using?
> > > >>
> > > >> I ask this because there was a bug causing the serde passed through
> > > config
> > > >> to NOT being configured when constructed:
> > > >> https://issues.apache.org/jira/browse/KAFKA-3639
> > > >>
> > > >>
> > > >> Which is fixed in the 0.10.0.0 release, which means you will only
> hit
> > it
> > > >> if
> > > >> you are using the tech-preview release version.
> > > >>
> > > >>
> > > >> Guozhang
> > > >>
> > > >>
> > > >> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <
> > > walter.rak...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hello,
> > > >> >
> > > >> > I get the below exception when deserilaizing avro records
> > > >> > using KafkaAvroDeserializer.
> > > >> >
> > > >> > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown
> complete
> > > >> > [StreamThread-1]
> > > >> > Exception in thread "StreamThread-1"
> > > >> > org.apache.kafka.common.errors.SerializationException:
> > > >> > Error deserializing Avro message for id 4
> > > >> > Caused by: java.lang.NullPointerException
> > > >> >         at io.confluent.kafka.serializers.
> > > AbstractKafkaAvroDeserializer
> > > >> .
> > > >> > deserialize(AbstractKafkaAvroDeserializer.java:120)
> > > >> >         at io.confluent.kafka.serializers.
> > > AbstractKafkaAvroDeserializer
> > > >> .
> > > >> > deserialize(AbstractKafkaAvroDeserializer.java:92)
> > > >> >
> > > >> > I can confirm that schema registry URL is accessible and
> > > >> url/schemas/ids/4
> > > >> > does return valid schema.
> > > >> > May be some initialization didn't happen correctly?
> > > >> >
> > > >> >     props.put(AbstractKafkaAvroSerDeConfig.
> > > SCHEMA_REGISTRY_URL_CONFIG,
> > > >> "
> > > >> > 192.168.50.6:8081")
> > > >> >     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > >> > Serdes.Long.getClass.getName)
> > > >> >     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[
> > > >> > GenericAvroSerdeWithSchemaRegistry])
> > > >> >
> > > >> > GenericAvroSerdeWithSchemaRegistry code -->
> > > https://www.dropbox.com/s/
> > > >> > y471k9nj94tlxro/avro_serde.txt?dl=0
> > > >> >
> > > >> > Walter
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to