[ https://issues.apache.org/jira/browse/HUDI-3634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated HUDI-3634:
---------------------------------
    Labels: pull-request-available  (was: )

> Could read empty or partial HoodieCommitMetaData in downstream if using HDFS
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-3634
>                 URL: https://issues.apache.org/jira/browse/HUDI-3634
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: timeline-server
>            Reporter: Hui An
>            Priority: Major
>              Labels: pull-request-available
>
> If we use an incremental query to continuously read from the Hudi upstream, it can miss some batches.
> We use an FSDataOutputStream to create the commit file and then write the commit metadata into it:
> {code:java}
> // HoodieActiveTimeline
> private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
>   FSDataOutputStream fsout = null;
>   try {
>     fsout = metaClient.getFs().create(fullPath, false);
>     if (content.isPresent()) {
>       fsout.write(content.get());
>     }
>   } catch (IOException e) {
>     throw new HoodieIOException("Failed to create file " + fullPath, e);
>   } finally {
>     try {
>       if (null != fsout) {
>         fsout.close();
>       }
>     } catch (IOException e) {
>       throw new HoodieIOException("Failed to close file " + fullPath, e);
>     }
>   }
> }
> {code}
> HDFS first creates an empty file and only then returns the output stream. If a downstream reader opens the file before the data has been written, it reads empty metadata; since it cannot obtain the fileIds and locations from that metadata, it skips the commit and returns an empty DataFrame:
> {code:java}
> // IncrementalRelation
> for (commit <- commitsToReturn) {
>   // As this commit is empty, HoodieCommitMetadata contains nothing
>   val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(
>     commitTimeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
>   if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
>     metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
>       replacedFile.contains(k) && v.startsWith(replacedFile(k))
>     }
>   } else {
>     regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
>       replacedFile.contains(k) && v.startsWith(replacedFile(k))
>     }
>   }
> }
> {code}
> This PR introduces a new configuration that writes the commit metadata to a temp file first and, once the write is complete, moves the temp file into place as the commit file.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
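The write-to-temp-then-rename pattern the PR describes can be sketched as below. This is a minimal, self-contained illustration using java.nio (the class name and method are hypothetical); the actual fix would go through Hadoop's FileSystem API, where rename on HDFS is atomic, so readers either see no commit file at all or a fully written one, never an empty or partial file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class AtomicCommitWrite {

    // Write the metadata to a sibling temp file first, then atomically move
    // it into place. A concurrent reader of fullPath can never observe an
    // empty or half-written file, unlike create-then-write on HDFS.
    static void createImmutableFileInPath(Path fullPath, byte[] content) throws IOException {
        Path tmp = fullPath.resolveSibling(fullPath.getFileName() + ".tmp");
        Files.write(tmp, content, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        Files.move(tmp, fullPath, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("hudi-demo");
        Path commit = dir.resolve("001.commit");
        createImmutableFileInPath(commit, "commit-metadata".getBytes());
        // The commit file appears only after its content is complete.
        System.out.println(new String(Files.readAllBytes(commit)));
    }
}
```

The key design point is that the visibility of the commit file is deferred until the data is durable, so the incremental reader in IncrementalRelation either skips the instant entirely (file absent) or deserializes complete metadata.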