Hey all,

I am trying to write a simple pipeline:

read stringified JSON from Kinesis -> parse to POJO -> convert to Avro
-> write Parquet files to AWS S3.
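
For context, my Response POJO is roughly shaped like this (the field
names here are simplified placeholders, not the real ones; the actual
class has more fields):

import com.google.gson.annotations.Expose;

public class Response {
    // @Expose is required because my Gson instance is built with
    // excludeFieldsWithoutExposeAnnotation().
    @Expose private String userId;
    @Expose private String eventType;
    @Expose private long timestamp;

    // no-arg constructor for Gson and Avro ReflectData
    public Response() {}
}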

1) This is my SimpleMapper

public class SimpleMapper extends RichMapFunction<String, GenericRecord> {

    private static final GsonBuilder gsonBuilder =
            new GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();

    private static final Gson gson = gsonBuilder.create();
    private static final Schema schema = ReflectData.get().getSchema(Response.class);

    @Override
    public GenericRecord map(String s) throws Exception {
        Response response = gson.fromJson(s, Response.class);
        GenericData.Record record = new GenericData.Record(schema);
        record.put(0, response);
        return record;
    }
}

2) This is my Job Definition

public class ClickStreamPipeline implements Serializable {

    private static Schema schema = ReflectData.get().getSchema(Response.class);

    public static void main(String[] args) throws Exception {
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);

        FlinkKinesisConsumer<String> kinesisConsumer =
                new FlinkKinesisConsumer<>(
                        "web-clickstream",
                        new SimpleStringSchema(),
                        getKafkaConsumerProperties());

        final StreamingFileSink<GenericRecord> streamingFileSink =
                StreamingFileSink.forBulkFormat(
                        new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
                        ParquetAvroWriters.forGenericRecord(schema))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

        env.addSource(kinesisConsumer)
                .map(new SimpleMapper())
                .returns(new GenericRecordAvroTypeInfo(schema))
                .addSink(streamingFileSink);

        env.execute("Read files in streaming fashion");
    }

    private static StreamExecutionEnvironment getStreamExecutionEnvironment(
            MultipleParameterTool params) throws ClassNotFoundException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
        env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
        env.enableCheckpointing(60_000L);

        return env;
    }
}



The issue I am facing is that the job fails to serialize the Avro
GenericRecord-wrapped message:

   - When I use GenericRecordAvroTypeInfo(schema) to force Avro as the
   preferred serializer, I get the error below (my current guess about
   the cause is in the first sketch after this list):

         java.lang.ClassCastException: class <my fully qualified POJO>
         cannot be cast to class org.apache.avro.generic.IndexedRecord

   - If I don't use GenericRecordAvroTypeInfo and instead register my
   POJO with the KryoSerializer, serialization fails with an NPE somewhere
   in my Schema class. Do I need to implement/register a proper Avro
   serializer with the Flink config? (The second sketch after this list
   is one idea I have not tried yet.)
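
First sketch: my suspicion on the ClassCastException is that
record.put(0, response) stores the whole POJO in field 0, while the Avro
writer expects the record's individual field values. Would populating
the record field by field be the right fix? A rough, untested rewrite of
my map(), assuming getters on Response and the placeholder field names
from above:

    @Override
    public GenericRecord map(String s) throws Exception {
        Response response = gson.fromJson(s, Response.class);
        // Untested sketch: copy each POJO field into the matching Avro
        // field instead of putting the whole POJO into slot 0.
        GenericData.Record record = new GenericData.Record(schema);
        record.put("userId", response.getUserId());
        record.put("eventType", response.getEventType());
        record.put("timestamp", response.getTimestamp());
        return record;
    }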
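
Second sketch: on the Kryo NPE, one thing I have not tried yet is asking
Flink to prefer Avro over Kryo for POJO serialization via ExecutionConfig
(I believe this needs flink-avro on the classpath):

    // Untested idea: make Flink serialize POJO types with Avro instead
    // of falling back to Kryo.
    env.getConfig().enableForceAvro();

Is either of these the right direction?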

Thanks for the help!
