[ https://issues.apache.org/jira/browse/HUDI-3634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hui An updated HUDI-3634:
-------------------------
Description: 
If we're using incremental queries to continuously read the Hudi upstream, it could miss some batches.

We use an FSDataOutputStream to create an output stream and then write the commit data:
{code:java}
// HoodieActiveTimeline
private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
  FSDataOutputStream fsout = null;
  try {
    fsout = metaClient.getFs().create(fullPath, false);
    if (content.isPresent()) {
      fsout.write(content.get());
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to create file " + fullPath, e);
  } finally {
    try {
      if (null != fsout) {
        fsout.close();
      }
    } catch (IOException e) {
      throw new HoodieIOException("Failed to close file " + fullPath, e);
    }
  }
}
{code}
HDFS first creates an empty file and then returns the output stream. If a reader opens the file before the data has been written, the downstream can read empty metadata; since it cannot get file IDs and locations from that metadata, it skips this commit and returns an empty DataFrame:
{code:java}
// IncrementalRelation
for (commit <- commitsToReturn) {
  // As this commit is empty, HoodieCommitMetadata contains nothing
  val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
    .get, classOf[HoodieCommitMetadata])
  if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
    metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
      replacedFile.contains(k) && v.startsWith(replacedFile(k))
    }
  } else {
    regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
      replacedFile.contains(k) && v.startsWith(replacedFile(k))
    }
  }
}
{code}
This PR introduces a new configuration that first writes the commit data to a temporary file and, once the write completes, renames the temporary file to the final commit file.
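The write-to-temp-then-rename fix described above can be sketched as follows. This is a minimal illustration of the pattern using java.nio on a local filesystem, not the actual Hadoop FileSystem API that HoodieActiveTimeline uses; the class and method names here are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicCommitWrite {

    // Sketch of the temp-file-then-rename pattern: readers either see no
    // commit file at all, or a commit file whose content is complete. They
    // can never observe the empty/partial file that a direct create() +
    // write() sequence exposes.
    static void writeCommitAtomically(Path commitFile, byte[] content) throws IOException {
        // Hypothetical ".tmp" suffix for the intermediate file.
        Path temp = commitFile.resolveSibling(commitFile.getFileName() + ".tmp");
        Files.write(temp, content); // write the full content to the temp file first
        // The rename publishes the commit file only after the content is complete.
        Files.move(temp, commitFile, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("hudi-commit-demo");
        Path commit = dir.resolve("20220314000000.commit");
        writeCommitAtomically(commit, "{\"fileIdAndFullPaths\": {}}".getBytes());
        System.out.println(Files.size(commit) > 0);
    }
}
```

On HDFS, the equivalent would be FileSystem.create on a temporary path followed by FileSystem.rename; a rename within the same directory does not leave a window where the destination exists with partial content, which is what removes the race seen by IncrementalRelation.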
was: If we're using Incremental query to continues read the HUDI upstream, it could miss some batches As we using

> Could read empty or partial HoodieCommitMetaData in downstream if using HDFS
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-3634
>                 URL: https://issues.apache.org/jira/browse/HUDI-3634
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: timeline-server
>            Reporter: Hui An
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)