Ahmed Elhassany created FLINK-33039:
---------------------------------------

             Summary: Avro Specific Record Logical timestamp is not serialized in Parquet
                 Key: FLINK-33039
                 URL: https://issues.apache.org/jira/browse/FLINK-33039
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.17.1
            Reporter: Ahmed Elhassany


I'm trying to write a SpecificRecord that contains a field with a logical timestamp to Parquet files on S3. The field is defined as:
{code:json}
{
  "name": "ts",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis"
  }
}
  {code}
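For reference, the Avro specification defines {{timestamp-millis}} as an annotation on a {{long}} that stores the number of milliseconds since the Unix epoch (1970-01-01T00:00:00.000 UTC). With Avro 1.9+ code generation, such a field is typically exposed on the generated SpecificRecord as {{java.time.Instant}}, which matches the type in the trace. The JDK-only sketch below shows the {{long}} value the Parquet writer ultimately needs to receive; the class and method names are hypothetical and are not part of Avro or Flink.

```java
import java.time.Instant;

public class TimestampMillisEncoding {

    // Encode an Instant the way the Avro "timestamp-millis" logical type
    // stores it: a long counting milliseconds since 1970-01-01T00:00:00Z.
    // toEpochMilli() drops any sub-millisecond precision.
    static long toTimestampMillis(Instant ts) {
        return ts.toEpochMilli();
    }

    // Decode the underlying long back into an Instant.
    static Instant fromTimestampMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2023-09-05T14:10:02.124425478Z");
        long encoded = toTimestampMillis(ts);
        System.out.println(encoded);
        System.out.println(fromTimestampMillis(encoded));
    }
}
```

Running {{main}} prints {{1693923002124}} and then {{2023-09-05T14:10:02.124Z}}; the nanosecond digits are lost because the logical type only carries milliseconds.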
I'm saving it with the following sink:
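{code:java}
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// MyObj is the Avro-generated SpecificRecord class; path is an
// org.apache.flink.core.fs.Path pointing at the S3 output location.
final FileSink<MyObj> sinkFlowAggregationAvro =
    FileSink.forBulkFormat(path, AvroParquetWriters.forSpecificRecord(MyObj.class))
        .withOutputFileConfig(OutputFileConfig
            .builder()
            .withPartSuffix(".parquet")
            .build())
        .build();
{code}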

However, the job fails with the following ClassCastException:

{noformat}
flink-taskmanager-b467cbff9-n28zp taskmanager 2023-09-05T16:10:02.124425478+02:00 Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Number (java.time.Instant and java.lang.Number are in module java.base of loader 'bootstrap')
flink-taskmanager-b467cbff9-n28zp taskmanager 2023-09-05T16:10:02.124425478+02:00     at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:340) ~[blob_p-22acff48719adf70603f57842bd158d7f5538a47-e40c3e350efab078d53261fe2bc38640:?]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-n28zp taskmanager 2023-09-05T16:10:02.124425478+02:00     at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:288) ~[blob_p-22acff48719adf70603f57842bd158d7f5538a47-e40c3e350efab078d53261fe2bc38640:?]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00     ... 21 more
flink-taskmanager-b467cbff9-m5gdt taskmanager 2023-09-05T16:10:01.979428558+02:00     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00 Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
flink-taskmanager-b467cbff9-2xn5w taskmanager 2023-09-05T16:10:01.871644827+02:00     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144) ~[flink-connector-base-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-m5gdt taskmanager 2023-09-05T16:10:01.979428558+02:00     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-pqjqr taskmanager 2023-09-05T16:10:02.276107852+02:00     ... 21 more
flink-taskmanager-b467cbff9-m5gdt taskmanager 2023-09-05T16:10:01.979428558+02:00     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput
flink-taskmanager-b467cbff9-m5gdt taskmanager 2023-09-05T16:10:01.979428558+02:00     ... 21 more
{noformat}
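The frame {{AvroWriteSupport.writeValueWithoutConversion}} suggests that the {{Instant}} from the generated class reaches the Parquet writer without the {{timestamp-millis}} conversion being applied, so the writer treats it as a plain Avro {{long}} and casts it to {{Number}}. The JDK-only sketch below is a simplified model of that failure mode, not Parquet's actual code; {{writeLongField}} and {{TIMESTAMP_MILLIS}} are hypothetical names.

```java
import java.time.Instant;
import java.util.function.Function;

public class MissingConversionModel {

    // Hypothetical stand-in for a logical-type conversion:
    // java.time.Instant -> epoch milliseconds (Avro "timestamp-millis").
    static final Function<Object, Object> TIMESTAMP_MILLIS =
            v -> ((Instant) v).toEpochMilli();

    // Simplified model of a writer for an Avro "long" field: when no conversion
    // is supplied, the raw value is cast straight to Number, mirroring the
    // cast that fails in the reported stack trace.
    static long writeLongField(Object value, Function<Object, Object> conversion) {
        Object converted = (conversion == null) ? value : conversion.apply(value);
        return ((Number) converted).longValue();
    }

    public static void main(String[] args) {
        Instant ts = Instant.ofEpochMilli(1_693_923_002_124L);

        // With the conversion applied, the Instant becomes a long.
        System.out.println(writeLongField(ts, TIMESTAMP_MILLIS));

        // Without it, the cast to Number throws a ClassCastException.
        try {
            writeLongField(ts, null);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

Running {{main}} prints {{1693923002124}} for the converted value and then a ClassCastException message for the unconverted one. In this model the remedy is to make the conversion available to the writer; whether {{AvroParquetWriters.forSpecificRecord}} registers the generated class's logical-type conversions with the Avro data model it passes to Parquet is the open question this trace raises.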



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
