[ https://issues.apache.org/jira/browse/FLINK-30679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
hehuiyuan updated FLINK-30679:
------------------------------
    Component/s: Connectors / Hive

> Cannot load the data of a Hive dim table when projection push-down is introduced
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-30679
>                 URL: https://issues.apache.org/jira/browse/FLINK-30679
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>            Reporter: hehuiyuan
>            Priority: Critical
>
> Vectorized read:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: 3
>     at org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
> {code}
>
> MapReduce read:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: 3
>     at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_301]
>     at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_301]
>     at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_301]
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_301]
>     at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
>     at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
> {code}
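>
> Both reads fail with the same ArrayIndexOutOfBoundsException: 3 while opening the lookup reader, which suggests the projected-fields handling is at fault: once projection push-down drops the unused columns, the reader still appears to address fields by their position in the full Hive schema. A minimal sketch of that suspected mismatch (the class and variable names below are hypothetical and not taken from the Flink code base):
> {code:java}
> // Illustrative only; this is not Flink source code.
> public class ProjectionMismatchSketch {
>     public static void main(String[] args) {
>         // Full schema of tmp_flink_test_text, partition column included: 4 columns.
>         String[] fullSchema = {"name", "age", "score", "dt"};
>
>         // After projection push-down only the referenced columns remain in the row:
>         // name (the join key), score, dt.
>         Object[] projectedRow = {"k1", 100L, "20230115"};
>
>         // ...but the selected-field indices still point into the full schema.
>         int[] selectedFields = {0, 2, 3};
>
>         for (int field : selectedFields) {
>             // projectedRow has length 3, so position 3 throws the same
>             // java.lang.ArrayIndexOutOfBoundsException: 3 seen in both traces.
>             System.out.println(fullSchema[field] + " = " + projectedRow[field]);
>         }
>     }
> }
> {code}
> In the reproduction below the join references exactly name, score and dt, whose positions in the full schema (name, age, score, dt) are 0, 2 and 3, which lines up with the failing index 3.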
>
> The SQL to reproduce:
> {code:java}
> CREATE TABLE kafkaTableSource (
>   name string,
>   age int,
>   sex string,
>   address string,
>   ptime AS PROCTIME()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'hehuiyuan1',
>   'scan.startup.mode' = 'latest-offset',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.client.id' = 'test-consumer-group',
>   'properties.group.id' = 'test-consumer-group',
>   'format' = 'csv'
> );
>
> CREATE TABLE printsink (
>   name string,
>   age int,
>   sex string,
>   address string,
>   score bigint,
>   dt string
> ) WITH (
>   'connector' = 'print'
> );
>
> CREATE CATALOG myhive WITH (
>   'type' = 'hive',
>   'default-database' = 'hhy',
>   'hive-version' = '2.0.0',
>   'hadoop-conf-dir' = '/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
> );
>
> USE CATALOG myhive;
> USE hhy;
>
> set table.sql-dialect=hive;
> CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
>   name STRING,
>   age INT,
>   score BIGINT
> ) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
>   'streaming-source.enable' = 'false',
>   'streaming-source.partition.include' = 'all',
>   'lookup.join.cache.ttl' = '5 min'
> );
> set table.sql-dialect=default;
>
> USE CATALOG default_catalog;
>
> INSERT INTO default_catalog.default_database.printsink
> SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
> FROM default_catalog.default_database.kafkaTableSource AS s
> JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime AS r
> ON r.name = s.name;
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)