[ 
https://issues.apache.org/jira/browse/FLINK-29547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29547:
-----------------------------------
    Labels: pull-request-available  (was: )

> Selecting a[1], which is array type, for a parquet complex type throws 
> ClassCastException
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-29547
>                 URL: https://issues.apache.org/jira/browse/FLINK-29547
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.0
>            Reporter: dalongliu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> The following SQL test in HiveTableSourceITCase throws a ClassCastException.
> {code:java}
> batchTableEnv.executeSql(
>         "create table parquet_complex_type_test("
>                 + "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>) stored as parquet");
> String[] modules = batchTableEnv.listModules();
> // load the hive module so that we can use the array, map, named_struct functions
> // for conveniently writing complex data
> batchTableEnv.loadModule("hive", new HiveModule());
> batchTableEnv.useModules("hive", CoreModuleFactory.IDENTIFIER);
> batchTableEnv
>         .executeSql(
>                 "insert into parquet_complex_type_test"
>                         + " select array(1, 2), map(1, 'val1', 2, 'val2'),"
>                         + " named_struct('f1', 1, 'f2', 2)")
>         .await();
> Table src = batchTableEnv.sqlQuery("select a[1] from parquet_complex_type_test");
> List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());
> {code}
> The exception stack:
> {code}
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.Integer;
>     at BatchExecCalc$37.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:98)
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:92)
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:401)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> After debugging the code, I found the root cause: the source operator reads 
> the array data from parquet in a vectorized way and returns a 
> ColumnarArrayData. The calc operator then converts it to a GenericArrayData 
> whose backing array has runtime type Object[] rather than Integer[]. When 
> ArrayObjectArrayConverter#toExternal is called to convert it to Integer[], 
> it still returns an Object[], and the subsequent forced cast of that array 
> to Integer[] throws the exception.
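The failure mode described above is ordinary Java array behavior, independent of Flink: an Object[] whose elements happen to be Integers is still an Object[] at runtime and cannot be cast to Integer[]. A minimal standalone sketch (a hypothetical demo class, not Flink code) reproduces the cast failure and shows the element-wise copy that avoids it:

```java
import java.util.Arrays;

public class ArrayCastDemo {
    public static void main(String[] args) {
        // Backing array of runtime type Object[], even though every element is an Integer
        // (analogous to the array GenericArrayData ends up holding).
        Object[] boxed = new Object[] {1, 2, 3};

        try {
            // Casting the array reference fails: the runtime array type is Object[].
            Integer[] bad = (Integer[]) boxed;
            System.out.println("cast succeeded (unexpected): " + bad.length);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException as described in the report");
        }

        // A converter must instead allocate an Integer[] and copy the elements.
        Integer[] good = Arrays.copyOf(boxed, boxed.length, Integer[].class);
        System.out.println(good[0] + "," + good[1] + "," + good[2]);
    }
}
```

Note that casting individual elements ((Integer) boxed[0]) works fine; only the cast of the whole array reference fails, which is why the bug surfaces in generated code that casts the array returned by the converter.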



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
