Hi Thomas,

The usual way with Avro would be to generate a class from your schema [1]. Then PlaySession would already be a SpecificRecord and you would avoid the extra conversion step.

I'm quite positive that the same approach works with ParquetAvroWriters; note that you would need to use ParquetAvroWriters#forSpecificRecord instead. You could also try ParquetAvroWriters#forReflectRecord(POJO.class) directly, but it is much slower. Let me know if that approach doesn't work for you.
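For illustration, a minimal sketch of what the sink could look like once PlaySession is generated by the Avro compiler (class name, filePath and operatorUid are taken from your snippet; the rest is an untested assumption):

    // assumes PlaySession was generated from the schema and therefore implements SpecificRecord
    DataStream<PlaySession> input = ... // read from Kafka

    StreamingFileSink<PlaySession> streamingFileSink = StreamingFileSink
        .forBulkFormat(new Path(filePath),
            ParquetAvroWriters.forSpecificRecord(PlaySession.class))
        .build();

    input.addSink(streamingFileSink).uid(operatorUid);

The map step and the hand-written GenericRecord conversion go away entirely.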
[1] https://avro.apache.org/docs/1.10.2/gettingstartedjava.html

On Wed, Jun 23, 2021 at 12:08 AM Thomas Wang <w...@datability.io> wrote:

> Hi,
>
> I'm trying to tail a Kafka topic and copy the data to s3 as parquet files.
> I'm using StreamingFileSink with ParquetAvroWriters. It works just fine.
> However, it looks like I have to generate the Avro schema and convert my
> POJO class to GenericRecord first (i.e. convert DataStream<POJO> to
> DataStream<GenericRecord>). Here is my code:
>
> DataStream<POJO> input = ... // read from Kafka
> DataStream<GenericRecord> output =
>     input.map(KafkaToS3::convertPlaySessionEventToGenericRecord).uid(operatorUid);
> Schema schema = new Schema.Parser().parse(schemaStr);
> StreamingFileSink<GenericRecord> streamingFileSink = StreamingFileSink
>     .forBulkFormat(new Path(filePath),
>         ParquetAvroWriters.forGenericRecord(schema))
>     .build();
> output.addSink(streamingFileSink).uid(operatorUid);
>
> Following is the code for KafkaToS3::convertPlaySessionEventToGenericRecord:
>
> private static GenericRecord convertPlaySessionEventToGenericRecord(
>         PlaySessionEvent playSessionEvent) throws Exception {
>     Schema schema = PlaySession.getAvroSchema();
>     GenericRecord record = new GenericData.Record(schema);
>     for (Schema.Field field : schema.getFields()) {
>         record.put(field.name(), KafkaToS3.getObjectField(field, playSessionEvent));
>     }
>     return record;
> }
>
> Although the app works just fine, I feel the whole process is
> unnecessarily convoluted. I would appreciate any guideline on this front.
> Am I doing this job right?
>
> Thanks.
>
> Thomas