the-other-tim-brown commented on code in PR #14037:
URL: https://github.com/apache/hudi/pull/14037#discussion_r2399141172


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -102,11 +103,24 @@ public 
FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
     this.requestedSchema = AvroSchemaCache.intern(requestedSchema);
     this.hoodieTableConfig = metaClient.getTableConfig();
     this.deleteContext = new DeleteContext(properties, tableSchema);
+    this.metaClient = metaClient;
+
+    boolean hasInstantRange = readerContext.getInstantRange().isPresent();
+    boolean shouldAddCompletionTimeField = !metaClient.isMetadataTable()
+        && metaClient.getTableConfig() != null && 
metaClient.getTableConfig().getTableVersion() != null

Review Comment:
   The table config and table version should always be non-null, so we can drop 
these null checks.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -102,11 +103,24 @@ public 
FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
     this.requestedSchema = AvroSchemaCache.intern(requestedSchema);
     this.hoodieTableConfig = metaClient.getTableConfig();
     this.deleteContext = new DeleteContext(properties, tableSchema);
+    this.metaClient = metaClient;
+
+    boolean hasInstantRange = readerContext.getInstantRange().isPresent();
+    boolean shouldAddCompletionTimeField = !metaClient.isMetadataTable()
+        && metaClient.getTableConfig() != null && 
metaClient.getTableConfig().getTableVersion() != null
+        && 
metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.SIX)

Review Comment:
   The completion time is only available in table version 8 and above, if I 
remember correctly.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -188,12 +242,24 @@ Schema generateRequiredSchema(DeleteContext 
deleteContext) {
     boolean hasInstantRange = readerContext.getInstantRange().isPresent();
     //might need to change this if other queries than mor have mandatory fields
     if (!readerContext.getHasLogFiles()) {
+      List<Schema.Field> addedFields = new ArrayList<>();
       if (hasInstantRange && !findNestedField(requestedSchema, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD).isPresent()) {
-        List<Schema.Field> addedFields = new ArrayList<>();
         addedFields.add(getField(this.tableSchema, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD));
-        return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
       }
-      return requestedSchema;
+      if (shouldAddCompletionTime && 
!findNestedField(requestedSchemaWithCompletionTime, 
HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD).isPresent()) {

Review Comment:
   I think the expectation is that the completion time is a top-level field, 
not a nested one, so we can simplify this check.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -58,6 +58,7 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
   public static final String FILENAME_METADATA_FIELD = 
HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName();
   public static final String OPERATION_METADATA_FIELD = 
HoodieMetadataField.OPERATION_METADATA_FIELD.getFieldName();
   public static final String HOODIE_IS_DELETED_FIELD = "_hoodie_is_deleted";
+  public static final String COMMIT_COMPLETION_TIME_METADATA_FIELD = 
"_hoodie_commit_completion_time";

Review Comment:
   The metadata fields are typically persisted to the files. In this case, it is 
just a field we add at query time, so maybe we can come up with a better name. 
You called this a `virtual` field in the description, so maybe something along 
those lines?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -89,6 +87,9 @@ public class FileGroupReaderSchemaHandler<T> {
   protected final TypedProperties properties;
   private final DeleteContext deleteContext;
   private final HoodieTableMetaClient metaClient;
+  private final boolean shouldAddCompletionTime;
+  private final Map<String, String> commitTimeToCompletionTimeMap;
+  private final Schema requestedSchemaWithCompletionTime;

Review Comment:
   If the completion time is required, then the requested schema should be 
updated to include the completion time. I don't think we need a second instance 
variable.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -324,4 +404,26 @@ private static Schema.Field getField(Schema schema, String 
fieldName) {
     }
     return foundFieldOpt.get();
   }
+
+  private Map<String, String> buildCompletionTimeMapping(HoodieTableMetaClient 
metaClient) {
+    return 
metaClient.getCommitsTimeline().filterCompletedInstants().getInstants().stream()

Review Comment:
   This does not account for instants that are no longer in the active 
timeline. We'll need to use the `CompletionTimeQueryView` for this.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -141,12 +144,63 @@ public Option<InternalSchema> getInternalSchemaOpt() {
   }
 
   public Option<UnaryOperator<T>> getOutputConverter() {
-    if (!AvroSchemaUtils.areSchemasProjectionEquivalent(requiredSchema, 
requestedSchema)) {
-      return 
Option.of(readerContext.getRecordContext().projectRecord(requiredSchema, 
requestedSchema));
+    Schema targetSchema = shouldAddCompletionTime ? 
requestedSchemaWithCompletionTime : requestedSchema;
+    UnaryOperator<T> projectionConverter = null;
+    UnaryOperator<T> completionTimeConverter = null;
+    boolean schemasEquivalent = 
AvroSchemaUtils.areSchemasProjectionEquivalent(requiredSchema, targetSchema);
+    if (!schemasEquivalent) {
+      projectionConverter = 
readerContext.getRecordContext().projectRecord(requiredSchema, targetSchema);
+    }
+    if (shouldAddCompletionTime) {
+      completionTimeConverter = getCompletionTimeTransformer();
+    }
+    if (projectionConverter != null && completionTimeConverter != null) {
+      final UnaryOperator<T> finalProjectionConverter = projectionConverter;
+      final UnaryOperator<T> finalCompletionTimeConverter = 
completionTimeConverter;
+      UnaryOperator<T> composed = t -> 
finalCompletionTimeConverter.apply(finalProjectionConverter.apply(t));
+      return Option.of(composed);
+    } else if (projectionConverter != null) {
+      return Option.of(projectionConverter);
+    } else if (completionTimeConverter != null) {
+      return Option.of(completionTimeConverter);
     }
     return Option.empty();
   }
 
+  private UnaryOperator<T> getCompletionTimeTransformer() {
+    return record -> {
+      try {
+        Object commitTimeObj = readerContext.getRecordContext().getValue(
+            record,
+            requestedSchemaWithCompletionTime,
+            HoodieRecord.COMMIT_TIME_METADATA_FIELD
+        );
+        if (commitTimeObj == null) {
+          return record;
+        }
+        String commitTime = commitTimeObj.toString();
+        String completionTime = 
commitTimeToCompletionTimeMap.getOrDefault(commitTime, commitTime);
+        Schema.Field completionTimeField = 
requestedSchemaWithCompletionTime.getField(HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD);
+        if (completionTimeField == null) {
+          return record;
+        }
+        int completionTimePos = completionTimeField.pos();
+        Object[] fieldValues = new 
Object[requestedSchemaWithCompletionTime.getFields().size()];

Review Comment:
   Is there a way to simply set the value in the existing object instead of 
creating a new one? At this point the record should have a null for the 
completion time based on my understanding of the code.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -330,7 +338,64 @@ class 
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(override val sqlCont
                                                             isBootstrap: 
Boolean,
                                                             rangeType: 
RangeType = RangeType.OPEN_CLOSED)
   extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap,
-    MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient, 
schemaSpec, None, rangeType))
+    MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient, 
schemaSpec, None, rangeType)) {
+
+  private lazy val extendedSchemaSpec: Option[StructType] = {
+    if (!metaClient.isMetadataTable) {
+      val baseSchema = schemaSpec.getOrElse(tableStructSchema)
+      Some(extendSchemaForCompletionTime(baseSchema))
+    } else {
+      schemaSpec
+    }
+  }
+
+  private lazy val incrementalFileIndexV2 = new HoodieIncrementalFileIndex(
+    sparkSession, metaClient, extendedSchemaSpec, options, fileStatusCache, 
true,
+    MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient, 
extendedSchemaSpec, None, rangeType))
+
+  override def buildFileIndex(): HoodieFileIndex = incrementalFileIndexV2
+
+  override def buildDataSchema(): StructType = {
+    val baseSchema = super.buildDataSchema()
+    extendSchemaForCompletionTime(baseSchema)
+  }
+
+  override def buildFileFormat(): FileFormat = {
+    val tableConfig = metaClient.getTableConfig
+    val (finalAvroSchema, finalStructSchema) = if 
(!metaClient.isMetadataTable) {
+      (extendAvroSchemaForCompletionTime(tableAvroSchema), buildDataSchema())
+    } else {
+      (tableAvroSchema, buildDataSchema())
+    }
+
+    new HoodieFileGroupReaderBasedFileFormat(basePath.toString,
+      HoodieTableSchema(finalStructSchema, finalAvroSchema.toString, 
internalSchemaOpt),
+      tableConfig.getTableName, queryTimestamp.get, getMandatoryFields, isMOR, 
isBootstrap,
+      isIncremental, validCommits, shouldUseRecordPosition, getRequiredFilters,
+      tableConfig.isMultipleBaseFileFormatsEnabled, 
tableConfig.getBaseFileFormat)
+  }
+
+  private def extendSchemaForCompletionTime(baseSchema: StructType): 
StructType = {
+    val completionTimeField = 
StructField(HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD, StringType, 
nullable = true)
+    StructType(baseSchema.fields :+ completionTimeField)
+  }
+
+  private def extendAvroSchemaForCompletionTime(baseAvroSchema: Schema): 
Schema = {

Review Comment:
   This repeats code in another section of this PR. Can we move this into a 
common utility?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -386,8 +451,15 @@ abstract class 
HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(override
                                                                        
isBootstrap: Boolean)
   extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, 
schemaSpec, isBootstrap) {
 
-  override protected def getMandatoryFields(): Seq[String] = 
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
-    orderingFields ++ partitionColumnsToRead
+  override protected def getMandatoryFields: Seq[String] = {
+    val baseMandatory = Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
+      orderingFields ++ partitionColumnsToRead
+    if (!metaClient.isMetadataTable && 
metaClient.getTableConfig.getTableVersion.greaterThanOrEquals(HoodieTableVersion.SIX))
 {

Review Comment:
   This should be version 8 here as well, not `HoodieTableVersion.SIX`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to