[ 
https://issues.apache.org/jira/browse/FLINK-35698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35698:
-------------------------------
    Fix Version/s: 2.0.0
                       (was: 1.20.0)
                       (was: 1.19.2)

> Parquet connector fails to load ROW<x decimal(5, 2)> after save
> ---------------------------------------------------------------
>
>                 Key: FLINK-35698
>                 URL: https://issues.apache.org/jira/browse/FLINK-35698
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / API
>    Affects Versions: 1.17.0, 1.18.0, 1.19.0
>            Reporter: Andrey Gaskov
>            Priority: Critical
>             Fix For: 2.0.0
>
>
> The bug can be reproduced by adding the following test to 
> ParquetFileSystemITCase.java:
> {code:java}
> @TestTemplate
> void testRowColumnType() throws IOException, ExecutionException, 
> InterruptedException {
>     String path = 
> TempDirUtils.newFolder(super.fileTempFolder()).toURI().getPath();
>     super.tableEnv()
>             .executeSql(
>                     "create table t_in("
>                             + "grp ROW<x decimal(5, 2)>"
>                             + ") with ("
>                             + "'connector' = 'datagen',"
>                             + "'number-of-rows' = '10'"
>                             + ")");
>     super.tableEnv()
>             .executeSql(
>                     "create table t_out("
>                             + "grp ROW<x decimal(5, 2)>"
>                             + ") with ("
>                             + "'connector' = 'filesystem',"
>                             + "'path' = '"
>                             + path
>                             + "',"
>                             + "'format' = 'parquet'"
>                             + ")");
>     super.tableEnv().executeSql("insert into t_out select * from 
> t_in").await();
>     List<Row> rows =
>             CollectionUtil.iteratorToList(
>                     super.tableEnv().executeSql("select * from t_out limit 
> 10").collect());
>     assertThat(rows).hasSize(10);
> } {code}
> It fails with this root exception after hanging for 40 seconds:
> {code:java}
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.table.data.columnar.vector.heap.HeapIntVector cannot be cast 
> to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
>     at 
> org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
>     at 
> org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
>     at 
> org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
>     at 
> org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:207)
>     at 
> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeRow(AbstractBinaryWriter.java:147)
>     at 
> org.apache.flink.table.data.writer.BinaryRowWriter.writeRow(BinaryRowWriter.java:27)
>     at 
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:155)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:173)
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:44)
>     at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:152)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:108)
>     at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:53)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:60)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:32)
>     at 
> org.apache.flink.table.runtime.operators.sort.LimitOperator.processElement(LimitOperator.java:47)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:109)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:78)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:40)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
>     at 
> org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:103)
>     at 
> org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
>     at 
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
>     at 
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:750) {code}
> If "grp ROW<x decimal(5, 2)>" is changed to "grp ROW<x int>", the test runs 
> successfully in a few seconds.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to