Andrey Gaskov created FLINK-35698:
-------------------------------------
Summary: Parquet connector fails to load ROW<x decimal(5, 2)> after save
Key: FLINK-35698
URL: https://issues.apache.org/jira/browse/FLINK-35698
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / API
Affects Versions: 1.19.0, 1.18.0, 1.17.0
Reporter: Andrey Gaskov
The bug can be reproduced by adding the following test to ParquetFileSystemITCase.java:
{code:java}
@TestTemplate
void testRowColumnType() throws IOException, ExecutionException, InterruptedException {
    String path = TempDirUtils.newFolder(super.fileTempFolder()).toURI().getPath();
    // Source: 10 generated rows whose only column is a ROW with a decimal(5, 2) field.
    super.tableEnv()
            .executeSql(
                    "create table t_in("
                            + "grp ROW<x decimal(5, 2)>"
                            + ") with ("
                            + "'connector' = 'datagen',"
                            + "'number-of-rows' = '10'"
                            + ")");
    // Sink: the same schema, written as Parquet files under a temporary directory.
    super.tableEnv()
            .executeSql(
                    "create table t_out("
                            + "grp ROW<x decimal(5, 2)>"
                            + ") with ("
                            + "'connector' = 'filesystem',"
                            + "'path' = '"
                            + path
                            + "',"
                            + "'format' = 'parquet'"
                            + ")");
    super.tableEnv().executeSql("insert into t_out select * from t_in").await();
    // Reading the Parquet files back is the step that fails.
    List<Row> rows =
            CollectionUtil.iteratorToList(
                    super.tableEnv().executeSql("select * from t_out limit 10").collect());
    assertThat(rows).hasSize(10);
}
{code}
It fails with this root exception after hanging for 40 seconds:
{code:java}
Caused by: java.lang.ClassCastException: org.apache.flink.table.data.columnar.vector.heap.HeapIntVector cannot be cast to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
    at org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
    at org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:207)
    at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeRow(AbstractBinaryWriter.java:147)
    at org.apache.flink.table.data.writer.BinaryRowWriter.writeRow(BinaryRowWriter.java:27)
    at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:155)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:173)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:44)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:152)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:108)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:53)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:60)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:32)
    at org.apache.flink.table.runtime.operators.sort.LimitOperator.processElement(LimitOperator.java:47)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:109)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:78)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:40)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:103)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)
{code}
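The top frames show ColumnarRowData.getDecimal for the nested row calling VectorizedColumnBatch.getDecimal, which casts the child column vector to DecimalColumnVector while the vector is actually a HeapIntVector. One hypothesis, not confirmed in this report, is that the read path wraps INT32-backed decimal vectors in a decimal-aware adapter for top-level columns but skips that step for fields nested inside a ROW. The sketch below models that failure mode in plain Java. Every type in it (ColumnVector, IntColumnVector, DecimalColumnVector, HeapIntVector, IntBackedDecimalVector) is a hypothetical, simplified stand-in named after the classes in the trace, not Flink's actual API.

{code:java}
import java.math.BigDecimal;

public class NestedDecimalCastSketch {
    // Hypothetical, simplified stand-ins for columnar vector interfaces (NOT Flink's real API).
    interface ColumnVector {}

    interface IntColumnVector extends ColumnVector {
        int getInt(int rowId);
    }

    interface DecimalColumnVector extends ColumnVector {
        BigDecimal getDecimal(int rowId, int precision, int scale);
    }

    // Raw INT32 storage: holds unscaled values and knows nothing about decimals.
    static final class HeapIntVector implements IntColumnVector {
        private final int[] values;

        HeapIntVector(int... values) {
            this.values = values;
        }

        public int getInt(int rowId) {
            return values[rowId];
        }
    }

    // Hypothetical adapter: reinterprets each unscaled int as a decimal with the given scale.
    // The precision argument is unused here; an INT32-backed decimal has precision <= 9.
    static final class IntBackedDecimalVector implements DecimalColumnVector {
        private final IntColumnVector delegate;

        IntBackedDecimalVector(IntColumnVector delegate) {
            this.delegate = delegate;
        }

        public BigDecimal getDecimal(int rowId, int precision, int scale) {
            return BigDecimal.valueOf(delegate.getInt(rowId), scale);
        }
    }

    // Mirrors the unchecked cast at the top of the stack trace.
    static BigDecimal getDecimal(ColumnVector vector, int rowId, int precision, int scale) {
        return ((DecimalColumnVector) vector).getDecimal(rowId, precision, scale);
    }

    public static void main(String[] args) {
        HeapIntVector raw = new HeapIntVector(12345, -999);

        // Unwrapped child vector: the cast fails, as in the reported exception.
        try {
            getDecimal(raw, 0, 5, 2);
        } catch (ClassCastException e) {
            System.out.println("unwrapped child vector: ClassCastException");
        }

        // Wrapped child vector: the same call yields decimal(5, 2) values.
        ColumnVector wrapped = new IntBackedDecimalVector(raw);
        System.out.println(getDecimal(wrapped, 0, 5, 2)); // 123.45
        System.out.println(getDecimal(wrapped, 1, 5, 2)); // -9.99
    }
}
{code}

In this model, the fix is a matter of where the adapter is applied: the cast site stays the same, and only the vector handed to it changes.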
If "grp ROW<x decimal(5, 2)>" is changed to "grp ROW<x int>", the test runs successfully in a few seconds.
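A plausible reason the int variant behaves differently: the Apache Parquet format specification allows a DECIMAL with precision 1 to 9 to be stored in the INT32 physical type as its unscaled value, so a decimal(5, 2) column and an int column can share the same physical type, and only the decimal read path has to reapply the scale. The HeapIntVector in the trace is consistent with this field having been written as INT32, though the report does not state the file schema. The sketch below shows that encoding with plain java.math.BigDecimal; it uses no Flink or Parquet API, and the class and method names are illustrative.

{code:java}
import java.math.BigDecimal;
import java.math.RoundingMode;

public class Decimal52AsInt32 {
    // decimal(5, 2): at most 5 significant digits, 2 of them after the decimal point.
    static final int PRECISION = 5;
    static final int SCALE = 2;

    // Encode a decimal(5, 2) value as its unscaled int, the form an INT32-backed column stores.
    // setScale with UNNECESSARY throws ArithmeticException if rounding would be required.
    static int encode(BigDecimal value) {
        BigDecimal scaled = value.setScale(SCALE, RoundingMode.UNNECESSARY);
        if (scaled.precision() > PRECISION) {
            throw new IllegalArgumentException("Value does not fit decimal(5, 2): " + value);
        }
        return scaled.unscaledValue().intValueExact();
    }

    // Decode: the reader must reapply the scale; the raw int alone reads as 12345, not 123.45.
    static BigDecimal decode(int unscaled) {
        return BigDecimal.valueOf(unscaled, SCALE);
    }

    public static void main(String[] args) {
        BigDecimal original = new BigDecimal("123.45");
        int stored = encode(original);
        System.out.println(stored);         // 12345
        System.out.println(decode(stored)); // 123.45
    }
}
{code}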
--
This message was sent by Atlassian Jira
(v8.20.10#820010)