Jichao Wang created FLINK-35523:
-----------------------------------

             Summary: When using the Hive connector to read a Hive table in Parquet format, an IndexOutOfBoundsException is thrown.
                 Key: FLINK-35523
                 URL: https://issues.apache.org/jira/browse/FLINK-35523
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.19.0, 1.16.2
            Reporter: Jichao Wang


When using the Hive connector to read a Hive table stored in Parquet format, the job fails with a {{java.lang.IndexOutOfBoundsException}} (wrapped in an {{IOException}}) thrown from the vectorized Parquet reader.
The full stack trace:
{code:text}
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
        at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:40) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        ... 6 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
        at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_342]
        at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_342]
        at org.apache.flink.hive.shaded.parquet.schema.GroupType.getType(GroupType.java:216) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:277) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:266) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:256) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:139) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:75) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:110) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:65) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.lambda$fetch$0(HdpFileSourceSplitReader.java:38) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.runtime.security.SecurityUtils.runAtOneContext(SecurityUtils.java:108) ~[flink-dist-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:38) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
        ... 6 more
{code}
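From the trace, the failure originates in the vectorized Parquet reader: {{ParquetSplitReaderUtil.createWritableColumnVector}} recurses into the array's element type and calls {{GroupType.getType(1)}} on a group that has only one child, hence {{Index: 1, Size: 1}}. My reading (not yet verified against the source) is that the reader looks up the struct's second field ({{a2}}) on one of the single-child wrapper groups of the list encoding instead of on the element group itself. For reference, the Parquet schema Hive writes for this table should resemble the sketch below (the names {{bag}} and {{array_element}} are Hive's usual list encoding; the exact names may differ):
{code:text}
message hive_schema {
  optional group a (LIST) {            -- 1 child: the repeated group
    repeated group bag {               -- 1 child: the element group
      optional group array_element {   -- 2 children: the struct fields
        optional int32 a1;
        optional int32 a2;
      }
    }
  }
}
{code}
If that reading is correct, an {{array<struct<...>>}} whose struct has only a single field would probably not trigger the exception, since only index 0 would ever be requested.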
Here's how to reproduce:
1. Create a Parquet table {{parquet_array_struct}} in Hive:
{code:sql}
create table parquet_array_struct(a array<struct<a1:int, a2:int>>) stored as parquet;
{code}
2. Insert a row into the {{parquet_array_struct}} table:
{code:sql}
insert into parquet_array_struct select array(named_struct('a1', 1, 'a2', 2));
{code}
3. Run a Flink SQL job that reads from the {{parquet_array_struct}} table:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "myhive";
String defaultDatabase = "default";
String hiveConfDir     = null; // with null, hive-site.xml is looked up on the classpath

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
tableEnv.useDatabase(defaultDatabase);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// collecting the results triggers the scan and reproduces the exception
tableEnv.executeSql("select * from parquet_array_struct").print();
{code}
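
Until this is fixed, a possible (untested) workaround is to make the Hive source fall back to Hadoop's MapReduce record reader instead of the vectorized Parquet reader, which sidesteps the failing code path:
{code:java}
// Untested workaround sketch: disable the vectorized Parquet reader
// by falling back to the MapReduce record reader.
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.hive.fallback-mapred-reader", "true");
{code}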



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
