I am facing some issues while reading and writing Avro data. The corresponding files and the Avro-generated POJO are attached.
Any clues as to what is wrong here? I may be missing some simple step!

*A) << Producer >> BeamKafkaFlinkAvroProducerTest*

>> If I use KafkaProducer directly (i.e. call produceSimpleData..), things work fine (just for testing).

>> Using FlinkKafkaProducer as UnboundedSource (this is what I should do) (i.e. I call produceAvroData2..), I tried the following steps:

1) First, if I use

>> AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);

i.e. essentially Avro's org.apache.avro.specific.SpecificDatumWriter, I get the following error:

>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)

2) Next, if I use *TypeInformationSerializationSchema* (irrespective of AvroCoder in the Pipeline), things apparently work fine, as the Kafka test consumer tool prints the message

>> java.lang.String{"uname": "Joe", "id": 6}

*B) << Consumer >> BeamKafkaFlinkAvroConsumerTest*

>> I understand that we should either use TypeInformationSerializationSchema in both the consumer and the producer, or use AvroDeserializationSchema in the consumer and AvroSerializationSchema in the producer.
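The pairing rule above can be sketched without Avro, Flink or Kafka. The following is only a stand-alone illustration, not the actual Flink or Avro classes: User is a hypothetical class carrying the uname and id fields from the printed message, the binary layout is hand-rolled, and serialize / deserialize are made-up stand-ins for a matching serialization and deserialization schema pair.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a matching serializer/deserializer pair.
// None of these names come from Beam, Flink or Avro.
final class SchemaPairingSketch {

    // Assumed record shape: the two fields seen in the printed message.
    static final class User {
        final String uname;
        final int id;

        User(String uname, int id) {
            this.uname = uname;
            this.id = id;
        }
    }

    // Producer side: writes uname as a length-prefixed string, then id as an int.
    static byte[] serialize(User user) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(user.uname);
            out.writeInt(user.id);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Consumer side: reads the fields back in exactly the order they were written.
    static User deserialize(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            return new User(in.readUTF(), in.readInt());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Matching pair: the record survives the round trip.
        User roundTrip = deserialize(serialize(new User("Joe", 6)));
        System.out.println(roundTrip.uname + " " + roundTrip.id);

        // Mismatched pair: bytes produced in another format (here plain JSON text)
        // are rejected by this deserializer.
        byte[] jsonText = "{\"uname\": \"Joe\", \"id\": 6}".getBytes(StandardCharsets.UTF_8);
        try {
            deserialize(jsonText);
        } catch (UncheckedIOException e) {
            System.out.println("mismatched format: " + e.getCause());
        }
    }
}
```

The point of the sketch is only that the reader must expect the same layout the writer produced; it says nothing about how the real schema classes encode their data.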
However, irrespective of whether I use AvroDeserializationSchema or TypeInformationSerializationSchema, I get the following exception:

>> Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
    at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
    at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
    at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
    at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
    at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)

Thanks,
Kaniska
Attachments:
BeamKafkaFlinkAvroConsumerTest.java
BeamKafkaFlinkAvroProducerTest.java
User.java
Test.java