[ 
https://issues.apache.org/jira/browse/FLINK-30679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hehuiyuan updated FLINK-30679:
------------------------------
    Description: 
The project push-down optimization: https://github.com/apache/flink/pull/21311

 

Vectorized read:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code}
 

 

MapReduce read:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) 
~[?:1.8.0_301]
    at 
java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
 ~[?:1.8.0_301]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) 
~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
~[?:1.8.0_301]
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) 
~[?:1.8.0_301]
    at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
 ~[?:1.8.0_301]
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) 
~[?:1.8.0_301]
    at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
    at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code}
 

 

The SQL:

 
{code:java}
CREATE TABLE kafkaTableSource (
    name string,
    age int,
    sex string,
    address string,
    ptime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'hehuiyuan1',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.client.id' = 'test-consumer-group',
    'properties.group.id' = 'test-consumer-group',
    'format' = 'csv'
);

CREATE TABLE printsink (
    name string,
    age int,
    sex string,
    address string,
    score bigint,
    dt string
) WITH (
    'connector' = 'print'
);

CREATE CATALOG myhive
WITH (
        'type' = 'hive',
        'default-database' = 'hhy',
        'hive-version' = '2.0.0',
        'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
);

USE CATALOG myhive;
USE hhy;

set table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
    name STRING,
    age INT,
    score BIGINT
) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
    'streaming-source.enable' = 'false',
    'streaming-source.partition.include' = 'all',
    'lookup.join.cache.ttl' = '5 min'
);
set table.sql-dialect=default;

USE CATALOG default_catalog;
INSERT INTO default_catalog.default_database.printsink
SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
FROM default_catalog.default_database.kafkaTableSource  as s
JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime  AS r
ON r.name = s.name;
 {code}
 

 

  was:
vectorize read:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code}
 

 

mapreduce read:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) 
~[?:1.8.0_301]
    at 
java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
 ~[?:1.8.0_301]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) 
~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
~[?:1.8.0_301]
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) 
~[?:1.8.0_301]
    at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
 ~[?:1.8.0_301]
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) 
~[?:1.8.0_301]
    at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
    at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code}
 

 

The sql :

 
{code:java}
CREATE TABLE kafkaTableSource (
    name string,
    age int,
    sex string,
    address string,
    ptime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'hehuiyuan1',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.client.id' = 'test-consumer-group',
    'properties.group.id' = 'test-consumer-group',
    'format' = 'csv'
);

CREATE TABLE printsink (
    name string,
    age int,
    sex string,
    address string,
    score bigint,
    dt string
) WITH (
    'connector' = 'print'
);

CREATE CATALOG myhive
WITH (
        'type' = 'hive',
        'default-database' = 'hhy',
        'hive-version' = '2.0.0',
        'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
);

USE CATALOG myhive;
USE hhy;

set table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
    name STRING,
    age INT,
    score BIGINT
) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
    'streaming-source.enable' = 'false',
    'streaming-source.partition.include' = 'all',
    'lookup.join.cache.ttl' = '5 min'
);
set table.sql-dialect=default;

USE CATALOG default_catalog;
INSERT INTO default_catalog.default_database.printsink
SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
FROM default_catalog.default_database.kafkaTableSource  as s
JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime  AS r
ON r.name = s.name;
 {code}
 

 


> Can not load the data of hive dim table when project-push-down is introduced
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-30679
>                 URL: https://issues.apache.org/jira/browse/FLINK-30679
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.14.6
>            Reporter: hehuiyuan
>            Priority: Critical
>
> The project push-down optimization: https://github.com/apache/flink/pull/21311
>  
> Vectorized read:
>  
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: 3
>     at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code}
>  
>  
> MapReduce read:
>  
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: 3
>     at 
> org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) 
> ~[?:1.8.0_301]
>     at 
> java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
>  ~[?:1.8.0_301]
>     at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) 
> ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
> ~[?:1.8.0_301]
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) 
> ~[?:1.8.0_301]
>     at 
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>  ~[?:1.8.0_301]
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) 
> ~[?:1.8.0_301]
>     at 
> org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code}
>  
>  
> The SQL:
>  
> {code:java}
> CREATE TABLE kafkaTableSource (
>     name string,
>     age int,
>     sex string,
>     address string,
>     ptime AS PROCTIME()
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'hehuiyuan1',
>     'scan.startup.mode' = 'latest-offset',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'properties.client.id' = 'test-consumer-group',
>     'properties.group.id' = 'test-consumer-group',
>     'format' = 'csv'
> );
> CREATE TABLE printsink (
>     name string,
>     age int,
>     sex string,
>     address string,
>     score bigint,
>     dt string
> ) WITH (
>     'connector' = 'print'
> );
> CREATE CATALOG myhive
> WITH (
>         'type' = 'hive',
>         'default-database' = 'hhy',
>         'hive-version' = '2.0.0',
>         'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
> );
> USE CATALOG myhive;
> USE hhy;
> set table.sql-dialect=hive;
> CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
>     name STRING,
>     age INT,
>     score BIGINT
> ) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
>     'streaming-source.enable' = 'false',
>     'streaming-source.partition.include' = 'all',
>     'lookup.join.cache.ttl' = '5 min'
> );
> set table.sql-dialect=default;
> USE CATALOG default_catalog;
> INSERT INTO default_catalog.default_database.printsink
> SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
> FROM default_catalog.default_database.kafkaTableSource  as s
> JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime  AS r
> ON r.name = s.name;
>  {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to