Hey all, I am trying to write a simple pipeline: read stringified JSON from Kinesis -> parse to POJO -> convert to Avro -> write Parquet files to AWS S3.
1) This is my SimpleMapper:

```java
public class SimpleMapper extends RichMapFunction<String, GenericRecord> {

    private static final GsonBuilder gsonBuilder =
            new GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();
    private static final Gson gson = gsonBuilder.create();
    private static final Schema schema = ReflectData.get().getSchema(Response.class);

    @Override
    public GenericRecord map(String s) throws Exception {
        Response response = gson.fromJson(s, Response.class);
        GenericData.Record record = new GenericData.Record(schema);
        record.put(0, response);
        return record;
    }
}
```

2) This is my job definition:

```java
public class ClickStreamPipeline implements Serializable {

    private static Schema schema = ReflectData.get().getSchema(Response.class);

    public static void main(String[] args) throws Exception {
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);

        FlinkKinesisConsumer<String> kinesisConsumer =
                new FlinkKinesisConsumer<>(
                        "web-clickstream",
                        new SimpleStringSchema(),
                        getKafkaConsumerProperties());

        final StreamingFileSink<GenericRecord> streamingFileSink =
                StreamingFileSink.forBulkFormat(
                                new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
                                ParquetAvroWriters.forGenericRecord(schema))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

        env.addSource(kinesisConsumer)
                .map(new SimpleMapper())
                .returns(new GenericRecordAvroTypeInfo(schema))
                .addSink(streamingFileSink);

        env.execute("Read files in streaming fashion");
    }

    private static StreamExecutionEnvironment getStreamExecutionEnvironment(
            MultipleParameterTool params) throws ClassNotFoundException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
        env.getConfig()
                .addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
        env.enableCheckpointing(60_000L);
        return env;
    }
}
```

The issue I am facing is failing to serialize the Avro GenericRecord-wrapped message:

- When I use `new GenericRecordAvroTypeInfo(schema)` to force Avro as the preferred serializer, I get the error below:
  `java.lang.ClassCastException: class <my fully qualified POJO> cannot be cast to class org.apache.avro.generic.IndexedRecord`
- If I don't use GenericRecordAvroTypeInfo and instead try to register my POJO with the Kryo serializer, serialization fails with an NPE somewhere in my Schema class.

Do I need to implement/register a proper Avro serializer with the Flink config? Thanks for the help!
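In case it helps to pin down what I expect versus what my mapper does: my understanding is that each field of a GenericRecord should hold an Avro-compatible value rather than the whole POJO. Here is a minimal standalone sketch of that, outside Flink; the schema and field names (`userId`, `timestamp`) are made up for illustration and are not my real Response fields:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericRecordSketch {

    // Illustrative hand-built schema -- stands in for
    // ReflectData.get().getSchema(Response.class) in my actual job.
    static final Schema SCHEMA = SchemaBuilder.record("Response")
            .fields()
            .requiredString("userId")
            .requiredLong("timestamp")
            .endRecord();

    static GenericRecord buildRecord(String userId, long timestamp) {
        // Each field is populated with an Avro-compatible value individually;
        // the POJO itself is never stored inside the record.
        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("userId", userId);
        record.put("timestamp", timestamp);
        return record;
    }

    public static void main(String[] args) {
        GenericRecord record = buildRecord("u-123", 1700000000L);
        System.out.println(record);
    }
}
```

My SimpleMapper instead does `record.put(0, response)`, i.e. it stuffs the whole POJO into field 0, which is what I suspect the ClassCastException is complaining about, but I'm not sure what the idiomatic fix is.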