[ https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-26016:
-----------------------------------
    Labels: pull-request-available  (was: )

> FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26016
>                 URL: https://issues.apache.org/jira/browse/FLINK-26016
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.14.3
>            Reporter: jinfeng
>            Priority: Major
>              Labels: pull-request-available
>
> When I use a Parquet Hive table as the lookup table, some records cannot be joined. This can be reproduced by adding the following unit test to HiveLookupJoinITCase.
> {code:java}
> // create the hive table with columnar storage.
> tableEnv.executeSql(
>         String.format(
>                 "create table columnar_table (x string) STORED AS PARQUET "
>                         + "tblproperties ('%s'='5min')",
>                 HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
>
> @Test
> public void testLookupJoinTableWithColumnarStorage() throws Exception {
>     // constructs test data; as the DEFAULT_SIZE of VectorizedColumnBatch is 2048, we should
>     // write at least 2048 records to the test table.
>     List<Row> testData = new ArrayList<>(4096);
>     for (int i = 0; i < 4096; i++) {
>         testData.add(Row.of(String.valueOf(i)));
>     }
>
>     // constructs test data using values table
>     TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
>     batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
>     batchEnv.useCatalog(hiveCatalog.getName());
>     String dataId = TestValuesTableFactory.registerData(testData);
>     batchEnv.executeSql(
>             String.format(
>                     "create table value_source(x string, p as proctime()) with ("
>                             + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
>                     dataId));
>     batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();
>
>     TableImpl flinkTable =
>             (TableImpl)
>                     tableEnv.sqlQuery(
>                             "select t.x as x1, c.x as x2 from value_source t "
>                                     + "left join columnar_table for system_time as of t.p c "
>                                     + "on t.x = c.x where c.x is null");
>     List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
>     assertTrue(results.size() == 0);
> }
> {code}
> The problem may be caused by the following code, which extracts the lookup key from the reused row object before the row is copied:
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>     count++;
>     RowData key = extractLookupKey(row);
>     List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>     rows.add(serializer.copy(row));
> }
> {code}
> It can be fixed with the following modification, which copies the row first and extracts the key from the copy:
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>     count++;
>     RowData rowData = serializer.copy(row);
>     RowData key = extractLookupKey(rowData);
>     List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>     rows.add(rowData);
> }
> {code}
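> The underlying hazard appears to be the classic mutable-map-key pitfall: columnar readers reuse the row object across read(reuse) calls, so a key extracted before the copy can still alias the reused object and be silently rewritten by the next read. A minimal standalone sketch of that pitfall (plain Java with a hypothetical MutableKey class, not Flink's actual classes):
> {code:java}
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> public class ReusedKeyPitfall {
>
>     // Hypothetical mutable key, standing in for a RowData view over a reused buffer.
>     static final class MutableKey {
>         String value;
>
>         MutableKey(String value) {
>             this.value = value;
>         }
>
>         @Override
>         public boolean equals(Object o) {
>             return o instanceof MutableKey && ((MutableKey) o).value.equals(value);
>         }
>
>         @Override
>         public int hashCode() {
>             return value.hashCode();
>         }
>     }
>
>     public static void main(String[] args) {
>         Map<MutableKey, List<String>> cache = new HashMap<>();
>
>         // One instance plays the role of the reader's reuse object.
>         MutableKey reused = new MutableKey("row-1");
>         cache.computeIfAbsent(reused, k -> new ArrayList<>()).add("payload-1");
>
>         // The next read mutates the same object in place ...
>         reused.value = "row-2";
>
>         // ... so the entry stored under "row-1" is stranded in the bucket
>         // chosen by its old hash code and can no longer be found.
>         System.out.println(cache.get(new MutableKey("row-1"))); // null
>         System.out.println(cache.get(new MutableKey("row-2"))); // null in practice
>     }
> }
> {code}
> Copying the row before extracting the key, as in the fix above, gives each cache entry a key that no longer aliases the reused object.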