[ https://issues.apache.org/jira/browse/HUDI-3864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sivabalan narayanan reassigned HUDI-3864:
-----------------------------------------

    Assignee: Danny Chen

> Avoid fetching all files for all partitions on the read/query path for flink
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-3864
>                 URL: https://issues.apache.org/jira/browse/HUDI-3864
>             Project: Apache Hudi
>          Issue Type: Task
>          Components: flink
>            Reporter: sivabalan narayanan
>            Assignee: Danny Chen
>            Priority: Major
>
> Fetching all files across all partitions should be avoided on the hot path, especially on the query side. I inspected HoodieFileIndex for Spark and things look to be OK: we only load files for the partitions involved in the query.
> {code:java}
> public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
>                                 HoodieTableMetaClient metaClient,
>                                 TypedProperties configProperties,
>                                 HoodieTableQueryType queryType,
>                                 List<Path> queryPaths,
> {code}
> The queryPaths argument above contains only the partitions involved in the split.
> Later, when we load the files, we load them only for the matched partitions:
> {code:java}
> private Map<PartitionPath, FileStatus[]> loadPartitionPathFiles() {
>   // List files in all partition paths
>   List<PartitionPath> pathToFetch = new ArrayList<>();
>   Map<PartitionPath, FileStatus[]> cachedPartitionToFiles = new HashMap<>();
>
>   // Fetch from the FileStatusCache
>   List<PartitionPath> partitionPaths = getAllQueryPartitionPaths();
>   partitionPaths.forEach(partitionPath -> {
>     Option<FileStatus[]> filesInPartition = fileStatusCache.get(partitionPath.fullPartitionPath(basePath));
>     if (filesInPartition.isPresent()) {
>       cachedPartitionToFiles.put(partitionPath, filesInPartition.get());
>     } else {
>       pathToFetch.add(partitionPath);
>     }
>   });
>
>   Map<PartitionPath, FileStatus[]> fetchedPartitionToFiles;
>   if (pathToFetch.isEmpty()) {
>     fetchedPartitionToFiles = Collections.emptyMap();
>   } else {
>     Map<String, PartitionPath> fullPartitionPathsMapToFetch = pathToFetch.stream()
>         .collect(Collectors.toMap(
>             partitionPath -> partitionPath.fullPartitionPath(basePath).toString(),
>             Function.identity()));
>     fetchedPartitionToFiles =
>         FSUtils.getFilesInPartitions(
>                 engineContext,
>                 metadataConfig,
>                 basePath,
>                 fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
>                 fileSystemStorageConfig.getSpillableDir())
>             .entrySet()
>             .stream()
>             .collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue()));
>   }
>
>   // Update the fileStatusCache
>   fetchedPartitionToFiles.forEach((partitionPath, filesInPartition) -> {
>     fileStatusCache.put(partitionPath.fullPartitionPath(basePath), filesInPartition);
>   });
>
>   return CollectionUtils.combine(cachedPartitionToFiles, fetchedPartitionToFiles);
> }
> {code}
> But I do see that in Flink we load files across all partitions. Let's see whether this can be avoided:
> IncrementalInputSplits [L180|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L180]
> {code:java}
> fileStatuses = fileIndex.getFilesInPartitions();
> {code}
> HoodieTableSource [L298|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java#L298]
> {code:java}
> FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
> {code}
> I do see that we pass in the required partition paths in both places.
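> As an illustrative sketch only (the class, method, and cache names below are hypothetical, not the actual Hudi API), the Flink path could follow the same pattern as the Spark-side loadPartitionPathFiles above: consult a per-partition cache first, then list only the required partitions that miss, instead of listing every partition under the base path.
> {code:java}
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.function.Function;
>
> // Hypothetical sketch, not Hudi code.
> public class PartitionPrunedListing {
>
>   // Hypothetical stand-in for a file-status cache keyed by partition path.
>   private final Map<String, List<String>> fileStatusCache = new HashMap<>();
>
>   // Lists files only for the partitions the query actually touches.
>   public Map<String, List<String>> listRequiredPartitions(
>       List<String> requiredPartitionPaths,
>       Function<String, List<String>> partitionLister) {
>     Map<String, List<String>> result = new HashMap<>();
>     List<String> toFetch = new ArrayList<>();
>
>     // Serve as many partitions as possible from the cache.
>     for (String partition : requiredPartitionPaths) {
>       List<String> cached = fileStatusCache.get(partition);
>       if (cached != null) {
>         result.put(partition, cached);
>       } else {
>         toFetch.add(partition);
>       }
>     }
>
>     // List only the cache misses, never the full table.
>     for (String partition : toFetch) {
>       List<String> listed = partitionLister.apply(partition);
>       fileStatusCache.put(partition, listed);
>       result.put(partition, listed);
>     }
>     return result;
>   }
> }
> {code}
> The point of the sketch is only that listing cost should scale with the partitions in the query rather than with the whole table, which is what the Spark-side code above achieves.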
> But I will leave it to Flink experts to inspect the code once and close out the ticket if no action is required.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)