hehuiyuan created FLINK-30679:
---------------------------------
Summary: Cannot load the data of a Hive dimension table when
projection push-down is introduced
Key: FLINK-30679
URL: https://issues.apache.org/jira/browse/FLINK-30679
Project: Flink
Issue Type: Bug
Reporter: hehuiyuan
Stack trace with vectorized read:
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
at
org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276)
~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129)
~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code}
Stack trace with MapReduce read:
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
at
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139)
~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
~[?:1.8.0_301]
at
java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
~[?:1.8.0_301]
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
~[?:1.8.0_301]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
~[?:1.8.0_301]
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
~[?:1.8.0_301]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
~[?:1.8.0_301]
at
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
~[?:1.8.0_301]
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
~[?:1.8.0_301]
at
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141)
~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157)
~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code}
The SQL to reproduce the issue:
{code:java}
CREATE TABLE kafkaTableSource (
name string,
age int,
sex string,
address string,
ptime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'hehuiyuan1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.client.id' = 'test-consumer-group',
'properties.group.id' = 'test-consumer-group',
'format' = 'csv'
);
CREATE TABLE printsink (
name string,
age int,
sex string,
address string,
score bigint,
dt string
) WITH (
'connector' = 'print'
);
CREATE CATALOG myhive
WITH (
'type' = 'hive',
'default-database' = 'hhy',
'hive-version' = '2.0.0',
'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
);
USE CATALOG myhive;
USE hhy;
set table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
name STRING,
age INT,
score BIGINT
) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
'streaming-source.enable' = 'false',
'streaming-source.partition.include' = 'all',
'lookup.join.cache.ttl' = '5 min'
);
set table.sql-dialect=default;
USE CATALOG default_catalog;
INSERT INTO default_catalog.default_database.printsink
SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
FROM default_catalog.default_database.kafkaTableSource as s
JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime AS r
ON r.name = s.name;
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)