yihua commented on code in PR #18468:
URL: https://github.com/apache/hudi/pull/18468#discussion_r3036071993


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java:
##########
@@ -80,6 +80,8 @@ public String nextSplit() {
   public HoodieRecordWithPosition<T> nextRecordFromSplit() {
     if (recordIterator.hasNext()) {
       recordAndPosition.record(recordIterator.next());
+      // Mark the last record in the split so the emitter can emit a split-end watermark.
+      recordAndPosition.setLastInSplit(!recordIterator.hasNext());

Review Comment:
   🤖 If a split has zero records, `nextRecordFromSplit()` falls straight through 
to the `else` branch and returns null. `setLastInSplit` is never called, so no 
watermark is emitted for that split. Could you emit the watermark explicitly on 
the empty-split path (in the `else` branch, or in `nextSplit()` before 
advancing) so that downstream operators don't stall waiting for progress from 
an empty split?
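
   A minimal sketch of the empty-split case, written against plain Java 
collections rather than the real Hudi classes. `RecordAndFlag`, 
`SplitBatchSketch`, and `needsExplicitSplitEnd()` are hypothetical names 
introduced only for illustration; the actual fix in `BatchRecords` may look 
different.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for HoodieRecordWithPosition; not the Hudi class. */
final class RecordAndFlag<T> {
  T record;
  boolean lastInSplit;
}

/** Hypothetical, simplified model of a per-split record batch. */
final class SplitBatchSketch<T> {
  private final Iterator<T> recordIterator;
  private final RecordAndFlag<T> recordAndPosition = new RecordAndFlag<>();
  // Set once a record has been handed out with lastInSplit == true.
  private boolean splitEndReported = false;

  SplitBatchSketch(List<T> records) {
    this.recordIterator = records.iterator();
  }

  /** Returns the next record (reusing one holder object), or null when exhausted. */
  RecordAndFlag<T> nextRecordFromSplit() {
    if (recordIterator.hasNext()) {
      recordAndPosition.record = recordIterator.next();
      recordAndPosition.lastInSplit = !recordIterator.hasNext();
      splitEndReported = recordAndPosition.lastInSplit;
      return recordAndPosition;
    }
    return null;
  }

  /**
   * True when the split is exhausted but no record carried the last-in-split
   * flag, which only happens for a split with zero records. A caller could
   * use this to emit the split-end watermark explicitly.
   */
  boolean needsExplicitSplitEnd() {
    return !recordIterator.hasNext() && !splitEndReported;
  }
}
```

   With this shape, an empty split returns null from `nextRecordFromSplit()` 
but still reports `needsExplicitSplitEnd() == true`, so the caller has a hook 
for emitting the watermark.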



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java:
##########
@@ -18,21 +18,54 @@
 
 package org.apache.hudi.source.reader;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.text.ParseException;
 
 /**
  * Default Hoodie record emitter.
- * @param <T>
+ *
+ * <p>In addition to forwarding each record downstream, this emitter advances the Flink
+ * watermark to the split's {@code latestCommit} epoch when the last record of the split
+ * is processed.  Downstream operators can rely on this watermark to trigger time-based
+ * windows or gauge read progress in bounded Hudi pipelines.
+ *
+ * @param <T> record type
  */
 public class HoodieRecordEmitter<T> implements RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit>, Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieRecordEmitter.class);
 

Review Comment:
   🤖 Have you considered the ordering guarantee here? Flink's source reader can 
process multiple splits concurrently, and splits from different commits 
(different `latestCommit` values) may complete in non-chronological order. A 
split for an older instant that finishes *after* a newer one would emit a lower 
watermark, which Flink silently drops, so callers might not realise that the 
per-split watermark guarantee is weaker than it appears. It might be worth 
documenting this limitation, or clamping to the running maximum watermark seen 
so far.
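
   One way the clamping option could look, as a sketch in plain Java with no 
Flink dependency. `MonotonicWatermarkSketch` and `onSplitEnd` are hypothetical 
names, and the `emitted` list merely stands in for a call to the real output; 
it assumes the split's commit time has already been converted to epoch 
milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that only lets the watermark move forward. */
final class MonotonicWatermarkSketch {
  // Highest commit timestamp (epoch millis) emitted so far.
  private long maxSeen = Long.MIN_VALUE;
  // Stand-in for the downstream output; records what would be emitted.
  private final List<Long> emitted = new ArrayList<>();

  /**
   * Called when a split finishes. Emits only if the split's commit time
   * is strictly greater than every watermark emitted before.
   *
   * @return true if a watermark was emitted, false if it was skipped
   */
  boolean onSplitEnd(long commitEpochMillis) {
    if (commitEpochMillis <= maxSeen) {
      // A split for an older (or equal) instant finished late: skip it
      // explicitly rather than relying on the runtime to drop it.
      return false;
    }
    maxSeen = commitEpochMillis;
    emitted.add(commitEpochMillis);
    return true;
  }

  List<Long> emitted() {
    return emitted;
  }
}
```

   Returning a boolean makes the skipped case visible to the caller, which 
could log it at debug level so that out-of-order split completion is at least 
observable.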


