the-other-tim-brown commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2279379802


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -156,40 +215,53 @@ private void init(CompactionOperation operation, String 
partitionPath) {
       fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, 
newFilePath, hoodieTable.getStorage(),
           config, writeSchemaWithMetaFields, taskContextSupplier, recordType);
     } catch (IOException io) {
-      LOG.error("Error in update task at commit {}", instantTime, io);
       writeStatus.setGlobalError(io);
       throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle 
for FileId: " + fileId + " on commit "
           + instantTime + " on path " + 
hoodieTable.getMetaClient().getBasePath(), io);
     }
   }
 
+  @Override
+  protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>> 
newRecordsItr) {
+    // no op.
+  }
+
+  /**
+   * This is only for Spark: the engine context fetched from a serialized 
Hoodie table is always local,
+   * so this overrides it with the Spark-specific reader context.
+   */
+  public void setReaderContext(HoodieReaderContext<T> readerContext) {
+    this.readerContext = readerContext;
+  }
+
   /**
    * Reads the file slice of a compaction operation using a file group reader,
    * by getting an iterator of the records; then writes the records to a new 
base file.
    */
   @Override
   public void doMerge() {
+    // For non-compaction operations, the merger needs to be initialized with 
the writer properties to handle cases like Merge-Into commands
+    if (operation.isEmpty()) {
+      this.readerContext.initRecordMergerForIngestion(config.getProps());
+    }
     boolean usePosition = 
config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
-    Option<InternalSchema> internalSchemaOption = 
SerDeHelper.fromJson(config.getInternalSchema());
-    TypedProperties props = TypedProperties.copy(config.getProps());
+    Option<InternalSchema> internalSchemaOption = 
SerDeHelper.fromJson(config.getInternalSchema())
+        .map(internalSchema -> 
AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields, 
internalSchema,
+            
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
     long maxMemoryPerCompaction = 
IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
     props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
String.valueOf(maxMemoryPerCompaction));
-    Stream<HoodieLogFile> logFiles = 
operation.getDeltaFileNames().stream().map(logFileName ->
+    Option<Stream<HoodieLogFile>> logFilesStreamOpt = operation.map(op -> 
op.getDeltaFileNames().stream().map(logFileName ->
         new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
-            config.getBasePath(), operation.getPartitionPath()), 
logFileName)));
+            config.getBasePath(), op.getPartitionPath()), logFileName))));
     // Initializes file group reader
-    try (HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
-        
.withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge)).withLogFiles(logFiles)
-        
.withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props)
-        
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
-        .withFileGroupUpdateCallback(cdcLogger.map(logger -> new 
CDCCallback(logger, 
readerContext))).withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan()).build())
 {
+    try (HoodieFileGroupReader<T> fileGroupReader = 
getFileGroupReader(usePosition, internalSchemaOption, props, logFilesStreamOpt, 
incomingRecordsItr)) {
       // Reads the records from the file slice
       try (ClosableIterator<HoodieRecord<T>> recordIterator = 
fileGroupReader.getClosableHoodieRecordIterator()) {
         while (recordIterator.hasNext()) {
           HoodieRecord<T> record = recordIterator.next();
+          Option<Map<String, String>> recordMetadata = 
getRecordMetadata(record, writeSchema, props);

Review Comment:
   We're not checking for that right now. Will update this to check for the operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to the sender.