Hi Kaniska,

I've replied to your mail on the Beam user mailing list.

Cheers,
Max

On Wed, Apr 27, 2016 at 4:56 PM, kaniska Mandal
<kaniska.man...@gmail.com> wrote:
> I am facing some issues while reading / writing Avro data.
>
> Attached are the corresponding files and the Avro-generated POJO.
>
> Any clues what's wrong here? Maybe I'm missing some simple step!
>
> A) << producer >> BeamKafkaFlinkAvroProducerTest
>
>>> If I use KafkaProducer directly (i.e. call produceSimpleData..),
>>> things work fine (just for testing)
>
>>> Using FlinkKafkaProducer as UnboundedSource (this is what I should do)
>
> (i.e. I call produceAvroData2..) with the following steps ...
>
> 1) First, if I use >> AvroSerializationSchema schema = new
> AvroSerializationSchema(Test.class);
>
> i.e. essentially using Avro’s org.apache.avro.specific.SpecificDatumWriter,
> I face the following error >>
>
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
>     at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>     at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>
>
> 2) Next, if I use TypeInformationSerializationSchema (irrespective of
> AvroCoder in Pipeline), things apparently work fine,
>
> as the Kafka test consumer tool prints the message >>
> java.lang.String{"uname": "Joe", "id": 6}
>
>
> B) <<Consumer>> ,  BeamKafkaFlinkAvroConsumerTest
>
>>> I understand we should either use TypeInformationSerializationSchema in
>>> both consumer and producer OR
>
> should use AvroDeserializationSchema and AvroSerializationSchema in Consumer
> and Producer respectively !!
>
> But, irrespective of using AvroDeserializationSchema or
> TypeInformationSerializationSchema, I get the following exception >>
>
> Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
>     at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
>     at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
>     at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
>     at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
>     at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
>
>
> Thanks
>
> Kaniska
