sivabalan narayanan created HUDI-3864:
-----------------------------------------
Summary: Avoid fetching all files for all partitions on the read/query path for flink
Key: HUDI-3864
URL: https://issues.apache.org/jira/browse/HUDI-3864
Project: Apache Hudi
Issue Type: Task
Components: flink
Reporter: sivabalan narayanan

Fetching all files across all partitions should be avoided in the hot path, especially on the query side. I inspected HoodieFileIndex for Spark and things look to be OK: we only load files for the partitions involved in the query.

{code:java}
public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
                                HoodieTableMetaClient metaClient,
                                TypedProperties configProperties,
                                HoodieTableQueryType queryType,
                                List<Path> queryPaths,
{code}

The queryPaths argument above contains only the partitions involved in the split. Later, when we load the files, we load them only for the matched partitions.

{code:java}
private Map<PartitionPath, FileStatus[]> loadPartitionPathFiles() {
  // List files in all partition paths
  List<PartitionPath> pathToFetch = new ArrayList<>();
  Map<PartitionPath, FileStatus[]> cachedPartitionToFiles = new HashMap<>();

  // Fetch from the FileStatusCache
  List<PartitionPath> partitionPaths = getAllQueryPartitionPaths();
  partitionPaths.forEach(partitionPath -> {
    Option<FileStatus[]> filesInPartition = fileStatusCache.get(partitionPath.fullPartitionPath(basePath));
    if (filesInPartition.isPresent()) {
      cachedPartitionToFiles.put(partitionPath, filesInPartition.get());
    } else {
      pathToFetch.add(partitionPath);
    }
  });

  Map<PartitionPath, FileStatus[]> fetchedPartitionToFiles;
  if (pathToFetch.isEmpty()) {
    fetchedPartitionToFiles = Collections.emptyMap();
  } else {
    Map<String, PartitionPath> fullPartitionPathsMapToFetch = pathToFetch.stream()
        .collect(Collectors.toMap(
            partitionPath -> partitionPath.fullPartitionPath(basePath).toString(),
            Function.identity())
        );

    fetchedPartitionToFiles =
        FSUtils.getFilesInPartitions(
                engineContext,
                metadataConfig,
                basePath,
                fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
                fileSystemStorageConfig.getSpillableDir())
            .entrySet()
            .stream()
            .collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue()));
  }

  // Update the fileStatusCache
  fetchedPartitionToFiles.forEach((partitionPath, filesInPartition) -> {
    fileStatusCache.put(partitionPath.fullPartitionPath(basePath), filesInPartition);
  });

  return CollectionUtils.combine(cachedPartitionToFiles, fetchedPartitionToFiles);
}
{code}

But in Flink I do see that we are loading files across all partitions. Let's try to see if this can be avoided:

IncrementalInputSplits [L180|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L180]
{code:java}
fileStatuses = fileIndex.getFilesInPartitions();
{code}

HoodieTableSource [L298|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java#L298]
{code:java}
FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
{code}

I do see that we pass in the required partition paths in both places, but I will leave it to the Flink experts to inspect the code once and close out the ticket if no action is required.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
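For reference, the desirable behavior described above (list files only for the queried partitions, consulting a per-partition cache and batch-fetching only the misses, as loadPartitionPathFiles() does) can be reduced to a minimal, self-contained sketch. This is not Hudi code: PartitionFileLister, load, and listFiles are hypothetical names, partitions are plain Strings instead of PartitionPath, file listings are List<String> instead of FileStatus[], and listFiles stands in for FSUtils.getFilesInPartitions.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the cache-then-batch-fetch pattern: only the
// partitions named in the query are ever listed, and a listing is
// fetched from storage at most once per partition.
public class PartitionFileLister {
  // Stand-in for the FileStatusCache: partition path -> file names.
  private final Map<String, List<String>> cache = new HashMap<>();

  // Hypothetical stand-in for the real storage listing call
  // (FSUtils.getFilesInPartitions); fabricates one file per partition.
  private Map<String, List<String>> listFiles(Collection<String> partitions) {
    Map<String, List<String>> out = new HashMap<>();
    for (String p : partitions) {
      out.put(p, Collections.singletonList(p + "/file-0.parquet"));
    }
    return out;
  }

  public Map<String, List<String>> load(List<String> queryPartitions) {
    Map<String, List<String>> result = new HashMap<>();
    List<String> misses = new ArrayList<>();
    // Split the queried partitions into cache hits and misses.
    for (String p : queryPartitions) {
      List<String> cached = cache.get(p);
      if (cached != null) {
        result.put(p, cached);
      } else {
        misses.add(p);
      }
    }
    // Batch-fetch only the misses, then update the cache.
    if (!misses.isEmpty()) {
      Map<String, List<String>> fetched = listFiles(misses);
      cache.putAll(fetched);
      result.putAll(fetched);
    }
    return result;
  }
}
```

The key property, and the one the Flink paths should also satisfy, is that no partition outside queryPartitions is ever passed to the storage listing call.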