[ https://issues.apache.org/jira/browse/FLINK-35698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Weijie Guo updated FLINK-35698:
-------------------------------
    Fix Version/s: 2.0.0
                       (was: 1.20.0)
                       (was: 1.19.2)

> Parquet connector fails to load ROW<x decimal(5, 2)> after save
> ---------------------------------------------------------------
>
>                 Key: FLINK-35698
>                 URL: https://issues.apache.org/jira/browse/FLINK-35698
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / API
>    Affects Versions: 1.17.0, 1.18.0, 1.19.0
>            Reporter: Andrey Gaskov
>            Priority: Critical
>             Fix For: 2.0.0
>
>
> The bug could be reproduced by the following test added to ParquetFileSystemITCase.java:
> {code:java}
> @TestTemplate
> void testRowColumnType() throws IOException, ExecutionException, InterruptedException {
>     String path = TempDirUtils.newFolder(super.fileTempFolder()).toURI().getPath();
>     super.tableEnv()
>             .executeSql(
>                     "create table t_in("
>                             + "grp ROW<x decimal(5, 2)>"
>                             + ") with ("
>                             + "'connector' = 'datagen',"
>                             + "'number-of-rows' = '10'"
>                             + ")");
>     super.tableEnv()
>             .executeSql(
>                     "create table t_out("
>                             + "grp ROW<x decimal(5, 2)>"
>                             + ") with ("
>                             + "'connector' = 'filesystem',"
>                             + "'path' = '" + path + "',"
>                             + "'format' = 'parquet'"
>                             + ")");
>     super.tableEnv().executeSql("insert into t_out select * from t_in").await();
>     List<Row> rows =
>             CollectionUtil.iteratorToList(
>                     super.tableEnv().executeSql("select * from t_out limit 10").collect());
>     assertThat(rows).hasSize(10);
> }
> {code}
> It fails with this root exception after hanging for 40 seconds:
> {code:java}
> Caused by: java.lang.ClassCastException: org.apache.flink.table.data.columnar.vector.heap.HeapIntVector cannot be cast to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
>     at org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
>     at org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
>     at org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
>     at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:207)
>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeRow(AbstractBinaryWriter.java:147)
>     at org.apache.flink.table.data.writer.BinaryRowWriter.writeRow(BinaryRowWriter.java:27)
>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:155)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:173)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:44)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:152)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:108)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:53)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:60)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:32)
>     at org.apache.flink.table.runtime.operators.sort.LimitOperator.processElement(LimitOperator.java:47)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:109)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:78)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:40)
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:103)
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:750)
> {code}
> If "grp ROW<x decimal(5, 2)>" is changed to "grp ROW<x int>", the test runs successfully in a few seconds.
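
A plausible reading of the stack trace, not verified against the Flink source: DECIMAL(5, 2) has precision below 10, so Parquet is permitted to store it as a physical INT32, and the vectorized reader appears to back the nested field with a HeapIntVector, while VectorizedColumnBatch.getDecimal casts the field's vector to DecimalColumnVector. The sketch below only reproduces that failing cast in isolation; the class name, the batch size of 2048, and the HeapIntVector(int) constructor usage are assumptions for illustration and not the actual reader code.

{code:java}
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;

public class NestedDecimalCastSketch {

    public static void main(String[] args) {
        // Suspected situation: the DECIMAL(5, 2) field nested inside the ROW is
        // materialized as a plain int vector (Parquet physical type INT32).
        ColumnVector fieldVector = new HeapIntVector(2048); // 2048 = arbitrary batch size

        // VectorizedColumnBatch.getDecimal then casts the field's vector to
        // DecimalColumnVector, which is the ClassCastException from the report.
        DecimalColumnVector decimals = (DecimalColumnVector) fieldVector;
        System.out.println(decimals);
    }
}
{code}

If that reading is correct, the fix presumably belongs in the vectorized Parquet reader's handling of DECIMAL fields inside nested ROW types, since the same DECIMAL precision written and read outside a ROW is not reported to fail.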