I am facing some issues while reading / writing Avro data.

Attached here the corresponding files and avro-generated pojo.

Any clues whats wrong here ?   May be missing some simple step !

*A) << producer >> BeamKafkaFlinkAvroProducerTest *

>> if I use  KafkaProducer directly (i.e. call produceSimpleData.. ) ,
things are working fine   (just for testing )

>> Using FlinkKafkaProducer as UnboundedSource  (this is what I should do)

(i.e. I call produceAvroData2.. )  with the following steps ...

1) First, if I use >> *AvroSerializationSchema* schema = new
*AvroSerializationSchema*(Test.class);

i.e. essentially using Avro’s org.apache.avro.specific.SpecificDatumWriter
; I face following error >>

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.avro.generic.IndexedRecord

at org.apache.avro.generic.GenericData.getField(GenericData.java:580)

at org.apache.avro.generic.GenericData.getField(GenericData.java:595)

at
org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)

at
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)


2) Next,  if I use *TypeInformationSerializationSchema* (irrespective of
AvroCoder in Pipeline) , things apparently work fine

.. as Kafka test consumer tool prints the message  >>
java.lang.String{"uname": "Joe", "id": 6}


*B) <<Consumer>> ,  BeamKafkaFlinkAvroConsumerTest*

>> I understand we should either use TypeInformationSerializationSchema in
both consumer and producer OR

should use AvroDeserializationSchema and AvroSerializationSchema in
Consumer and Producer respectively !!

But, irrespective of using AvroDeserializationSchema
or TypeInformationSerializationSchema, I get the following exception >>

Exception in thread "main" java.lang.NullPointerException: null value in
entry: V=null

at
com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)

at
com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)

at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)

at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)

at
org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)


Thanks

Kaniska

Attachment: BeamKafkaFlinkAvroConsumerTest.java
Description: Binary data

Attachment: BeamKafkaFlinkAvroProducerTest.java
Description: Binary data

Attachment: User.java
Description: Binary data

Attachment: Test.java
Description: Binary data

Reply via email to