Hi, I'm trying to tail a Kafka topic and copy the data to S3 as Parquet files. I'm using StreamingFileSink with ParquetAvroWriters, and it works just fine. However, it looks like I have to generate the Avro schema myself and convert my POJO to GenericRecord first (i.e. convert DataStream<POJO> to DataStream<GenericRecord>). Here is my code:
DataStream<POJO> input = ... // read from Kafka
DataStream<GenericRecord> output = input
        .map(KafkaToS3::convertPlaySessionEventToGenericRecord)
        .uid(operatorUid);

Schema schema = new Schema.Parser().parse(schemaStr);
StreamingFileSink<GenericRecord> streamingFileSink = StreamingFileSink
        .forBulkFormat(new Path(filePath), ParquetAvroWriters.forGenericRecord(schema))
        .build();
output.addSink(streamingFileSink).uid(operatorUid);

Following is the code for KafkaToS3::convertPlaySessionEventToGenericRecord:

private static GenericRecord convertPlaySessionEventToGenericRecord(PlaySessionEvent playSessionEvent) throws Exception {
    Schema schema = PlaySession.getAvroSchema();
    GenericRecord record = new GenericData.Record(schema);
    for (Schema.Field field : schema.getFields()) {
        record.put(field.name(), KafkaToS3.getObjectField(field, playSessionEvent));
    }
    return record;
}

Although the app works just fine, I feel the whole process is unnecessarily convoluted. I would appreciate any guidance on this front. Am I doing this job right?

Thanks,
Thomas
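P.S. For reference, here is roughly what I was hoping could work instead, with the sink taking the POJO stream directly. This is only a sketch based on my reading of the ParquetAvroWriters API; I'm assuming forReflectRecord can derive the Avro schema from the PlaySessionEvent class via Avro reflection, and I haven't checked how it handles nested or nullable fields:

// Sketch: skip the GenericRecord conversion and let Avro reflection derive the schema.
// Assumes PlaySessionEvent is a plain POJO with Avro-compatible field types.
DataStream<PlaySessionEvent> input = ... // read from Kafka

StreamingFileSink<PlaySessionEvent> streamingFileSink = StreamingFileSink
        .forBulkFormat(new Path(filePath), ParquetAvroWriters.forReflectRecord(PlaySessionEvent.class))
        .build();

input.addSink(streamingFileSink).uid(operatorUid);

If that isn't the intended use of forReflectRecord, or if there is a better way to go from a POJO stream to Parquet without the intermediate GenericRecord step, any pointers would be much appreciated.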