danny0405 commented on code in PR #5953:
URL: https://github.com/apache/hudi/pull/5953#discussion_r912974418
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -302,6 +312,51 @@ private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants)
: instants;
}
+ /**
+  * Drops references to log files that no longer exist and discards file slices left with no
+  * existing files.
+  *
+  * <p>Each slice's log files are filtered down to those that exist on {@code fs}, and the slice is
+  * rebuilt via {@code generateFileSlice}. A rebuilt slice is kept if it still has a log file, or
+  * otherwise if its base file exists.
+  *
+  * <p>The returned stream is lazy: the existence checks (and any resulting
+  * {@link HoodieException}) happen only when the stream is consumed.
+  *
+  * @param fs         file system used for the existence checks
+  * @param fileSlices candidate file slices
+  * @return the file slices that still reference at least one existing file
+  */
+ private Stream<FileSlice> filterFileSliceWithValidFiles(FileSystem fs, Stream<FileSlice> fileSlices) {
+   // we need to filter out the base file and log file that does not exist
+   return fileSlices.map(fileSlice -> {
+     List<HoodieLogFile> logFiles = fileSlice.getLogFiles()
+         .filter(logFile -> pathExists(fs, logFile.getPath()))
+         .collect(Collectors.toList());
+     return generateFileSlice(fileSlice.getPartitionPath(),
+         fileSlice.getBaseInstantTime(),
+         fileSlice.getFileId(),
+         fileSlice.getBaseFile().orElse(null),
+         logFiles);
+   }).filter(fileSlice -> {
+     // we should keep the file slice if any base/log file exists
+     if (fileSlice.getLatestLogFile().isPresent()) {
+       // NOTE(review): the base file is not checked on this path, so a slice whose base file was
+       // removed but whose log files survive still references the missing base file.
+       return true;
+     }
+     Option<String> basePath = fileSlice.getBaseFile().map(BaseFile::getPath);
+     return basePath.isPresent() && pathExists(fs, new org.apache.hadoop.fs.Path(basePath.get()));
+   });
+ }
+
+ /**
+  * Returns whether {@code path} exists on {@code fs}.
+  *
+  * <p>Wraps the checked {@link IOException} so the check can be used inside stream lambdas. The
+  * failing path is carried in the exception message and the original cause is chained.
+  */
+ private static boolean pathExists(FileSystem fs, org.apache.hadoop.fs.Path path) {
+   try {
+     return fs.exists(path);
+   } catch (IOException e) {
+     throw new HoodieException("Failed to check existence of path: " + path, e);
+   }
+ }
+
+ private FileSlice generateFileSlice(String partitionPath,
+ String baseInstant,
Review Comment:
We may need to consider a fallback mechanism, such as scanning the storage
directly, when we find that a file does not exist. Let's first see the effect
of removing the check entirely.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]