[ https://issues.apache.org/jira/browse/FLINK-30679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
hehuiyuan updated FLINK-30679: ------------------------------ Description: The project push-down optimization: https://github.com/apache/flink/pull/21311 vectorize read: {code:java} java.lang.ArrayIndexOutOfBoundsException: 3 at org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 
{code} mapreduce read: {code:java} java.lang.ArrayIndexOutOfBoundsException: 3 at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_301] at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_301] at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_301] at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_301] at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code} The sql : {code:java} CREATE TABLE kafkaTableSource ( name string, age int, sex string, address string, ptime AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'hehuiyuan1', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.client.id' = 'test-consumer-group', 'properties.group.id' = 'test-consumer-group', 'format' = 'csv' ); CREATE TABLE printsink ( name string, age int, sex string, address string, score bigint, dt string ) WITH ( 'connector' = 'print' ); CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'hhy', 'hive-version' = '2.0.0', 'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop' ); USE CATALOG myhive; USE hhy; set table.sql-dialect=hive; CREATE TABLE IF NOT EXISTS tmp_flink_test_text ( name STRING, age INT, score BIGINT ) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES ( 'streaming-source.enable' = 'false', 'streaming-source.partition.include' = 'all', 'lookup.join.cache.ttl' = '5 min' ); set table.sql-dialect=default; USE CATALOG default_catalog; INSERT INTO default_catalog.default_database.printsink SELECT s.name, s.age, s.sex, s.address, r.score, r.dt FROM default_catalog.default_database.kafkaTableSource as s JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime AS r ON r.name = s.name; {code} was: vectorize read: {code:java} java.lang.ArrayIndexOutOfBoundsException: 3 at org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code} mapreduce read: {code:java} java.lang.ArrayIndexOutOfBoundsException: 3 at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_301] at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_301] at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_301] at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_301] at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at LookupFunction$26.flatMap(Unknown Source) ~[?:?] at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code} The sql : {code:java} CREATE TABLE kafkaTableSource ( name string, age int, sex string, address string, ptime AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'hehuiyuan1', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.client.id' = 'test-consumer-group', 'properties.group.id' = 'test-consumer-group', 'format' = 'csv' ); CREATE TABLE printsink ( name string, age int, sex string, address string, score bigint, dt string ) WITH ( 'connector' = 'print' ); CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'hhy', 'hive-version' = '2.0.0', 'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop' ); USE CATALOG myhive; USE hhy; set table.sql-dialect=hive; CREATE TABLE IF NOT EXISTS tmp_flink_test_text ( name STRING, age INT, score BIGINT ) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES ( 'streaming-source.enable' = 'false', 'streaming-source.partition.include' = 'all', 'lookup.join.cache.ttl' = '5 min' ); set table.sql-dialect=default; USE CATALOG default_catalog; INSERT INTO default_catalog.default_database.printsink SELECT 
s.name, s.age, s.sex, s.address, r.score, r.dt FROM default_catalog.default_database.kafkaTableSource as s JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime AS r ON r.name = s.name; {code} > Can not load the data of hive dim table when project-push-down is introduced > ---------------------------------------------------------------------------- > > Key: FLINK-30679 > URL: https://issues.apache.org/jira/browse/FLINK-30679 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive > Affects Versions: 1.14.6 > Reporter: hehuiyuan > Priority: Critical > > The project push down optimize:https://github.com/apache/flink/pull/21311 > > vectorize read: > > {code:java} > java.lang.ArrayIndexOutOfBoundsException: 3 > at > org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 
{code} > > > mapreduce read: > > {code:java} > java.lang.ArrayIndexOutOfBoundsException: 3 > at > org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) > ~[?:1.8.0_301] > at > java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) > ~[?:1.8.0_301] > at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) > ~[?:1.8.0_301] > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > ~[?:1.8.0_301] > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > ~[?:1.8.0_301] > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) > ~[?:1.8.0_301] > at > java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) > ~[?:1.8.0_301] > at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) > ~[?:1.8.0_301] > at > org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 
> at > org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at > org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code} > > > The sql : > > {code:java} > CREATE TABLE kafkaTableSource ( > name string, > age int, > sex string, > address string, > ptime AS PROCTIME() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'hehuiyuan1', > 'scan.startup.mode' = 'latest-offset', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.client.id' = 'test-consumer-group', > 'properties.group.id' = 'test-consumer-group', > 'format' = 'csv' > ); > CREATE TABLE printsink ( > name string, > age int, > sex string, > address string, > score bigint, > dt string > ) WITH ( > 'connector' = 'print' > ); > CREATE CATALOG myhive > WITH ( > 'type' = 'hive', > 'default-database' = 'hhy', > 'hive-version' = '2.0.0', > > 'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop' > ); > USE CATALOG myhive; > USE hhy; > set table.sql-dialect=hive; > CREATE TABLE IF NOT EXISTS tmp_flink_test_text ( > name STRING, > age INT, > score BIGINT > ) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES ( > 'streaming-source.enable' = 'false', > 'streaming-source.partition.include' = 'all', > 'lookup.join.cache.ttl' = '5 min' > ); > set table.sql-dialect=default; > USE CATALOG default_catalog; > INSERT INTO default_catalog.default_database.printsink > SELECT s.name, s.age, s.sex, s.address, r.score, r.dt > FROM default_catalog.default_database.kafkaTableSource as s > JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime AS r > ON r.name = s.name; > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)