Hi all,

I need to write a DataStream<POJO> containing a java.time.Instant field to
Parquet files in S3. I couldn't find any straightforward way to do that, so
I converted the POJO class to an Avro SpecificRecord, following this example:
https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/blob/main/java/S3ParquetSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java
My avdl file is as follows:

```
record MTEvent {
  @logicalType("timestamp-micros")
  long eventTimestamp;

  string sessionId;
  ...
}
```


When I ran the job, it failed with the following error (full stack trace
further below):

java.lang.ClassCastException: class java.time.Instant cannot be cast to
class java.lang.Number (java.time.Instant and java.lang.Number are in
module java.base of loader 'bootstrap')

It seems the SpecificData instance used by the Parquet writer has an empty
`conversions` list, so it doesn't know how to convert the java.time.Instant
value of `eventTimestamp` back to the underlying long.
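For reference, my understanding is that timestamp-micros is ultimately stored
as a long of microseconds since the Unix epoch, which is the conversion Avro's
TimestampMicrosConversion would normally perform. A minimal, self-contained
sketch of that mapping (class and method names are mine, just for
illustration):

```java
import java.time.Instant;

public class TimestampMicrosDemo {
    // timestamp-micros represents microseconds since the Unix epoch as a long
    static long toMicros(Instant t) {
        return t.getEpochSecond() * 1_000_000L + t.getNano() / 1_000L;
    }

    public static void main(String[] args) {
        // 1 second and 1 microsecond past the epoch -> 1000001 micros
        System.out.println(toMicros(Instant.parse("1970-01-01T00:00:01.000001Z")));
    }
}
```

That long value is what I'd expect to end up in the Parquet column if the
conversion were applied.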

I couldn't find any mention of this exact error, nor a working example of
writing a timestamp field to Parquet from Flink.

Could someone please help?

Thank you very much.


```
Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Number (java.time.Instant and java.lang.Number are in module java.base of loader 'bootstrap')
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:345)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:282)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:202)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:178)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:152)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:425)
    at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:51)
    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.write(FileWriterBucket.java:191)
    at org.apache.flink.connector.file.sink.writer.FileWriter.write(FileWriter.java:198)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:90)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:40)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:83)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:34)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
```
