cshuo commented on code in PR #18468:
URL: https://github.com/apache/hudi/pull/18468#discussion_r3043411448
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java:
##########
@@ -18,21 +18,54 @@
package org.apache.hudi.source.reader;
+import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.text.ParseException;
/**
* Default Hoodie record emitter.
- * @param <T>
+ *
+ * <p>In addition to forwarding each record downstream, this emitter advances the Flink
+ * watermark to the split's {@code latestCommit} epoch when the last record of the split
+ * is processed. Downstream operators can rely on this watermark to trigger time-based
+ * windows or gauge read progress in bounded Hudi pipelines.
+ *
+ * @param <T> record type
*/
public class HoodieRecordEmitter<T> implements
RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit>, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieRecordEmitter.class);
@Override
public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T> output, HoodieSourceSplit split) throws Exception {
output.collect(record.record());
split.updatePosition(record.fileOffset(), record.recordOffset());
+ if (record.isLastInSplit()) {
+ emitSplitWatermark(output, split);
Review Comment:
This seems risky. The PR emits Flink watermarks derived from
`split.getLatestCommit()`, but that timestamp is neither a true event-time
watermark nor a processing-time signal. It may conflict with pipelines that
derive their watermarks from an event-time field in the records or from
Flink's processing time. It would be better to make this an explicit,
optional “commit-time watermark” contract rather than emitting it by default.
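
To make the suggestion concrete, here is a minimal, self-contained sketch of what an opt-in contract could look like. It is only an illustration, not the PR's code: `Output` is a hypothetical stand-in for the part of Flink's `SourceOutput` that matters here, and the `emitCommitTimeWatermark` flag, the `Emitter` class, and the `latestCommitEpochMillis` parameter are assumed names, not existing Hudi options or APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitTimeWatermarkSketch {

    /** Hypothetical stand-in for the subset of Flink's SourceOutput used here. */
    interface Output<T> {
        void collect(T record);

        void emitWatermark(long timestampMillis);
    }

    /** Forwards every record; emits a commit-time watermark only when opted in. */
    static final class Emitter<T> {
        // Hypothetical option; the idea is that it would default to false.
        private final boolean emitCommitTimeWatermark;

        Emitter(boolean emitCommitTimeWatermark) {
            this.emitCommitTimeWatermark = emitCommitTimeWatermark;
        }

        void emitRecord(T record, boolean lastInSplit, long latestCommitEpochMillis, Output<T> out) {
            out.collect(record);
            // The watermark is sent only for the last record of a split, and only
            // if the user explicitly asked for commit-time watermarks.
            if (emitCommitTimeWatermark && lastInSplit) {
                out.emitWatermark(latestCommitEpochMillis);
            }
        }
    }

    /** Test double that records everything it receives. */
    static final class RecordingOutput<T> implements Output<T> {
        final List<T> records = new ArrayList<>();
        final List<Long> watermarks = new ArrayList<>();

        @Override
        public void collect(T record) {
            records.add(record);
        }

        @Override
        public void emitWatermark(long timestampMillis) {
            watermarks.add(timestampMillis);
        }
    }

    public static void main(String[] args) {
        long commitMillis = 1_700_000_000_000L; // arbitrary example commit time

        RecordingOutput<String> byDefault = new RecordingOutput<>();
        new Emitter<String>(false).emitRecord("r1", true, commitMillis, byDefault);

        RecordingOutput<String> optedIn = new RecordingOutput<>();
        new Emitter<String>(true).emitRecord("r1", true, commitMillis, optedIn);

        System.out.println("default watermarks: " + byDefault.watermarks);
        System.out.println("opt-in watermarks:  " + optedIn.watermarks);
    }
}
```

With the flag off, the emitter behaves exactly as it did before this PR, so pipelines that assign watermarks from an event-time field or rely on processing time are unaffected. Only users who explicitly enable it would see commit-time watermarks.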