[ 
https://issues.apache.org/jira/browse/HUDI-3634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-3634:
---------------------------------
    Labels: pull-request-available  (was: )

> Could read empty or partial HoodieCommitMetaData in downstream if using HDFS
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-3634
>                 URL: https://issues.apache.org/jira/browse/HUDI-3634
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: timeline-server
>            Reporter: Hui An
>            Priority: Major
>              Labels: pull-request-available
>
> If we're using Incremental query to continues read the HUDI upstream, it 
> could miss some batches
> As we use Fsoutputstream to create an outputstream and then write the commit 
> data,
> {code:java}
> // HoodieActiveTimeline
>   private void createImmutableFileInPath(Path fullPath, Option<byte[]> 
> content) {
>     FSDataOutputStream fsout = null;
>     try {
>       fsout = metaClient.getFs().create(fullPath, false);
>       if (content.isPresent()) {
>         fsout.write(content.get());
>       }
>     } catch (IOException e) {
>       throw new HoodieIOException("Failed to create file " + fullPath, e);
>     } finally {
>       try {
>         if (null != fsout) {
>           fsout.close();
>         }
>       } catch (IOException e) {
>         throw new HoodieIOException("Failed to close file " + fullPath, e);
>       }
>     }
>   }
> {code}
> HDFS will first create an empty file and then return the outputstream, if at 
> this moment the data is not write yet, the downstream could read empty 
> metadata, as it cannot get fileId and locations from the metadata, it will 
> skip this commit and return an empty dataframe
> {code:java}
> // IncrementalRelation
>       for (commit <- commitsToReturn) {
>         // As this commit is empty, so HoodieCommitMetaData has nothing
>         val metadata: HoodieCommitMetadata = 
> HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
>           .get, classOf[HoodieCommitMetadata])
>         if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == 
> commit.getTimestamp) {
>           metaBootstrapFileIdToFullPath ++= 
> metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
>             replacedFile.contains(k) && v.startsWith(replacedFile(k))
>           }
>         } else {
>           regularFileIdToFullPath ++= 
> metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
>             replacedFile.contains(k) && v.startsWith(replacedFile(k))
>           }
>         }
>       }
> {code}
> This pr introduces a new configure to try to write the commit data to a temp 
> file, and after the write is done, then move the temp file back to the commit.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to