Hi all,

I need to write a DataStream<POJO> with a java.time.Instant field to Parquet files in S3. I couldn't find a straightforward way to do that, so I changed the POJO class to an Avro SpecificRecord (I followed this example: https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/blob/main/java/S3ParquetSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java ). My avdl file is as follows:

    record MTEvent {
      @logicalType("timestamp-micros")
      long eventTimestamp;
      string sessionId;
      ...
    }

When I ran the job, I got the following error, with full stacktrace further below.

    java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Number (java.time.Instant and java.lang.Number are in module java.base of loader 'bootstrap')

It seemed the SpecificData instance had an empty `conversions` and didn't know how to serialize that field `eventTimestamp`. I couldn't find any mention of the same error, but at the same time I couldn't find any working example for writing Timestamp to parquet.

Could someone please help? Thank you very much.

```
Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Number (java.time.Instant and java.lang.Number are in module java.base of loader 'bootstrap')
	at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:345)
	at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:282)
	at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:202)
	at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:178)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:152)
	at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:425)
	at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
	at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:51)
	at org.apache.flink.connector.file.sink.writer.FileWriterBucket.write(FileWriterBucket.java:191)
	at org.apache.flink.connector.file.sink.writer.FileWriter.write(FileWriter.java:198)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:90)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:40)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:83)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:34)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
```
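
For reference, the `ClassCastException` suggests the writer expected the raw `long` behind `eventTimestamp` rather than an `Instant`. The plain-JDK sketch below shows what the `timestamp-micros` encoding means: a `long` holding microseconds since the Unix epoch. The names `TimestampMicros`, `toMicros` and `fromMicros` are hypothetical and are not part of Avro, Parquet or Flink. Avro ships its own `TimeConversions.TimestampMicrosConversion` for this mapping; whether registering it on the data model resolves the error in this setup is an assumption that is not verified here.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, for illustration only (not an Avro/Parquet/Flink class).
public class TimestampMicros {

    // timestamp-micros annotates a long: microseconds since 1970-01-01T00:00:00Z.
    static long toMicros(Instant instant) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    }

    // Inverse mapping: rebuild the Instant from the stored long.
    static Instant fromMicros(long micros) {
        return Instant.EPOCH.plus(micros, ChronoUnit.MICROS);
    }

    public static void main(String[] args) {
        // Sample value with microsecond precision, so the round trip is lossless.
        Instant eventTimestamp = Instant.parse("2024-01-01T00:00:00.123456Z");
        long micros = toMicros(eventTimestamp);
        System.out.println(micros);
        System.out.println(fromMicros(micros).equals(eventTimestamp)); // prints "true"
    }
}
```

Note that any sub-microsecond part of an `Instant` is dropped by `toMicros`, so a value with nanosecond precision would not round-trip exactly.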