Jichao Wang created FLINK-35523:
-----------------------------------

             Summary: When using the Hive connector to read a Hive table in Parquet format, a null pointer exception is thrown.
                 Key: FLINK-35523
                 URL: https://issues.apache.org/jira/browse/FLINK-35523
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.19.0, 1.16.2
            Reporter: Jichao Wang
When using the Hive connector to read a Hive table in Parquet format, the read fails; the root cause in the stack trace below is a java.lang.IndexOutOfBoundsException (Index: 1, Size: 1) thrown while the vectorized Parquet reader builds its writable column vectors. The full stack trace is as follows:

{code:text}
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
    at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:40) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    ... 6 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
    at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_342]
    at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_342]
    at org.apache.flink.hive.shaded.parquet.schema.GroupType.getType(GroupType.java:216) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:277) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:266) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:256) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:139) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:75) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:110) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:65) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.lambda$fetch$0(HdpFileSourceSplitReader.java:38) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.runtime.security.SecurityUtils.runAtOneContext(SecurityUtils.java:108) ~[flink-dist-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:38) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
    ... 6 more
{code}

Steps to reproduce:

1. Create a Parquet table parquet_array_struct in Hive:
{code:sql}
create table parquet_array_struct(a array<struct<a1:int, a2:int>>) stored as parquet;
{code}

2. Insert data into the parquet_array_struct table:
{code:sql}
insert into parquet_array_struct select array(named_struct('a1', 1, 'a2', 2));
{code}

3. Run a Flink SQL job that reads the parquet_array_struct table:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = null; // hive-site.xml is looked up on the classpath when this is null

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
tableEnv.useDatabase(defaultDatabase);

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// print() consumes the result, so the splits are actually read and the exception surfaces in the client
tableEnv.executeSql("select * from parquet_array_struct").print();
{code}
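For what it's worth, here is a minimal standalone sketch of what appears to go wrong (my analysis only, not confirmed against the Flink sources). Hive's ParquetHiveSerDe writes array<struct<...>> with the legacy three-level list layout, where the repeated group ("bag") contains a single wrapper group ("array_element") that in turn holds the struct fields. If the vectorized reader resolves the ROW fields (a1, a2) directly against that repeated group, asking for field index 1 fails with exactly the "Index: 1, Size: 1" seen above, because that group has only one child. The sketch uses the unshaded org.apache.parquet classes rather than Flink's shaded copies, and the schema string is an assumption about what Hive actually wrote (it can be checked with parquet-tools schema on the data file):

{code:java}
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ArrayStructSchemaSketch {
    public static void main(String[] args) {
        // Assumed file schema for: create table parquet_array_struct(a array<struct<a1:int, a2:int>>) stored as parquet
        MessageType fileSchema = MessageTypeParser.parseMessageType(
                "message hive_schema {\n"
                        + "  optional group a (LIST) {\n"
                        + "    repeated group bag {\n"
                        + "      optional group array_element {\n"
                        + "        optional int32 a1;\n"
                        + "        optional int32 a2;\n"
                        + "      }\n"
                        + "    }\n"
                        + "  }\n"
                        + "}");

        // The repeated group has exactly one child: the "array_element" wrapper.
        GroupType bag = fileSchema.getType("a").asGroupType().getType("bag").asGroupType();
        System.out.println(bag.getFieldCount()); // prints 1
        System.out.println(bag.getType(0));      // the wrapper group

        // Treating "bag" as if it were the struct itself and asking for the second
        // ROW field (a2 -> index 1) reproduces the error from the stack trace:
        System.out.println(bag.getType(1));      // java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
    }
}
{code}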
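As a possible interim workaround (untested on my side, so treat it as an assumption), the Hive connector exposes table.exec.hive.fallback-mapred-reader; setting it to true makes the source use Hive's MapReduce RecordReader instead of the vectorized Parquet reader, which should bypass the failing code path. It could be set in the reproduction job above, before the query is executed:

{code:java}
// Assumption: the non-vectorized mapred reader handles this schema, so falling back
// avoids the ParquetSplitReaderUtil path that throws. Set before executeSql().
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.hive.fallback-mapred-reader", "true");
{code}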