nsivabalan commented on code in PR #13517:
URL: https://github.com/apache/hudi/pull/13517#discussion_r2223621303
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java:
##########
@@ -96,6 +98,30 @@ public static String getPayloadClass(Properties props) {
return HoodieRecordPayload.getPayloadClassName(props);
}
+ /**
+ * Check if event time metadata should be tracked.
+ */
+ public static boolean shouldTrackEventTimeWaterMarkByConfig(TypedProperties props) {
+ return props.getBoolean("hoodie.write.track.event.time.watermark", false);
+ }
+
+ /**
+ * Check if logical timestamp should be made consistent.
+ */
+ public static boolean shouldKeepConsistentLogicalTimestamp(TypedProperties props) {
+ return Boolean.parseBoolean(props.getProperty(
+ KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+ KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+ }
+
+ /**
+ * Extract event_time field name from configuration.
+ */
+ @Nullable
+ public static String getEventTimeFieldName(TypedProperties props) {
Review Comment:
Can we return `Option<String>` from this method instead of a `@Nullable` `String`?
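A minimal sketch of the `Option`-returning variant. It uses `java.util.Optional` and `java.util.Properties` as stand-ins for Hudi's `Option` and `TypedProperties` so it compiles on its own, and it assumes the field name is read from `hoodie.payload.event.time.field`; the quoted hunk does not show which key the PR actually reads. `EventTimeFieldConfig` is a hypothetical holder class.

```java
import java.util.Optional;
import java.util.Properties;

// Sketch only: Optional/Properties stand in for Hudi's Option/TypedProperties.
final class EventTimeFieldConfig {
  // Assumption: the key the event-time field name is read from.
  static final String EVENT_TIME_FIELD_KEY = "hoodie.payload.event.time.field";

  private EventTimeFieldConfig() {
  }

  /** Returns the configured event-time field name, or empty if unset or blank. */
  static Optional<String> getEventTimeFieldName(Properties props) {
    String value = props.getProperty(EVENT_TIME_FIELD_KEY);
    if (value == null || value.trim().isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(value.trim());
  }
}
```

Returning an optional would let the handles assign `eventTimeFieldNameOpt` directly instead of wrapping a nullable value at each call site.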
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java:
##########
@@ -96,6 +98,30 @@ public static String getPayloadClass(Properties props) {
return HoodieRecordPayload.getPayloadClassName(props);
}
+ /**
+ * Check if event time metadata should be tracked.
+ */
+ public static boolean shouldTrackEventTimeWaterMarkByConfig(TypedProperties props) {
+ return props.getBoolean("hoodie.write.track.event.time.watermark", false);
Review Comment:
Why not use the `ConfigProperty` directly instead of the hard-coded key and default?
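A self-contained sketch of the single-definition pattern the question points at. `BooleanConfig` and `WriteConfigs` are hypothetical stand-ins for Hudi's `ConfigProperty` and the config class that would declare it; only the key `hoodie.write.track.event.time.watermark` and its `false` default come from the quoted diff.

```java
import java.util.Properties;

// Hypothetical stand-in for ConfigProperty: key and default declared once.
final class BooleanConfig {
  private final String key;
  private final boolean defaultValue;

  BooleanConfig(String key, boolean defaultValue) {
    this.key = key;
    this.defaultValue = defaultValue;
  }

  String key() {
    return key;
  }

  boolean defaultValue() {
    return defaultValue;
  }

  /** Reads the flag, falling back to the declared default when it is unset. */
  boolean readFrom(Properties props) {
    String raw = props.getProperty(key);
    return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
  }
}

final class WriteConfigs {
  // Key and default copied from the diff; the constant name is hypothetical.
  static final BooleanConfig TRACK_EVENT_TIME_WATERMARK =
      new BooleanConfig("hoodie.write.track.event.time.watermark", false);

  private WriteConfigs() {
  }

  static boolean shouldTrackEventTimeWatermark(Properties props) {
    return TRACK_EVENT_TIME_WATERMARK.readFrom(props);
  }
}
```

Declaring the key once keeps the documentation, default, and lookups from drifting apart, which is the usual reason to go through a config object rather than string literals.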
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -133,6 +133,10 @@ public boolean canWrite(HoodieRecord record) {
@Override
protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
Option<Map<String, String>> recordMetadata = record.getMetadata();
+ // Track event time metadata.
+ if (trackEventTimeWatermark && eventTimeFieldNameOpt.isPresent()) {
+ recordMetadata = appendEventTimeMetadata(record, recordMetadata);
Review Comment:
We should also fix HoodieRowCreateHandle.
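One way to avoid duplicating this logic in `HoodieRowCreateHandle` is a small engine-agnostic helper that both handles call after extracting the event-time value from their own record representation. This is a sketch under assumptions: `EventTimeMetadata`, `append`, and the key name `metadata.event_time.key` are hypothetical here, and `java.util.Optional` stands in for Hudi's `Option`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of a helper shared by the Avro-record and row-based create handles.
final class EventTimeMetadata {
  // Assumption: the metadata key; the PR's actual key is not in the quoted hunks.
  static final String EVENT_TIME_KEY = "metadata.event_time.key";

  private EventTimeMetadata() {
  }

  /**
   * Returns a copy of the existing metadata (if any) with the event time added.
   * The input map is never mutated. A null event time leaves metadata unchanged.
   */
  static Optional<Map<String, String>> append(Optional<Map<String, String>> existing,
                                              Object eventTimeValue) {
    if (eventTimeValue == null) {
      return existing;
    }
    Map<String, String> merged = new HashMap<>(existing.orElse(Collections.emptyMap()));
    merged.put(EVENT_TIME_KEY, String.valueOf(eventTimeValue));
    return Optional.of(merged);
  }
}
```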
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java:
##########
@@ -292,7 +292,11 @@ private boolean writeRecord(HoodieRecord<T> newRecord,
Schema schema,
Properties prop,
boolean isDelete) {
- Option recordMetadata = newRecord.getMetadata();
+ Option<Map<String, String>> recordMetadata = newRecord.getMetadata();
+ // Track event time metadata.
+ if (trackEventTimeWatermark && eventTimeFieldNameOpt.isPresent()) {
+ recordMetadata = appendEventTimeMetadata(newRecord, recordMetadata);
Review Comment:
We should also fix the FG-reader-based merge handle (COW merges) once the other patch has landed.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -336,4 +352,51 @@ protected static Option<IndexedRecord> toAvroRecord(HoodieRecord record, Schema
return Option.empty();
}
}
+
+ boolean shouldTrackEventTimeWaterMarkerByPayload(HoodieTableMetaClient metaClient) {
Review Comment:
private
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -336,4 +352,51 @@ protected static Option<IndexedRecord> toAvroRecord(HoodieRecord record, Schema
return Option.empty();
}
}
+
+ boolean shouldTrackEventTimeWaterMarkerByPayload(HoodieTableMetaClient metaClient) {
+ // Only for event time ordering mode.
+ if (metaClient.getTableConfig().getRecordMergeMode()
+ != RecordMergeMode.EVENT_TIME_ORDERING) {
+ return false;
+ }
+
+ // Only when `DefaultHoodieRecordPayload` was the payload class.
Review Comment:
for v9 table, we should expect this to be part of write properties.
only incase of v8 tables, writers will go through payloads and we don't need
this fix at all.
we can simplify this.
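A sketch of what the simplified check could look like, assuming the table version is available as an `int` and keeping the existing `EVENT_TIME_ORDERING` guard. `MergeMode` is a hypothetical local enum mirroring `RecordMergeMode`, `EventTimeTracking` is a hypothetical holder, and `writeConfigFlag` stands for the write property; none of this is the PR's actual code.

```java
// Hypothetical local mirror of RecordMergeMode, so the sketch is self-contained.
enum MergeMode { EVENT_TIME_ORDERING, COMMIT_TIME_ORDERING, CUSTOM }

final class EventTimeTracking {
  private EventTimeTracking() {
  }

  static boolean shouldTrackEventTimeWatermark(int tableVersion,
                                               MergeMode mergeMode,
                                               boolean writeConfigFlag) {
    // v8 and earlier: writers go through payloads, so no handle-level tracking.
    if (tableVersion < 9) {
      return false;
    }
    // v9+: rely on the write property instead of inspecting the payload class.
    return mergeMode == MergeMode.EVENT_TIME_ORDERING && writeConfigFlag;
  }
}
```

Dropping the payload-class inspection leaves one explicit opt-in per table version, which is easier to reason about than inferring intent from the configured payload.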
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -1422,6 +1422,10 @@ public boolean isSimpleBucketIndex() {
&& HoodieIndex.BucketIndexEngineType.SIMPLE.equals(getBucketIndexEngineType());
}
+ public boolean trackEventTimeWatermark() {
Review Comment:
Have you filed a follow-up JIRA for the things we need to call out in the
release docs? For example, we should let users know that, after upgrading to
v9, they are expected to set this writer config if they want the event-time
watermark in commit metadata.
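A short sketch of why the release-note call-out matters: the quoted `shouldTrackEventTimeWaterMarkByConfig` defaults to `false`, so a writer on an upgraded v9 table records no watermark unless the user adds the key. `UpgradeNoteExample` and its methods are hypothetical; only the key and default come from the diff.

```java
import java.util.Properties;

// Sketch: the opt-in a v9 user would need, per the defaults in the quoted diff.
final class UpgradeNoteExample {
  static final String TRACK_KEY = "hoodie.write.track.event.time.watermark";

  private UpgradeNoteExample() {
  }

  /** Mirrors the quoted default of false when the key is absent. */
  static boolean tracksWatermark(Properties writerProps) {
    return Boolean.parseBoolean(writerProps.getProperty(TRACK_KEY, "false"));
  }

  /** Returns a copy of the writer options with watermark tracking enabled. */
  static Properties withWatermarkTracking(Properties base) {
    Properties props = new Properties();
    props.putAll(base);
    props.setProperty(TRACK_KEY, "true");
    return props;
  }
}
```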