[ https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-26016:
-----------------------------------
    Labels: pull-request-available  (was: )

> FileSystemLookupFunction does not produce correct results when hive table 
> uses columnar storage
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26016
>                 URL: https://issues.apache.org/jira/browse/FLINK-26016
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.14.3
>            Reporter: jinfeng
>            Priority: Major
>              Labels: pull-request-available
>
> When I use a Parquet Hive table as the lookup table, some records cannot be
> joined. This can be reproduced by adding the following unit test to
> HiveLookupJoinITCase.
> {code:java}
> // create the hive table with columnar storage.
> tableEnv.executeSql(
>         String.format(
>                 "create table columnar_table (x string) STORED AS PARQUET "
>                         + "tblproperties ('%s'='5min')",
>                 HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
>
> @Test
> public void testLookupJoinTableWithColumnarStorage() throws Exception {
>     // constructs test data; as the DEFAULT_SIZE of VectorizedColumnBatch is 2048,
>     // we should write at least 2048 records to the test table.
>     List<Row> testData = new ArrayList<>(4096);
>     for (int i = 0; i < 4096; i++) {
>         testData.add(Row.of(String.valueOf(i)));
>     }
>
>     // constructs test data using values table
>     TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
>     batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
>     batchEnv.useCatalog(hiveCatalog.getName());
>     String dataId = TestValuesTableFactory.registerData(testData);
>     batchEnv.executeSql(
>             String.format(
>                     "create table value_source(x string, p as proctime()) with ("
>                             + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
>                     dataId));
>     batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();
>
>     TableImpl flinkTable =
>             (TableImpl)
>                     tableEnv.sqlQuery(
>                             "select t.x as x1, c.x as x2 from value_source t "
>                                     + "left join columnar_table for system_time as of t.p c "
>                                     + "on t.x = c.x where c.x is null");
>     List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
>     assertTrue(results.size() == 0);
> }
> {code}
> The problem appears to be caused by the following code in FileSystemLookupFunction:
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>    count++;
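>    // the key is built from 'row', which still aliases the reused object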
>    RowData key = extractLookupKey(row);
>    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>    rows.add(serializer.copy(row));
> }
> {code}
> With columnar storage the reader reuses the same RowData instance (backed by
> a VectorizedColumnBatch) for every record, so the key extracted here still
> references the reused object; only the value is copied. When the reader moves
> on to the next batch and overwrites the shared buffers, the keys already
> stored in the cache silently change, and later lookups miss.
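> For illustration, the following is a minimal, self-contained sketch of the
> same hazard outside Flink (the Key class and the reuse buffer are
> hypothetical stand-ins for the extracted lookup key and the reused RowData):
> {code:java}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> public class ReuseHazardSketch {
>
>     /** Hypothetical key that aliases a shared buffer instead of copying it. */
>     static final class Key {
>         final char[] data;
>
>         Key(char[] data) {
>             this.data = data;
>         }
>
>         @Override
>         public int hashCode() {
>             return Arrays.hashCode(data);
>         }
>
>         @Override
>         public boolean equals(Object o) {
>             return o instanceof Key && Arrays.equals(data, ((Key) o).data);
>         }
>     }
>
>     public static void main(String[] args) {
>         Map<Key, List<String>> cache = new HashMap<>();
>         char[] reuse = new char[1]; // stands in for the reused row buffer
>
>         // "Read" record a: the key wraps the reused buffer without copying it.
>         reuse[0] = 'a';
>         cache.computeIfAbsent(new Key(reuse), k -> new ArrayList<>()).add("row-a");
>
>         // "Read" record b: the buffer is overwritten in place, which also
>         // mutates the key that is already stored in the cache.
>         reuse[0] = 'b';
>         cache.computeIfAbsent(new Key(reuse), k -> new ArrayList<>()).add("row-b");
>
>         // The entry cached for 'a' is now unreachable: its key reads 'b',
>         // but it still sits in the hash bucket computed for 'a'.
>         System.out.println(cache.get(new Key(new char[] {'a'}))); // null
>         System.out.println(cache.get(new Key(new char[] {'b'}))); // [row-b]
>     }
> }
> {code}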
> It can be fixed by copying the row before extracting the key:
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>     count++;
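>     // copy the reused row first, then derive the key from the copy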
>     RowData rowData = serializer.copy(row);
>     RowData key = extractLookupKey(rowData);
>     List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>     rows.add(rowData);
> }
> {code}
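> Copying the row before extracting the key means both the cached key and the
> cached value point at the copy, which the reader never mutates, so subsequent
> batches can no longer corrupt entries that are already in the cache.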



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
