jinfeng created FLINK-26016:
-------------------------------

             Summary: FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
                 Key: FLINK-26016
                 URL: https://issues.apache.org/jira/browse/FLINK-26016
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.14.3
            Reporter: jinfeng
When I use a Parquet Hive table as the lookup table, some records cannot be joined. This can be reproduced by adding the following unit test to HiveLookupJoinITCase.

{code:java}
// create the hive table with columnar storage.
tableEnv.executeSql(
        String.format(
                "create table columnar_table (x string) STORED AS PARQUET "
                        + "tblproperties ('%s'='5min')",
                HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));

@Test
public void testLookupJoinTableWithColumnarStorage() throws Exception {
    // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch is 2048, we should
    // write at least 2048 records to the test table.
    List<Row> testData = new ArrayList<>(4096);
    for (int i = 0; i < 4096; i++) {
        testData.add(Row.of(String.valueOf(i)));
    }

    // constructs test data using values table
    TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
    batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    batchEnv.useCatalog(hiveCatalog.getName());
    String dataId = TestValuesTableFactory.registerData(testData);
    batchEnv.executeSql(
            String.format(
                    "create table value_source(x string, p as proctime()) with ("
                            + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
                    dataId));
    batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();

    TableImpl flinkTable =
            (TableImpl)
                    tableEnv.sqlQuery(
                            "select t.x as x1, c.x as x2 from value_source t "
                                    + "left join columnar_table for system_time as of t.p c "
                                    + "on t.x = c.x where c.x is null");
    List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
    assertTrue(results.size() == 0);
}
{code}

The problem may be caused by the following code in FileSystemLookupFunction.

{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    RowData key = extractLookupKey(row);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(serializer.copy(row));
}
{code}

The key is extracted from the reused row instance, while only the value is copied. For columnar storage the data backing that instance is overwritten as reading continues, so the key stored in the cache no longer matches the copied rows. It can be fixed with the following modification:

{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    // copy the (possibly reused) row first, then extract the key from the copy
    RowData rowData = serializer.copy(row);
    RowData key = extractLookupKey(rowData);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(rowData);
}
{code}
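For reference, here is a minimal, self-contained sketch of the reuse hazard (plain Java, not actual Flink classes: the hypothetical MutableRow stands in for the reused RowData, and the HashMap stands in for the lookup cache). Because the stored key keeps mutating, records inserted earlier become unreachable, which matches the missed joins seen in the test above.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReuseHazardDemo {

    // Hypothetical stand-in for the reused RowData: equality and hash code
    // depend on a field that the reader overwrites on every read.
    static final class MutableRow {
        String value;

        MutableRow(String value) {
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableRow && ((MutableRow) o).value.equals(value);
        }

        @Override
        public int hashCode() {
            return value.hashCode();
        }
    }

    public static void main(String[] args) {
        Map<MutableRow, List<String>> cache = new HashMap<>();
        MutableRow reuse = new MutableRow("");

        // Simulate a reader that reuses the same row instance for every record.
        for (int i = 0; i < 3; i++) {
            reuse.value = String.valueOf(i); // "read" the next record into the reused object
            // Bug pattern: the map key is the reused object itself, not a copy,
            // so every entry ends up keyed by whatever the row holds last.
            cache.computeIfAbsent(reuse, k -> new ArrayList<>()).add("row-" + i);
        }

        // Only the entry whose stored hash matches the row's final value is reachable.
        System.out.println(cache.get(new MutableRow("0"))); // null - record lost
        System.out.println(cache.get(new MutableRow("1"))); // null - record lost
        System.out.println(cache.get(new MutableRow("2"))); // [row-2]
    }
}
{code}

Copying the row before extracting the key, as in the proposed fix, avoids this because every cached key is then backed by data that is never touched again.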