Hi,

I'm trying to tail a Kafka topic and copy the data to S3 as Parquet files.
I'm using StreamingFileSink with ParquetAvroWriters, and it works just fine.
However, it looks like I have to generate the Avro schema and convert my
POJO class to GenericRecord first (i.e. map DataStream<POJO> to
DataStream<GenericRecord>). Here is my code:

DataStream<POJO> input = ... // read from Kafka

// Map each POJO to a GenericRecord (see convertPlaySessionEventToGenericRecord below)
DataStream<GenericRecord> output = input
    .map(KafkaToS3::convertPlaySessionEventToGenericRecord)
    .uid(mapOperatorUid);

Schema schema = new Schema.Parser().parse(schemaStr);

StreamingFileSink<GenericRecord> streamingFileSink = StreamingFileSink
    .forBulkFormat(new Path(filePath), ParquetAvroWriters.forGenericRecord(schema))
    .build();

// Operator uids must be unique, hence distinct uids for the map and the sink
output.addSink(streamingFileSink).uid(sinkOperatorUid);

Following is the code for KafkaToS3::convertPlaySessionEventToGenericRecord:

private static GenericRecord convertPlaySessionEventToGenericRecord(
        PlaySessionEvent playSessionEvent) throws Exception {
    // Copy each field of the POJO into a GenericRecord built from its Avro schema
    Schema schema = PlaySession.getAvroSchema();
    GenericRecord record = new GenericData.Record(schema);
    for (Schema.Field field : schema.getFields()) {
        record.put(field.name(), KafkaToS3.getObjectField(field, playSessionEvent));
    }
    return record;
}

Although the app works just fine, the whole process feels unnecessarily
convoluted. I would appreciate any guidance on this front. Am I doing this
job right?
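
For instance, I was wondering whether something like the following untested
sketch, using ParquetAvroWriters.forReflectRecord to derive the schema from
the POJO via Avro reflection, would let me drop the GenericRecord map step
entirely (pojoSink is just an illustrative name, and I'm assuming input is
a DataStream<PlaySessionEvent>):

// Untested: Avro reflection builds the schema from PlaySessionEvent's fields,
// so no explicit schema string or GenericRecord conversion is needed
StreamingFileSink<PlaySessionEvent> pojoSink = StreamingFileSink
    .forBulkFormat(new Path(filePath),
        ParquetAvroWriters.forReflectRecord(PlaySessionEvent.class))
    .build();

input.addSink(pojoSink).uid(sinkOperatorUid);

Would that be a reasonable way to do it, or is the GenericRecord route
preferred?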

Thanks.

Thomas
