sivabalan narayanan created HUDI-3864:
-----------------------------------------

             Summary: Avoid fetching all files for all partitions on the read/query path for flink
                 Key: HUDI-3864
                 URL: https://issues.apache.org/jira/browse/HUDI-3864
             Project: Apache Hudi
          Issue Type: Task
          Components: flink
            Reporter: sivabalan narayanan


Fetching all files across all partitions should be avoided in the hot path, especially on the query side. I inspected HoodieFileIndex for Spark and things look OK there: we only load files for the partitions involved in the query.

{code:java}
public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
                                HoodieTableMetaClient metaClient,
                                TypedProperties configProperties,
                                HoodieTableQueryType queryType,
                                List<Path> queryPaths,
                                ...
{code}
The queryPaths argument above contains only the partitions involved in the split.

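For illustration (hypothetical table path and partition value, not from the ticket): a query that touches a single partition passes just that partition's path, so the index never lists anything else.
{code:java}
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;

// Hypothetical example: a query filtered to dt=2022-04-01 builds the file index
// with only this partition path, so no other partition is ever listed.
List<Path> queryPaths = Collections.singletonList(
    new Path("/warehouse/hudi_table/dt=2022-04-01"));
{code}
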
Later, when we load the files, we load them only for the matched partitions.

{code:java}
private Map<PartitionPath, FileStatus[]> loadPartitionPathFiles() {
  // List files in all partition paths
  List<PartitionPath> pathToFetch = new ArrayList<>();
  Map<PartitionPath, FileStatus[]> cachedPartitionToFiles = new HashMap<>();

  // Fetch from the FileStatusCache
  List<PartitionPath> partitionPaths = getAllQueryPartitionPaths();
  partitionPaths.forEach(partitionPath -> {
    Option<FileStatus[]> filesInPartition = fileStatusCache.get(partitionPath.fullPartitionPath(basePath));
    if (filesInPartition.isPresent()) {
      cachedPartitionToFiles.put(partitionPath, filesInPartition.get());
    } else {
      pathToFetch.add(partitionPath);
    }
  });

  Map<PartitionPath, FileStatus[]> fetchedPartitionToFiles;

  if (pathToFetch.isEmpty()) {
    fetchedPartitionToFiles = Collections.emptyMap();
  } else {
    Map<String, PartitionPath> fullPartitionPathsMapToFetch = pathToFetch.stream()
        .collect(Collectors.toMap(
            partitionPath -> partitionPath.fullPartitionPath(basePath).toString(),
            Function.identity())
        );

    fetchedPartitionToFiles =
        FSUtils.getFilesInPartitions(
                engineContext,
                metadataConfig,
                basePath,
                fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
                fileSystemStorageConfig.getSpillableDir())
            .entrySet()
            .stream()
            .collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue()));
  }

  // Update the fileStatusCache
  fetchedPartitionToFiles.forEach((partitionPath, filesInPartition) -> {
    fileStatusCache.put(partitionPath.fullPartitionPath(basePath), filesInPartition);
  });

  return CollectionUtils.combine(cachedPartitionToFiles, fetchedPartitionToFiles);
}
{code}

But in Flink, I do see that we load files across all partitions. Let's see whether this can be avoided.

IncrementalInputSplits [L180|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L180]:
{code:java}
fileStatuses = fileIndex.getFilesInPartitions();
{code}

HoodieTableSource [L298|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java#L298]:
{code:java}
FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
{code}

I do see that we pass in the required partition paths in both places, but I will leave it to the Flink experts to inspect the code and close out the ticket if no action is required.
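
For reference, a minimal sketch of the pruning idea, reusing the FSUtils.getFilesInPartitions call from the Spark snippet above. The helper, its name, and its parameters are assumptions for illustration only, not the actual Flink FileIndex API:
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;

public class PartitionPrunedListing {

  // Hypothetical helper: list files only for the partitions the split needs,
  // mirroring the FSUtils.getFilesInPartitions call in loadPartitionPathFiles()
  // above, instead of listing every partition under the table base path.
  static Map<String, FileStatus[]> listRequiredPartitions(
      HoodieEngineContext engineContext,
      HoodieMetadataConfig metadataConfig,
      String basePath,
      List<String> allPartitionPaths,  // all full partition paths of the table
      Set<String> requiredPartitions,  // full partition paths the split touches
      String spillableDir) {
    // Prune first, then list: the metadata/file-system calls only ever see
    // the partitions that the query actually references.
    String[] pruned = allPartitionPaths.stream()
        .filter(requiredPartitions::contains)
        .toArray(String[]::new);
    return FSUtils.getFilesInPartitions(
        engineContext, metadataConfig, basePath, pruned, spillableDir);
  }
}
{code}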