[
https://issues.apache.org/jira/browse/HUDI-3864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sagar Sumit updated HUDI-3864:
------------------------------
Fix Version/s: 0.12.1
(was: 0.12.0)
> Avoid fetching all files for all partitions on the read/query path for flink
> ----------------------------------------------------------------------------
>
> Key: HUDI-3864
> URL: https://issues.apache.org/jira/browse/HUDI-3864
> Project: Apache Hudi
> Issue Type: Task
> Components: flink
> Reporter: sivabalan narayanan
> Assignee: Danny Chen
> Priority: Major
> Fix For: 0.12.1
>
>
> Fetching all files across all partitions should be avoided in the hot path,
> especially on the query side. We should only fetch files for the partitions
> of interest.
> I inspected HoodieFileIndex for Spark and things look to be OK. We only load
> files for the partitions involved in the query.
>
> {code:java}
> public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
>                                 HoodieTableMetaClient metaClient,
>                                 TypedProperties configProperties,
>                                 HoodieTableQueryType queryType,
>                                 List<Path> queryPaths,
> {code}
> queryPaths in the argument list above contains only the partitions involved
> in the split.
> Later, when we load the files, we load only the matched partitions.
> {code:java}
> private Map<PartitionPath, FileStatus[]> loadPartitionPathFiles() {
>   // List files in all partition paths
>   List<PartitionPath> pathToFetch = new ArrayList<>();
>   Map<PartitionPath, FileStatus[]> cachedPartitionToFiles = new HashMap<>();
>
>   // Fetch from the FileStatusCache
>   List<PartitionPath> partitionPaths = getAllQueryPartitionPaths();
>   partitionPaths.forEach(partitionPath -> {
>     Option<FileStatus[]> filesInPartition =
>         fileStatusCache.get(partitionPath.fullPartitionPath(basePath));
>     if (filesInPartition.isPresent()) {
>       cachedPartitionToFiles.put(partitionPath, filesInPartition.get());
>     } else {
>       pathToFetch.add(partitionPath);
>     }
>   });
>
>   Map<PartitionPath, FileStatus[]> fetchedPartitionToFiles;
>   if (pathToFetch.isEmpty()) {
>     fetchedPartitionToFiles = Collections.emptyMap();
>   } else {
>     Map<String, PartitionPath> fullPartitionPathsMapToFetch =
>         pathToFetch.stream()
>             .collect(Collectors.toMap(
>                 partitionPath -> partitionPath.fullPartitionPath(basePath).toString(),
>                 Function.identity()));
>
>     fetchedPartitionToFiles =
>         FSUtils.getFilesInPartitions(
>                 engineContext,
>                 metadataConfig,
>                 basePath,
>                 fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
>                 fileSystemStorageConfig.getSpillableDir())
>             .entrySet()
>             .stream()
>             .collect(Collectors.toMap(
>                 e -> fullPartitionPathsMapToFetch.get(e.getKey()),
>                 e -> e.getValue()));
>   }
>
>   // Update the fileStatusCache
>   fetchedPartitionToFiles.forEach((partitionPath, filesInPartition) -> {
>     fileStatusCache.put(partitionPath.fullPartitionPath(basePath), filesInPartition);
>   });
>
>   return CollectionUtils.combine(cachedPartitionToFiles, fetchedPartitionToFiles);
> } {code}
>
> I also inspected Flink, and maybe we are loading all files across all
> partitions.
>
> IncrementalInputSplits
> [L180|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L180]
> fileStatuses = fileIndex.getFilesInPartitions();
>
> HoodieTableSource
> [L298|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java#L298]
> FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
>
> I do see that we pass in the required partition paths in both places, but I
> will leave it to the Flink experts to inspect the code once and close out the
> ticket if no action is required.
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)