dalongliu created FLINK-29547:
---------------------------------

             Summary: Select a[1] which is array type for parquet complex type throw ClassCastException
                 Key: FLINK-29547
                 URL: https://issues.apache.org/jira/browse/FLINK-29547
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.16.0
            Reporter: dalongliu
             Fix For: 1.17.0
The following SQL test in HiveTableSourceITCase throws a ClassCastException:
{code:java}
batchTableEnv.executeSql(
        "create table parquet_complex_type_test("
                + "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>) stored as parquet");
String[] modules = batchTableEnv.listModules();
// load the hive module so that we can use the array, map, and named_struct functions
// to conveniently write complex data
batchTableEnv.loadModule("hive", new HiveModule());
batchTableEnv.useModules("hive", CoreModuleFactory.IDENTIFIER);
batchTableEnv
        .executeSql(
                "insert into parquet_complex_type_test"
                        + " select array(1, 2), map(1, 'val1', 2, 'val2'),"
                        + " named_struct('f1', 1, 'f2', 2)")
        .await();
Table src = batchTableEnv.sqlQuery("select a[1] from parquet_complex_type_test");
List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());
{code}
The exception stack:
{code:java}
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.Integer;
    at BatchExecCalc$37.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:98)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:92)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:401)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
{code}
After debugging, I found the root cause: the source operator reads the array data from parquet in a vectorized way and returns a ColumnarArrayData. In the calc operator we convert it to a GenericArrayData whose backing array has runtime type Object[] rather than Integer[]. When ArrayObjectArrayConverter#toExternal is then called to convert it to Integer[], it still returns an Object[], so the subsequent forced cast of that array to Integer[] throws the exception.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
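The failure mode described above can be reproduced in isolation with plain Java, independent of Flink: a Java array's runtime component type is fixed at creation, so an Object[] that happens to hold Integers can never be cast to Integer[]. The sketch below (ArrayCastDemo and toTypedArray are illustrative names, not Flink code) shows the failing cast and the kind of element-wise copy into a properly typed array that a converter must perform instead:

{code:java}
import java.lang.reflect.Array;

public class ArrayCastDemo {

    // Builds an array whose runtime component type matches elementClass,
    // copying elements over one by one. This is the general shape of what a
    // converter must do when handed an Object[] but asked for an Integer[].
    @SuppressWarnings("unchecked")
    static <E> E[] toTypedArray(Object[] source, Class<E> elementClass) {
        E[] result = (E[]) Array.newInstance(elementClass, source.length);
        for (int i = 0; i < source.length; i++) {
            result[i] = elementClass.cast(source[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Runtime component type is Object, even though all elements are Integers.
        Object[] generic = new Object[] {1, 2};

        // Direct cast fails with exactly the reported message:
        // [Ljava.lang.Object; cannot be cast to [Ljava.lang.Integer;
        boolean castFailed = false;
        try {
            Integer[] broken = (Integer[]) generic;
            System.out.println(broken.length); // never reached
        } catch (ClassCastException e) {
            castFailed = true;
        }
        System.out.println("direct cast failed: " + castFailed);

        // Copying into a freshly created Integer[] succeeds.
        Integer[] typed = toTypedArray(generic, Integer.class);
        System.out.println("typed[0] + typed[1] = " + (typed[0] + typed[1]));
    }
}
{code}

In other words, toExternal must allocate an array of the requested external element class rather than return the internal Object[] unchanged; only then is the downstream cast in the generated calc code safe.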