This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fc41c22b9f36 [HUDI-9638] Infer table configs for new table creation in 
v9 (#13615)
fc41c22b9f36 is described below

commit fc41c22b9f360d0e08860f20a37a5ebf520c200c
Author: Lin Liu <[email protected]>
AuthorDate: Thu Aug 14 16:22:35 2025 -0700

    [HUDI-9638] Infer table configs for new table creation in v9 (#13615)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../hudi/utils/HoodieWriterClientTestHarness.java  |   5 +-
 .../hudi/common/engine/HoodieReaderContext.java    |   7 +-
 .../hudi/common/model/AWSDmsAvroPayload.java       |   3 +-
 .../hudi/common/model/HoodieRecordPayload.java     |  29 ++-
 .../hudi/common/table/HoodieTableConfig.java       | 153 ++++++++++++-
 .../hudi/common/table/HoodieTableMetaClient.java   |  25 ++-
 .../table/read/FileGroupReaderSchemaHandler.java   |  24 ++-
 .../common/table/read/PartialUpdateStrategy.java   |   7 +-
 .../org/apache/hudi/common/util/ConfigUtils.java   |   4 +-
 .../common/table/read/SchemaHandlerTestBase.java   |   3 +-
 .../apache/hudi/common/util/TestConfigUtils.java   |  94 ++++----
 .../java/org/apache/hudi/util/StreamerUtil.java    |   2 +-
 .../apache/hudi/utils/TestFlinkWriteClients.java   |   4 +-
 .../hudi/common/table/TestHoodieTableConfig.java   | 236 ++++++++++++++++++++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   3 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  37 +++-
 .../org/apache/hudi/TestHoodieWriterUtils.java     |  42 ++++
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   2 +-
 .../apache/hudi/TestDecimalTypeDataWorkflow.scala  |   1 -
 .../hudi/functional/TestBufferedRecordMerger.java  |   2 +-
 .../hudi/functional/TestHoodieBackedMetadata.java  |  22 +-
 .../apache/hudi/functional/TestMORDataSource.scala |   6 +-
 .../functional/TestPayloadDeprecationFlow.scala    | 145 ++++++++++---
 .../hudi/functional/TestRecordLevelIndex.scala     |   2 +-
 .../others/TestMergeModeCommitTimeOrdering.scala   |   1 -
 .../others/TestMergeModeEventTimeOrdering.scala    |   1 -
 .../sql/hudi/procedure/TestRepairsProcedure.scala  |   1 -
 .../hudi/utilities/streamer/HoodieStreamer.java    |   2 +-
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |   2 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  11 +-
 30 files changed, 709 insertions(+), 167 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index 9268f811a325..e534e126efae 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -27,7 +27,6 @@ import 
org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.EngineType;
@@ -554,11 +553,9 @@ public abstract class HoodieWriterClientTestHarness 
extends HoodieCommonTestHarn
         .getReaderContextFactoryForWrite(metaClient, 
HoodieRecord.HoodieRecordType.AVRO, writeConfig.getProps()).getContext();
     List<String> orderingFieldNames = getOrderingFieldNames(
         readerContext.getMergeMode(), writeClient.getConfig().getProps(), 
metaClient);
-    RecordMergeMode recordMergeMode = 
HoodieTableConfig.inferCorrectMergingBehavior(null, 
writeConfig.getPayloadClass(), null,
-        String.join(",", orderingFieldNames), 
metaClient.getTableConfig().getTableVersion()).getLeft();
     BufferedRecordMerger<HoodieRecord> recordMerger = 
BufferedRecordMergerFactory.create(
         readerContext,
-        recordMergeMode,
+        metaClient.getTableConfig().getRecordMergeMode(),
         false,
         Option.ofNullable(writeClient.getConfig().getRecordMerger()),
         orderingFieldNames,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index c498b836c396..5c57275496ac 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -55,6 +55,7 @@ import java.util.List;
 
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
 
 /**
  * An abstract reader context class for {@code HoodieFileGroupReader} to use, 
containing APIs for
@@ -270,12 +271,12 @@ public abstract class HoodieReaderContext<T> {
     HoodieTableVersion tableVersion = tableConfig.getTableVersion();
     // If the provided payload class differs from the table's payload class, 
we need to infer the correct merging behavior.
     if (isIngestion && providedPayloadClass.map(className -> 
!className.equals(tableConfig.getPayloadClass())).orElse(false)) {
-      Triple<RecordMergeMode, String, String> triple = 
HoodieTableConfig.inferCorrectMergingBehavior(null, providedPayloadClass.get(), 
null,
+      Triple<RecordMergeMode, String, String> triple = 
HoodieTableConfig.inferMergingConfigsForWrites(null, 
providedPayloadClass.get(), null,
           tableConfig.getPreCombineFieldsStr().orElse(null), tableVersion);
       recordMergeMode = triple.getLeft();
       mergeStrategyId = triple.getRight();
-    } else if (!tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
-      Triple<RecordMergeMode, String, String> triple = 
HoodieTableConfig.inferCorrectMergingBehavior(
+    } else if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+      Triple<RecordMergeMode, String, String> triple = 
inferMergingConfigsForPreV9Table(
           recordMergeMode, tableConfig.getPayloadClass(),
           mergeStrategyId, tableConfig.getPreCombineFieldsStr().orElse(null), 
tableVersion);
       recordMergeMode = triple.getLeft();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
index e58e2c5ec176..0bd06b20404a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
@@ -45,6 +45,7 @@ import java.util.Properties;
 public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
 
   public static final String OP_FIELD = "Op";
+  public static final String DELETE_OPERATION_VALUE = "D";
 
   public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
     super(record, orderingVal);
@@ -116,6 +117,6 @@ public class AWSDmsAvroPayload extends 
OverwriteWithLatestAvroPayload {
   }
 
   private static boolean isDMSDeleteRecord(GenericRecord record) {
-    return record.get(OP_FIELD) != null && 
record.get(OP_FIELD).toString().equalsIgnoreCase("D");
+    return record.get(OP_FIELD) != null && 
record.get(OP_FIELD).toString().equalsIgnoreCase(DELETE_OPERATION_VALUE);
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 4cc2a493df17..bd11d3a0d3fb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -23,7 +23,7 @@ import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 
@@ -35,7 +35,10 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.Properties;
 
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 
 /**
  * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
@@ -182,16 +185,34 @@ public interface HoodieRecordPayload<T extends 
HoodieRecordPayload> extends Seri
   }
 
   static String getPayloadClassName(Properties props) {
-    return 
getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME);
+    Option<String> payloadOpt = getPayloadClassNameIfPresent(props);
+    if (payloadOpt.isPresent()) {
+      return payloadOpt.get();
+    }
+    // Note: starting from version 9, payload class is not necessarily set, but
+    //       merge mode must exist. Therefore, we use merge mode to infer
+    //       the payload class for certain corner cases, like for MIT command.
+
+    if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE)
+        && ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE, "")
+        .equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
+      return OverwriteWithLatestAvroPayload.class.getName();
+    }
+    return DEFAULT_PAYLOAD_CLASS_NAME;
   }
 
+  // NOTE: PAYLOAD_CLASS_NAME is checked before LEGACY_PAYLOAD_CLASS_NAME to make sure
+  // any temporary payload class setting is respected.
   static Option<String> getPayloadClassNameIfPresent(Properties props) {
     String payloadClassName = null;
-    if (props.containsKey(PAYLOAD_CLASS_NAME.key())) {
-      payloadClassName = props.getProperty(PAYLOAD_CLASS_NAME.key());
+    if (ConfigUtils.containsConfigProperty(props, PAYLOAD_CLASS_NAME)) {
+      payloadClassName = ConfigUtils.getStringWithAltKeys(props, 
PAYLOAD_CLASS_NAME);
+    } else if (props.containsKey(LEGACY_PAYLOAD_CLASS_NAME.key())) {
+      payloadClassName = ConfigUtils.getStringWithAltKeys(props, 
LEGACY_PAYLOAD_CLASS_NAME);
     } else if (props.containsKey("hoodie.datasource.write.payload.class")) {
       payloadClassName = 
props.getProperty("hoodie.datasource.write.payload.class");
     }
+
     // There could be tables written with payload class from com.uber.hoodie.
     // Need to transparently change to org.apache.hudi.
     return Option.ofNullable(payloadClassName).map(className -> 
className.replace("com.uber.hoodie", "org.apache.hudi"));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 450f7df7eb3d..65a9b591fb39 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.OrderedProperties;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
 import org.apache.hudi.common.model.BootstrapIndexType;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
@@ -37,7 +38,11 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -71,6 +76,7 @@ import java.lang.reflect.Modifier;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -94,7 +100,12 @@ import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAM
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD;
+import static 
org.apache.hudi.common.model.AWSDmsAvroPayload.DELETE_OPERATION_VALUE;
+import static org.apache.hudi.common.model.AWSDmsAvroPayload.OP_FIELD;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.util.ConfigUtils.fetchConfigs;
@@ -121,10 +132,34 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final String PARTIAL_UPDATE_CUSTOM_MARKER = 
"hoodie.write.partial.update.custom.marker";
   public static final String DEBEZIUM_UNAVAILABLE_VALUE = 
"__debezium_unavailable_value";
   // This prefix is used to set merging related properties.
-  // A reader might need to read some merger properties to function as 
expected,
-  // and Hudi stores properties with this prefix so the reader parses these 
properties,
-  // and produces a map of key value pairs (Key1->Value1, Key2->Value2, ...) 
to use.
-  public static final String MERGE_PROPERTIES_PREFIX = 
"hoodie.table.merge.properties.";
+  // A reader might need to read some writer properties to function as 
expected,
+  // and Hudi stores properties with this prefix so the reader parses these 
properties to fetch any custom property.
+  public static final String RECORD_MERGE_PROPERTY_PREFIX = 
"hoodie.record.merge.property.";
+  public static final Set<String> PAYLOADS_UNDER_DEPRECATION = 
Collections.unmodifiableSet(
+      new HashSet<>(Arrays.asList(
+          AWSDmsAvroPayload.class.getName(),
+          DefaultHoodieRecordPayload.class.getName(),
+          EventTimeAvroPayload.class.getName(),
+          MySqlDebeziumAvroPayload.class.getName(),
+          OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+          OverwriteWithLatestAvroPayload.class.getName(),
+          PartialUpdateAvroPayload.class.getName(),
+          PostgresDebeziumAvroPayload.class.getName())));
+
+  public static final Set<String> EVENT_TIME_ORDERING_PAYLOADS = 
Collections.unmodifiableSet(
+      new HashSet<>(Arrays.asList(
+          DefaultHoodieRecordPayload.class.getName(),
+          EventTimeAvroPayload.class.getName(),
+          MySqlDebeziumAvroPayload.class.getName(),
+          PartialUpdateAvroPayload.class.getName(),
+          PostgresDebeziumAvroPayload.class.getName())));
+
+  public static final Set<String> BUILTIN_MERGE_STRATEGIES = 
Collections.unmodifiableSet(
+      new HashSet<>(Arrays.asList(
+          COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
+          CUSTOM_MERGE_STRATEGY_UUID,
+          EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+          PAYLOAD_BASED_MERGE_STRATEGY_UUID)));
 
   public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
       .key("hoodie.database.name")
@@ -230,6 +265,12 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("Payload class to use for performing merges, 
compactions, i.e merge delta logs with current base file and then "
           + " produce a new base file.");
 
+  public static final ConfigProperty<String> LEGACY_PAYLOAD_CLASS_NAME = 
ConfigProperty
+      .key("hoodie.table.legacy.payload.class")
+      .noDefaultValue()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Payload class to indicate the payload class that is 
used to create the table and is not used anymore.");
+
   // This is the default payload class used by Hudi 0.x releases (table 
version 6 and below)
   public static final String DEFAULT_PAYLOAD_CLASS_NAME = 
DefaultHoodieRecordPayload.class.getName();
 
@@ -801,19 +842,109 @@ public class HoodieTableConfig extends HoodieConfig {
     return HoodieRecordPayload.getPayloadClassName(this);
   }
 
+  public String getLegacyPayloadClass() {
+    return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+  }
+
   public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table 
Version 9,
+   * which is based on the logic of table version < 9, and then tuned for 
version 9 logic.
+   * This approach fits the same behavior of upgrade from 8 to 9.
+   */
+  static Map<String, String> 
inferMergingConfigsForV9TableCreation(RecordMergeMode recordMergeMode,
+                                                                   String 
payloadClassName,
+                                                                   String 
recordMergeStrategyId,
+                                                                   String 
orderingFieldName,
+                                                                   
HoodieTableVersion tableVersion) {
+    Map<String, String> reconciledConfigs = new HashMap<>();
+    if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+      throw new HoodieIOException("Unsupported flow for table versions less 
than 9");
+    }
+
+    // Step 1: Infer merging configs based on input information.
+    // This step is important since it provides the same configs before we do 
table upgrade.
+    // Then additional logic for table version 9 could be verified.
+    Triple<RecordMergeMode, String, String> inferredConfigs = 
inferMergingConfigsForPreV9Table(
+        recordMergeMode, payloadClassName, recordMergeStrategyId, 
orderingFieldName, tableVersion);
+    recordMergeMode = inferredConfigs.getLeft();
+    recordMergeStrategyId = inferredConfigs.getRight();
+
+    // Step 2: Handle Version 9 specific logic.
+    // CASE 0: For tables with special merger properties, e.g., with 
non-builtin mergers.
+    // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
+    //   NOTE: Payload class should NOT be set for these cases.
+    if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+        || StringUtils.isNullOrEmpty(payloadClassName)) {
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+      reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
recordMergeStrategyId);
+    } else {
+      // For tables using payload classes.
+      //   CASE 2: Custom payload class. We set these properties explicitly.
+      if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+        reconciledConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+        reconciledConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+        reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+      } else { // CASE 3: Payload classes are under deprecation.
+        // Standard merging configs.
+        // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of 
PAYLOAD_CLASS_NAME here.
+        if (EVENT_TIME_ORDERING_PAYLOADS.contains(payloadClassName)) {
+          reconciledConfigs.put(RECORD_MERGE_MODE.key(), 
EVENT_TIME_ORDERING.name());
+          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), 
payloadClassName);
+          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+        } else {
+          reconciledConfigs.put(RECORD_MERGE_MODE.key(), 
COMMIT_TIME_ORDERING.name());
+          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), 
payloadClassName);
+          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+        }
+        // Partial update mode config.
+        // Certain payloads are migrated to non payload way from 1.1 Hudi 
binary.
+        // Hence we need to set the right value for partial update mode for 
some of the cases.
+        if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+            || 
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
 {
+          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_DEFAULTS.name());
+        } else if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_MARKERS.name());
+        }
+        // Additional custom merge properties.
+        // Certain payloads are migrated to non payload way from 1.1 Hudi 
binary and the reader might need certain properties for the
+        // merge to function as expected. Handling such special cases here.
+        if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+        } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) 
{
+          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, 
OP_FIELD);
+          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, 
DELETE_OPERATION_VALUE);
+        }
+      }
+    }
+    return reconciledConfigs;
+  }
+
+  /**
+   * To be invoked for table creation flows or writer flows.
+   * @return the merging configs to use.
+   */
+  public static Triple<RecordMergeMode, String, String> 
inferMergingConfigsForWrites(RecordMergeMode recordMergeMode,
+                                                                               
      String payloadClassName,
+                                                                               
      String recordMergeStrategyId,
+                                                                               
      String orderingFieldNamesAsString,
+                                                                               
      HoodieTableVersion tableVersion) {
+    return inferMergingConfigsForPreV9Table(recordMergeMode, payloadClassName, 
recordMergeStrategyId, orderingFieldNamesAsString, tableVersion);
+  }
+
   /**
    * Infers the merging behavior based on what the user sets (or doesn't set).
-   * Validates that the user has not set an illegal combination of configs
+   * Validates that the user has not set an illegal combination of configs.
+   * This function infers basic merging properties used by table version <= 8.
    */
-  public static Triple<RecordMergeMode, String, String> 
inferCorrectMergingBehavior(RecordMergeMode recordMergeMode,
-                                                                               
     String payloadClassName,
-                                                                               
     String recordMergeStrategyId,
-                                                                               
     String orderingFieldNamesAsString,
-                                                                               
     HoodieTableVersion tableVersion) {
+  public static Triple<RecordMergeMode, String, String> 
inferMergingConfigsForPreV9Table(RecordMergeMode recordMergeMode,
+                                                                               
          String payloadClassName,
+                                                                               
          String recordMergeStrategyId,
+                                                                               
          String orderingFieldNamesAsString,
+                                                                               
          HoodieTableVersion tableVersion) {
     RecordMergeMode inferredRecordMergeMode;
     String inferredPayloadClassName;
     String inferredRecordMergeStrategyId;
@@ -1204,7 +1335,7 @@ public class HoodieTableConfig extends HoodieConfig {
   }
 
   public Map<String, String> getTableMergeProperties() {
-    return ConfigUtils.extractWithPrefix(this.props, MERGE_PROPERTIES_PREFIX);
+    return ConfigUtils.extractWithPrefix(this.props, 
RECORD_MERGE_PROPERTY_PREFIX);
   }
 
   public Map<String, String> propsMap() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 1b15c401103e..a96f3ad89405 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -90,6 +90,8 @@ import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
 import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_PATH;
 import static org.apache.hudi.common.table.HoodieTableConfig.VERSION;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForV9TableCreation;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -1480,13 +1482,22 @@ public class HoodieTableMetaClient implements 
Serializable {
       tableConfig.setTableVersion(tableVersion);
       tableConfig.setInitialVersion(tableVersion);
 
-      Triple<RecordMergeMode, String, String> mergeConfigs =
-          HoodieTableConfig.inferCorrectMergingBehavior(
-              recordMergeMode, payloadClassName, recordMergerStrategyId, 
preCombineFields,
-              tableVersion);
-      tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
-      tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), mergeConfigs.getMiddle());
-      tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, mergeConfigs.getRight());
+      // For table version <= 8
+      if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+        Triple<RecordMergeMode, String, String> mergeConfigs =
+            inferMergingConfigsForPreV9Table(
+                recordMergeMode, payloadClassName, recordMergerStrategyId, 
preCombineFields,
+                tableVersion);
+        tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
+        tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), 
mergeConfigs.getMiddle());
+        tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, 
mergeConfigs.getRight());
+      } else { // For table version >= 9
+        Map<String, String> mergeConfigs = 
inferMergingConfigsForV9TableCreation(
+            recordMergeMode, payloadClassName, recordMergerStrategyId, 
preCombineFields, tableVersion);
+        for (Map.Entry<String, String> config : mergeConfigs.entrySet()) {
+          tableConfig.setValue(config.getKey(), config.getValue());
+        }
+      }
 
       if (null != tableCreateSchema) {
         tableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA, 
tableCreateSchema);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 2404eb91fd0e..0414d9857a98 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import 
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
@@ -57,6 +58,7 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
 import static 
org.apache.hudi.avro.AvroSchemaUtils.createNewSchemaFromFieldsWithReference;
 import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
 
 /**
  * This class is responsible for handling the schema for the file group reader.
@@ -206,14 +208,18 @@ public class FileGroupReaderSchemaHandler<T> {
                                                        boolean 
hasBuiltInDelete,
                                                        Option<Pair<String, 
String>> customDeleteMarkerKeyAndValue,
                                                        boolean 
hasInstantRange) {
-    Triple<RecordMergeMode, String, String> mergingConfigs = 
HoodieTableConfig.inferCorrectMergingBehavior(
-        cfg.getRecordMergeMode(),
-        cfg.getPayloadClass(),
-        cfg.getRecordMergeStrategyId(),
-        cfg.getPreCombineFieldsStr().orElse(null),
-        cfg.getTableVersion());
-
-    if (mergingConfigs.getLeft() == RecordMergeMode.CUSTOM) {
+    RecordMergeMode mergeMode = cfg.getRecordMergeMode();
+    if (cfg.getTableVersion().lesserThan(HoodieTableVersion.NINE)) {
+      Triple<RecordMergeMode, String, String> mergingConfigs = 
inferMergingConfigsForPreV9Table(
+          cfg.getRecordMergeMode(),
+          cfg.getPayloadClass(),
+          cfg.getRecordMergeStrategyId(),
+          cfg.getPreCombineFieldsStr().orElse(null),
+          cfg.getTableVersion());
+      mergeMode = mergingConfigs.getLeft();
+    }
+
+    if (mergeMode == RecordMergeMode.CUSTOM) {
       return recordMerger.get().getMandatoryFieldsForMerging(tableSchema, cfg, 
props);
     }
 
@@ -234,7 +240,7 @@ public class FileGroupReaderSchemaHandler<T> {
       }
     }
     // Add precombine field for event time ordering merge mode.
-    if (mergingConfigs.getLeft() == RecordMergeMode.EVENT_TIME_ORDERING) {
+    if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
       List<String> preCombineFields = cfg.getPreCombineFields();
       requiredFields.addAll(preCombineFields);
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
index 0c3d5c0a8a51..3a925cba9c58 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
@@ -25,14 +25,15 @@ import org.apache.hudi.common.table.PartialUpdateMode;
 
 import org.apache.avro.Schema;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDefaultValue;
 import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES_PREFIX;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
 import static org.apache.hudi.common.util.ConfigUtils.extractWithPrefix;
 
 /**
@@ -41,7 +42,7 @@ import static 
org.apache.hudi.common.util.ConfigUtils.extractWithPrefix;
  * {@link BufferedRecordMergerFactory.CommitTimePartialRecordMerger} and
  * {@link BufferedRecordMergerFactory.EventTimePartialRecordMerger}.
  */
-public class PartialUpdateStrategy<T> {
+public class PartialUpdateStrategy<T> implements Serializable {
   private final RecordContext<T> recordContext;
   private final PartialUpdateMode partialUpdateMode;
   private final Map<String, String> mergeProperties;
@@ -178,6 +179,6 @@ public class PartialUpdateStrategy<T> {
   }
 
   static Map<String, String> parseMergeProperties(TypedProperties props) {
-    return extractWithPrefix(props, MERGE_PROPERTIES_PREFIX);
+    return extractWithPrefix(props, RECORD_MERGE_PROPERTY_PREFIX);
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 937748677b45..70cf9cb376b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -253,11 +253,11 @@ public class ConfigUtils {
    * Whether the properties contain a config. If any of the key or alternative 
keys of the
    * {@link ConfigProperty} exists in the properties, this method returns 
{@code true}.
    *
-   * @param props          Configs in {@link TypedProperties}
+   * @param props          Configs in {@link Properties}
    * @param configProperty Config to look up.
    * @return {@code true} if exists; {@code false} otherwise.
    */
-  public static boolean containsConfigProperty(TypedProperties props,
+  public static boolean containsConfigProperty(Properties props,
                                                ConfigProperty<?> 
configProperty) {
     if (!props.containsKey(configProperty.key())) {
       for (String alternative : configProperty.getAlternatives()) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
index bed30d3a2538..77d85dc0f5b3 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
@@ -257,6 +257,7 @@ public abstract class SchemaHandlerTestBase {
   private static void setupMORTable(RecordMergeMode mergeMode, boolean 
hasPrecombine, HoodieTableConfig hoodieTableConfig) {
     when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
     when(hoodieTableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+    
when(hoodieTableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
     if (hasPrecombine) {
       
when(hoodieTableConfig.getPreCombineFieldsStr()).thenReturn(Option.of("timestamp"));
       
when(hoodieTableConfig.getPreCombineFields()).thenReturn(Collections.singletonList("timestamp"));
@@ -266,8 +267,6 @@ public abstract class SchemaHandlerTestBase {
     }
     if (mergeMode == CUSTOM) {
       when(hoodieTableConfig.getRecordMergeStrategyId()).thenReturn("asdf");
-      // NOTE: in this test custom doesn't have any meta cols because it is 
more interesting of a test case
-      
when(hoodieTableConfig.getTableVersion()).thenReturn(HoodieTableVersion.EIGHT);
     }
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index a287092c88c3..9075228c7802 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -38,7 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
-import static 
org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES_PREFIX;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -203,8 +203,8 @@ public class TestConfigUtils {
   @Test
   void testParseValidProperties() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "Ki", "Vi");
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "Ki", "Vi");
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(1, result.size());
     assertEquals("Vi", result.get("Ki"));
   }
@@ -212,18 +212,18 @@ public class TestConfigUtils {
   @Test
   void testMissingKeyReturnsEmptyMap() {
     TypedProperties props = new TypedProperties(); // no property set
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertTrue(result.isEmpty());
   }
 
   @Test
   void testMultipleValidProperties() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "key1", "value1");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "key2", "value2");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "key3", "value3");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key1", "value1");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key2", "value2");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key3", "value3");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(3, result.size());
     assertEquals("value1", result.get("key1"));
     assertEquals("value2", result.get("key2"));
@@ -233,11 +233,11 @@ public class TestConfigUtils {
   @Test
   void testPropertiesWithDifferentPrefixes() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "mergeKey", "mergeValue");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "mergeKey", "mergeValue");
     props.setProperty("other.prefix.key", "otherValue");
-    props.setProperty("hoodie.merge.custom.property.prefix", 
"directPrefixValue");
+    props.setProperty("hoodie.merge.custom.property", "directPrefixValue");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(1, result.size());
     assertEquals("mergeValue", result.get("mergeKey"));
   }
@@ -245,9 +245,9 @@ public class TestConfigUtils {
   @Test
   void testPropertiesWithEmptyValues() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "emptyKey", "");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "emptyKey", "");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(1, result.size());
     assertEquals("", result.get("emptyKey"));
   }
@@ -255,11 +255,11 @@ public class TestConfigUtils {
   @Test
   void testPropertiesWithSpecialCharacters() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "key.with.dots", 
"value.with.dots");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "key_with_underscores", 
"value_with_underscores");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "key-with-dashes", 
"value-with-dashes");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key.with.dots", 
"value.with.dots");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key_with_underscores", 
"value_with_underscores");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key-with-dashes", 
"value-with-dashes");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(3, result.size());
     assertEquals("value.with.dots", result.get("key.with.dots"));
     assertEquals("value_with_underscores", result.get("key_with_underscores"));
@@ -269,11 +269,11 @@ public class TestConfigUtils {
   @Test
   void testPropertiesWithNumericValues() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "intKey", "123");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "doubleKey", "123.45");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "booleanKey", "true");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "intKey", "123");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "doubleKey", "123.45");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "booleanKey", "true");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(3, result.size());
     assertEquals("123", result.get("intKey"));
     assertEquals("123.45", result.get("doubleKey"));
@@ -283,9 +283,9 @@ public class TestConfigUtils {
   @Test
   void testPropertiesWithWhitespace() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "  spacedKey  ", "  
spacedValue  ");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "  spacedKey  ", "  
spacedValue  ");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(1, result.size());
     assertEquals("spacedValue", result.get("spacedKey")); // Values should be 
trimmed
   }
@@ -293,10 +293,10 @@ public class TestConfigUtils {
   @Test
   void testPropertiesWithWhitespaceInKeysAndValues() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "  keyWithSpaces  ", "  
valueWithSpaces  ");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "keyWithoutSpaces", 
"valueWithoutSpaces");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "  keyWithSpaces  ", "  
valueWithSpaces  ");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "keyWithoutSpaces", 
"valueWithoutSpaces");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(2, result.size());
     assertEquals("valueWithSpaces", result.get("keyWithSpaces")); // Both key 
and value should be trimmed
     assertEquals("valueWithoutSpaces", result.get("keyWithoutSpaces"));
@@ -305,28 +305,28 @@ public class TestConfigUtils {
   @Test
   void testPropertiesWithExactPrefixMatch() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX, "exactPrefixValue");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX, "exactPrefixValue");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(0, result.size()); // Exact prefix match should not be 
included as it has no suffix
   }
 
   @Test
   void testPropertiesWithPrefixFollowedByDot() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX, "valueAfterDot");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX, "valueAfterDot");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(0, result.size()); // Empty key after trimming should be 
filtered out
   }
 
   @Test
   void testPropertiesWithWhitespaceOnlyKeys() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "   ", 
"valueForWhitespaceKey");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "  \t  \n  ", 
"valueForTabNewlineKey");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "   ", 
"valueForWhitespaceKey");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "  \t  \n  ", 
"valueForTabNewlineKey");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(0, result.size());
   }
 
@@ -334,21 +334,21 @@ public class TestConfigUtils {
   void testPropertiesWithNullKeys() {
     TypedProperties props = new TypedProperties();
     // Note: TypedProperties doesn't allow null keys, but we test the edge case
-    props.setProperty(MERGE_PROPERTIES_PREFIX, "valueForNullKey");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX, "valueForNullKey");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(0, result.size()); // Empty key should be filtered out
   }
 
   @Test
   void testPropertiesWithMixedValidAndInvalidKeys() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "validKey", "validValue");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "   ", "invalidValue1");
-    props.setProperty(MERGE_PROPERTIES_PREFIX, "invalidValue2");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "anotherValidKey", 
"anotherValidValue");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "validKey", "validValue");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "   ", "invalidValue1");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX, "invalidValue2");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "anotherValidKey", 
"anotherValidValue");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(2, result.size()); // Only valid keys should be included
     assertEquals("validValue", result.get("validKey"));
     assertEquals("anotherValidValue", result.get("anotherValidKey"));
@@ -357,10 +357,10 @@ public class TestConfigUtils {
   @Test
   void testPropertiesWithCaseSensitivity() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "Key1", "Value1");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "key1", "value1");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "Key1", "Value1");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key1", "value1");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(2, result.size());
     assertEquals("Value1", result.get("Key1"));
     assertEquals("value1", result.get("key1"));
@@ -369,10 +369,10 @@ public class TestConfigUtils {
   @Test
   void testPropertiesWithLeadingAndTrailingWhitespace() {
     TypedProperties props = new TypedProperties();
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "  leadingSpaceKey", 
"trailingSpaceValue  ");
-    props.setProperty(MERGE_PROPERTIES_PREFIX + "trailingSpaceKey  ", "  
leadingSpaceValue");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "  leadingSpaceKey", 
"trailingSpaceValue  ");
+    props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "trailingSpaceKey  ", "  
leadingSpaceValue");
 
-    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(props, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertEquals(2, result.size());
     assertEquals("trailingSpaceValue", result.get("leadingSpaceKey")); // 
Trimmed
     assertEquals("leadingSpaceValue", result.get("trailingSpaceKey")); // 
Trimmed
@@ -380,7 +380,7 @@ public class TestConfigUtils {
 
   @Test
   void testNullProperties() {
-    Map<String, String> result = ConfigUtils.extractWithPrefix(null, 
MERGE_PROPERTIES_PREFIX);
+    Map<String, String> result = ConfigUtils.extractWithPrefix(null, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertTrue(result.isEmpty());
   }
 }
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 0369d0e16803..3a3015e1f2a4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -403,7 +403,7 @@ public class StreamerUtil {
    * @return The correct merging behaviour: <merge_mode, payload_class, 
merge_strategy_id>
    */
   public static Triple<RecordMergeMode, String, String> 
inferMergingBehavior(Configuration conf) {
-    return HoodieTableConfig.inferCorrectMergingBehavior(
+    return HoodieTableConfig.inferMergingConfigsForWrites(
         getMergeMode(conf), getPayloadClass(conf), getMergeStrategyId(conf), 
OptionsResolver.getPreCombineField(conf), HoodieTableVersion.EIGHT);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 7d8b39bd5942..4ef8480ad5fc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -147,8 +147,8 @@ public class TestFlinkWriteClients {
     HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf);
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
 
-    assertThat(tableConfig.getRecordMergeMode(), is(RecordMergeMode.CUSTOM));
-    assertThat(tableConfig.getRecordMergeStrategyId(), 
is(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID));
+    assertThat(tableConfig.getRecordMergeMode(), 
is(RecordMergeMode.EVENT_TIME_ORDERING));
+    assertThat(tableConfig.getRecordMergeStrategyId(), 
is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
     assertThat(tableConfig.getPayloadClass(), 
is(PartialUpdateAvroPayload.class.getName()));
 
     HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 8b37c89a3a6c..cfe9d2835ea7 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -21,8 +21,14 @@ package org.apache.hudi.common.table;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -61,12 +67,19 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
 import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM;
 import static 
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.getRecordMergeStrategyId;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.inferRecordMergeModeFromPayloadClass;
 import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -298,7 +311,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
   @Test
   void testDefinedTableConfigs() {
     List<ConfigProperty<?>> configProperties = 
HoodieTableConfig.definedTableConfigs();
-    assertEquals(40, configProperties.size());
+    assertEquals(41, configProperties.size());
     configProperties.forEach(c -> {
       assertNotNull(c);
       assertFalse(c.doc().isEmpty());
@@ -318,8 +331,8 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
     Properties props = new Properties();
     props.setProperty(HoodieTableConfig.NAME.key(), "test-table");
     // no merge props
-    props.setProperty(HoodieTableConfig.MERGE_PROPERTIES_PREFIX + "key1", 
"value1");
-    props.setProperty(HoodieTableConfig.MERGE_PROPERTIES_PREFIX + "key2", 
"value2");
+    props.setProperty(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + "key1", 
"value1");
+    props.setProperty(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + "key2", 
"value2");
     // add some random property which does not match the prefix.
     props.setProperty("key3", "value3");
 
@@ -331,7 +344,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
     assertEquals(expectedProps, config.getTableMergeProperties());
   }
 
-  private static Stream<Arguments> argumentsForInferringRecordMergeMode() {
+  private static Stream<Arguments> testInferMergingConfigsForPreV9Table() {
     String defaultPayload = DefaultHoodieRecordPayload.class.getName();
     String overwritePayload = OverwriteWithLatestAvroPayload.class.getName();
     String customPayload = "custom_payload";
@@ -475,11 +488,11 @@ class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
   }
 
   @ParameterizedTest
-  @MethodSource("argumentsForInferringRecordMergeMode")
-  void testInferMergeMode(RecordMergeMode inputMergeMode, String 
inputPayloadClass,
-                                 String inputMergeStrategy, String 
orderingFieldName,
-                                 String shouldThrowString, RecordMergeMode 
outputMergeMode,
-                                 String outputPayloadClass, String 
outputMergeStrategy) throws IOException {
+  @MethodSource
+  void testInferMergingConfigsForPreV9Table(RecordMergeMode inputMergeMode, 
String inputPayloadClass,
+                                            String inputMergeStrategy, String 
orderingFieldName,
+                                            String shouldThrowString, 
RecordMergeMode outputMergeMode,
+                                            String outputPayloadClass, String 
outputMergeStrategy) throws IOException {
     Arrays.stream(new HoodieTableVersion[] {HoodieTableVersion.EIGHT, 
HoodieTableVersion.SIX})
         .forEach(tableVersion -> {
           boolean shouldThrow = "eight-only".equals(shouldThrowString)
@@ -496,12 +509,12 @@ class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
           }
           if (shouldThrow) {
             assertThrows(IllegalArgumentException.class,
-                () -> HoodieTableConfig.inferCorrectMergingBehavior(
+                () -> inferMergingConfigsForPreV9Table(
                     inputMergeMode, inputPayloadClass, inputMergeStrategy, 
orderingFieldName,
                     tableVersion));
           } else {
             Triple<RecordMergeMode, String, String> inferredConfigs =
-                HoodieTableConfig.inferCorrectMergingBehavior(
+                inferMergingConfigsForPreV9Table(
                     inputMergeMode, inputPayloadClass, inputMergeStrategy, 
orderingFieldName,
                     tableVersion);
             assertEquals(expectedMergeMode, inferredConfigs.getLeft());
@@ -510,4 +523,205 @@ class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
           }
         });
   }
+
+  private static Stream<Arguments> testInferMergingConfigsForV9TableCreation() 
{
+    return Stream.of(
+        // Test case: Non-version 9 table should return empty configs
+        arguments(
+            "Non-version 9 table", EVENT_TIME_ORDERING, null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.EIGHT,
+            0, null, null, null, null, null, null, null, null, null),
+
+        // Test case: Version 9 table with null payload class and event time 
ordering
+        arguments("Version 9 with event time ordering", EVENT_TIME_ORDERING, 
null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.NINE,
+            2, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+        // Test case: Version 9 table with null payload class and commit time 
ordering
+        arguments("Version 9 with commit time ordering", COMMIT_TIME_ORDERING, 
null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
+            2, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+        // Test case: Version 9 table with null payload class and custom merge 
mode
+        arguments("Version 9 with custom merge mode", CUSTOM, null, 
CUSTOM_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
+            2, CUSTOM.name(), null, CUSTOM_MERGE_STRATEGY_UUID, null, null, 
null, null, null),
+
+        // Test case: Version 9 table with custom payload class (not under 
deprecation)
+        arguments("Version 9 with custom payload", null, 
"com.example.CustomPayload", null, null, HoodieTableVersion.NINE,
+            3, CUSTOM.name(), "com.example.CustomPayload", 
PAYLOAD_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+        // Test case: Version 9 table with event time based payload 
(DefaultHoodieRecordPayload)
+        arguments("Version 9 with DefaultHoodieRecordPayload", null, 
DefaultHoodieRecordPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+            3, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
DefaultHoodieRecordPayload.class.getName(), null, null, null, null),
+
+        // Test case: Version 9 table with commit time based payload 
(OverwriteWithLatestAvroPayload)
+        arguments("Version 9 with OverwriteWithLatestAvroPayload", null, 
OverwriteWithLatestAvroPayload.class.getName(), null, null, 
HoodieTableVersion.NINE,
+            3, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
OverwriteWithLatestAvroPayload.class.getName(), null, null, null, null),
+
+        // Test case: Version 9 table with PartialUpdateAvroPayload (should 
set partial update mode)
+        arguments("Version 9 with PartialUpdateAvroPayload", null, 
PartialUpdateAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+            4, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, PartialUpdateAvroPayload.class.getName(), 
PartialUpdateMode.IGNORE_DEFAULTS.name(),
+            null, null, null),
+
+        // Test case: Version 9 table with 
OverwriteNonDefaultsWithLatestAvroPayload (should set partial update mode)
+        arguments("Version 9 with OverwriteNonDefaultsWithLatestAvroPayload", 
null, OverwriteNonDefaultsWithLatestAvroPayload.class.getName(), null, null, 
HoodieTableVersion.NINE,
+            4, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+            PartialUpdateMode.IGNORE_DEFAULTS.name(), null, null, null),
+
+        // Test case: Version 9 table with PostgresDebeziumAvroPayload (should 
set partial update mode and custom properties)
+        arguments("Version 9 with PostgresDebeziumAvroPayload", null, 
PostgresDebeziumAvroPayload.class.getName(), null, "ts", 
HoodieTableVersion.NINE,
+            5, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
PostgresDebeziumAvroPayload.class.getName(),
+            PartialUpdateMode.IGNORE_MARKERS.name(), 
HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, null, null),
+
+        // Test case: Version 9 table with AWSDmsAvroPayload (should set 
custom delete properties)
+        arguments("Version 9 with AWSDmsAvroPayload", null, 
AWSDmsAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
+            5, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, AWSDmsAvroPayload.class.getName(), null, 
null, "Op", "D"),
+
+        // Test case: Version 9 table with EventTimeAvroPayload (event time 
based payload)
+        arguments("Version 9 with EventTimeAvroPayload", null, 
EventTimeAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+            3, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, EventTimeAvroPayload.class.getName(), 
null, null, null, null),
+
+        // Test case: Version 9 table with MySqlDebeziumAvroPayload (event 
time based payload)
+        arguments("Version 9 with MySqlDebeziumAvroPayload", null, 
MySqlDebeziumAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+            3, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, MySqlDebeziumAvroPayload.class.getName(), 
null, null, null, null)
+    );
+  }
+
+  /**
+   * Verifies the merge-related table configs inferred by
+   * {@code HoodieTableConfig.inferMergingConfigsForV9TableCreation} for one parameterized case.
+   *
+   * <p>For table versions lower than 9 the call is expected to fail; otherwise the size of the returned
+   * map and each individual config are checked. An {@code expected*} argument that is {@code null}
+   * means the corresponding key must be absent from the returned map.
+   *
+   * @param testName                   human-readable case name, used in assertion failure messages
+   * @param recordMergeMode            merge mode passed to the inference method
+   * @param payloadClassName           payload class name passed to the inference method
+   * @param recordMergeStrategyId      merge strategy id passed to the inference method
+   * @param orderingFieldName          ordering field name passed to the inference method
+   * @param tableVersion               table version passed to the inference method
+   * @param expectedConfigSize         expected number of entries in the returned map
+   * @param expectedMergeMode          expected record merge mode, or {@code null} if it must be absent
+   * @param expectedPayloadClass       expected payload class, or {@code null} if it must be absent
+   * @param expectedMergeStrategyId    expected merge strategy id, or {@code null} if it must be absent
+   * @param expectedLegacyPayloadClass expected legacy payload class, or {@code null} if it must be absent
+   * @param expectedPartialUpdateMode  expected partial update mode, or {@code null} if it must be absent
+   * @param expectedDebeziumMarker     expected partial-update custom marker property, or {@code null} if it must be absent
+   * @param expectedDeleteKey          expected delete key property, or {@code null} if it must be absent
+   * @param expectedDeleteMarker       expected delete marker property, or {@code null} if it must be absent
+   */
+  @ParameterizedTest
+  @MethodSource
+  void testInferMergingConfigsForV9TableCreation(String testName, RecordMergeMode recordMergeMode, String payloadClassName,
+                                                 String recordMergeStrategyId, String orderingFieldName, HoodieTableVersion tableVersion,
+                                                 int expectedConfigSize, String expectedMergeMode, String expectedPayloadClass,
+                                                 String expectedMergeStrategyId, String expectedLegacyPayloadClass,
+                                                 String expectedPartialUpdateMode, String expectedDebeziumMarker,
+                                                 String expectedDeleteKey, String expectedDeleteMarker) {
+    if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+      // Table versions below 9 are rejected outright, so there are no configs to inspect.
+      assertExceptionWithInferMergingConfigsForV9TableCreation(recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldName, tableVersion);
+      return;
+    }
+
+    Map<String, String> configs = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+        recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldName, tableVersion);
+
+    assertEquals(expectedConfigSize, configs.size(), "Config size mismatch for: " + testName);
+    assertInferredConfigValue(configs, RECORD_MERGE_MODE.key(), expectedMergeMode, "Merge mode", testName);
+    assertInferredConfigValue(configs, PAYLOAD_CLASS_NAME.key(), expectedPayloadClass, "Payload class", testName);
+    assertInferredConfigValue(configs, RECORD_MERGE_STRATEGY_ID.key(), expectedMergeStrategyId, "Merge strategy ID", testName);
+    assertInferredConfigValue(configs, LEGACY_PAYLOAD_CLASS_NAME.key(), expectedLegacyPayloadClass, "Legacy payload class", testName);
+    assertInferredConfigValue(configs, HoodieTableConfig.PARTIAL_UPDATE_MODE.key(), expectedPartialUpdateMode, "Partial update mode", testName);
+    assertInferredConfigValue(configs, HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER,
+        expectedDebeziumMarker, "Debezium marker", testName);
+    assertInferredConfigValue(configs, HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, expectedDeleteKey, "Delete key", testName);
+    assertInferredConfigValue(configs, HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, expectedDeleteMarker, "Delete marker", testName);
+  }
+
+  /**
+   * Asserts that {@code key} maps to {@code expectedValue} in {@code configs}, or that the key is
+   * absent altogether when {@code expectedValue} is {@code null}.
+   *
+   * @param configs       inferred configs under test
+   * @param key           config key to check
+   * @param expectedValue expected value, or {@code null} if the key must not be present
+   * @param label         short description of the config, used in the mismatch message
+   * @param testName      name of the parameterized case, appended to every failure message
+   */
+  private static void assertInferredConfigValue(Map<String, String> configs, String key, String expectedValue,
+                                                String label, String testName) {
+    if (expectedValue != null) {
+      assertEquals(expectedValue, configs.get(key), label + " mismatch for: " + testName);
+    } else {
+      assertFalse(configs.containsKey(key), "Config " + key + " not expected to be set for: " + testName);
+    }
+  }
+
+  /**
+   * A record merge mode that contradicts the supplied merge strategy id must be rejected
+   * with an {@link IllegalArgumentException} for version 9 tables.
+   */
+  @Test
+  void testInferMergingConfigsForVersion9WithInconsistentConfigs() {
+    // Event-time mode paired with the commit-time strategy id.
+    assertThrows(IllegalArgumentException.class,
+        () -> HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+            EVENT_TIME_ORDERING, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.NINE));
+    // Commit-time mode paired with the event-time strategy id.
+    assertThrows(IllegalArgumentException.class,
+        () -> HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+            COMMIT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE));
+    // Custom mode paired with the event-time strategy id.
+    assertThrows(IllegalArgumentException.class,
+        () -> HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+            CUSTOM, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE));
+  }
+
+  /**
+   * Covers three edge cases of {@code inferMergingConfigsForV9TableCreation}: an empty payload class name,
+   * a table version below 9, and a missing ordering field under event-time ordering.
+   */
+  @Test
+  void testInferMergingConfigsForVersion9EdgeCases() {
+    // An empty payload class name yields only the merge mode and the merge strategy id.
+    Map<String, String> emptyPayloadConfigs = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+        EVENT_TIME_ORDERING, "", EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.NINE);
+    assertEquals(2, emptyPayloadConfigs.size());
+    assertEquals(EVENT_TIME_ORDERING.name(), emptyPayloadConfigs.get(RECORD_MERGE_MODE.key()));
+    assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID, emptyPayloadConfigs.get(RECORD_MERGE_STRATEGY_ID.key()));
+
+    // A table version below 9 must throw even when every parameter is supplied.
+    assertExceptionWithInferMergingConfigsForV9TableCreation(
+        EVENT_TIME_ORDERING,
+        DefaultHoodieRecordPayload.class.getName(),
+        EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+        "ts",
+        HoodieTableVersion.EIGHT);
+
+    // A null ordering field with event-time ordering on version 9 must still succeed.
+    Map<String, String> nullOrderingFieldConfigs = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+        EVENT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE);
+    assertEquals(2, nullOrderingFieldConfigs.size());
+    assertEquals(EVENT_TIME_ORDERING.name(), nullOrderingFieldConfigs.get(RECORD_MERGE_MODE.key()));
+    assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID, nullOrderingFieldConfigs.get(RECORD_MERGE_STRATEGY_ID.key()));
+  }
+
+  /**
+   * Runs the same inputs against every {@link HoodieTableVersion}: versions below 9 must be rejected,
+   * while version 9 and above must return three inferred configs.
+   */
+  @Test
+  void testInferMergingConfigsForVersion9WithAllTableVersions() {
+    // Versions below 9 throw; version 9 and above return the inferred configs.
+    for (HoodieTableVersion version : HoodieTableVersion.values()) {
+      if (version.lesserThan(HoodieTableVersion.NINE)) {
+        assertExceptionWithInferMergingConfigsForV9TableCreation(EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(),
+            EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", version);
+      } else {
+        Map<String, String> configs = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+            EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(),
+            EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", version);
+        // Name the failing version: the loop covers several and the bare message would not say which one broke.
+        assertEquals(3, configs.size(), "Table version 9 and above should return 3 configs, but failed for version: " + version);
+      }
+    }
+  }
+
+  /**
+   * Asserts that {@code HoodieTableConfig.inferMergingConfigsForV9TableCreation} rejects the given inputs
+   * with a {@link HoodieIOException} whose message is
+   * {@code "Unsupported flow for table versions less than 9"}.
+   *
+   * @param recordMergeMode       merge mode passed to the inference method
+   * @param payloadClassName      payload class name passed to the inference method
+   * @param recordMergeStrategyId merge strategy id passed to the inference method
+   * @param orderingFieldName     ordering field name passed to the inference method
+   * @param tableVersion          table version passed to the inference method
+   */
+  private void assertExceptionWithInferMergingConfigsForV9TableCreation(RecordMergeMode recordMergeMode,
+                                                                        String payloadClassName,
+                                                                        String recordMergeStrategyId,
+                                                                        String orderingFieldName,
+                                                                        HoodieTableVersion tableVersion) {
+    HoodieIOException ioException = assertThrows(HoodieIOException.class,
+        () -> HoodieTableConfig.inferMergingConfigsForV9TableCreation(recordMergeMode, payloadClassName,
+            recordMergeStrategyId, orderingFieldName, tableVersion));
+    // Compare the message directly: calling toString() on it is redundant and would turn a
+    // null message into a NullPointerException instead of a readable assertion failure.
+    assertEquals("Unsupported flow for table versions less than 9", ioException.getMessage());
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1ac3a3fb0f3c..f2d938ceeaed 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -1125,10 +1125,11 @@ class HoodieSparkSqlWriterInternal {
       HoodieTableVersion.fromVersionCode(
         SparkConfigUtils.getStringWithAltKeys(mergedParams, 
WRITE_TABLE_VERSION).toInt)
     }
+    // Handle merge properties.
     if (!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_MODE.key())
       || 
!mergedParams.contains(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key())
       || 
!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key())) {
-      val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
+      val inferredMergeConfigs = 
HoodieTableConfig.inferMergingConfigsForWrites(
         
RecordMergeMode.getValue(mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_MODE.key(),
 null)),
         
mergedParams.getOrElse(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), ""),
         
mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), 
""),
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index b4be1229605e..669642a644e2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -24,6 +24,7 @@ import 
org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonCo
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, 
OverwriteWithLatestAvroPayload, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.config.HoodieWriteConfig.{RECORD_MERGE_MODE, 
SPARK_SQL_MERGE_INTO_PREPPED_KEY}
 import org.apache.hudi.exception.HoodieException
@@ -216,6 +217,33 @@ object HoodieWriterUtils {
     validateTableConfig(spark, params, tableConfig, false)
   }
 
+  /**
+   * Resolves which table config key should be consulted for the payload class, given the writer key.
+   *
+   * The legacy payload class key is returned when all of the following hold:
+   *   1. a table config is provided (not null),
+   *   2. the table version stored in the table config is 9, and
+   *   3. the table config has a non-blank legacy payload class configured.
+   * In every other case the writer key is returned unchanged.
+   *
+   * Note: this method does not inspect `key` itself; callers are expected to invoke it
+   * only when the writer key is a payload class key.
+   *
+   * @param key         config key supplied by the writer
+   * @param tableConfig existing table config; may be null
+   * @return the key to look up in the table config
+   */
+  def getPayloadClassConfigKeyFromTableConfig(key: String, tableConfig: HoodieConfig): String = {
+    // `&&` short-circuits, so the table config is only dereferenced when it is non-null.
+    val useLegacyPayloadKey = tableConfig != null &&
+      tableConfig.getInt(HoodieTableConfig.VERSION) == HoodieTableVersion.NINE.versionCode() &&
+      !StringUtils.isNullOrEmpty(
+        tableConfig.getStringOrDefault(HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, StringUtils.EMPTY_STRING).trim)
+    if (useLegacyPayloadKey) HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key else key
+  }
+
   /**
    * Detects conflicts between new parameters and existing table configurations
    */
@@ -227,9 +255,14 @@ object HoodieWriterUtils {
       val diffConfigs = StringBuilder.newBuilder
       params.foreach { case (key, value) =>
         if (!shouldIgnoreConfig(key, value, params, tableConfig)) {
-          val existingValue = 
getStringFromTableConfigWithAlternatives(tableConfig, key)
+          val keyInTableConfig = if 
(key.equals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key))  {
+            getPayloadClassConfigKeyFromTableConfig(key, tableConfig)
+          } else {
+            key
+          }
+          val existingValue = 
getStringFromTableConfigWithAlternatives(tableConfig, keyInTableConfig)
           if (null != existingValue && !resolver(existingValue, value)) {
-            
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+            
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(keyInTableConfig)}\n")
           }
         }
       }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
index 959654ff7680..a8ec70d657eb 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
@@ -17,20 +17,27 @@
 
 package org.apache.hudi;
 
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.util.JavaScalaConverters;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.util.Properties;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getMetaClientBuilder;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 class TestHoodieWriterUtils extends HoodieClientTestBase {
 
@@ -43,4 +50,39 @@ class TestHoodieWriterUtils extends HoodieClientTestBase {
     properties.put(HoodieTableConfig.DATABASE_NAME.key(), 
"databaseFromCatalog");
     Assertions.assertDoesNotThrow(() -> 
HoodieWriterUtils.validateTableConfig(sparkSession, 
JavaScalaConverters.convertJavaPropertiesToScalaMap(properties), tableConfig));
   }
+
+  /**
+   * For a version 9 table config without a legacy payload class, the writer's payload class key
+   * must be returned unchanged.
+   */
+  @Test
+  void testGetKeyInTableConfigTableVersion9PayloadClassKeyWithoutLegacyPayloadClass() {
+    HoodieConfig config = new HoodieConfig();
+    // Pin the table version to 9 so the scenario matches the test name and the resolver gets past
+    // its version comparison to the legacy payload class check, which must find nothing here.
+    config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    String result = HoodieWriterUtils.getPayloadClassConfigKeyFromTableConfig(
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
+    Assertions.assertEquals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), result);
+  }
+
+  /**
+   * Supplies (table version, legacy payload class to set, expected resolved key) triples for the
+   * parameterized test of the same name.
+   */
+  private static Stream<Arguments> testGetKeyInTableConfigTableVersion9PayloadClassKeyWithLegacyPayloadClass() {
+    return Stream.of(
+        // Version 8: the writer key is kept even though a legacy payload class is set.
+        arguments(HoodieTableVersion.EIGHT, "org.apache.hudi.common.model.DefaultHoodieRecordPayload", "hoodie.compaction.payload.class"),
+        // Version 9 with an empty legacy payload class: the writer key is kept.
+        arguments(HoodieTableVersion.NINE, "", "hoodie.compaction.payload.class"),
+        // Version 9 with a blank legacy payload class: the writer key is kept.
+        arguments(HoodieTableVersion.NINE, "  ", "hoodie.compaction.payload.class"),
+        // Version 9 with a legacy payload class set: the legacy key is used.
+        arguments(HoodieTableVersion.NINE, "org.apache.hudi.common.model.DefaultHoodieRecordPayload", "hoodie.table.legacy.payload.class"));
+  }
+
+  /**
+   * Checks which config key is resolved for the payload class when the table config carries the given
+   * table version and legacy payload class value.
+   *
+   * @param tableVersion       table version written into the config
+   * @param legacyPayloadToSet value written under the legacy payload class config
+   * @param expectedKey        key the resolver is expected to return
+   */
+  @ParameterizedTest
+  @MethodSource
+  void testGetKeyInTableConfigTableVersion9PayloadClassKeyWithLegacyPayloadClass(
+      HoodieTableVersion tableVersion, String legacyPayloadToSet, String expectedKey) {
+    HoodieConfig tableConfig = new HoodieConfig();
+    tableConfig.setValue(HoodieTableConfig.VERSION, String.valueOf(tableVersion.versionCode()));
+    tableConfig.setValue(HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, legacyPayloadToSet);
+
+    String resolvedKey = HoodieWriterUtils.getPayloadClassConfigKeyFromTableConfig(
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), tableConfig);
+
+    Assertions.assertEquals(expectedKey, resolvedKey);
+  }
 }
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 56507994fa69..bd4e62a070b7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -772,7 +772,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable,
     }
 
     val mergeMode = if 
(tableConfig.getTableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
-      val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
+      val inferredMergeConfigs = 
HoodieTableConfig.inferMergingConfigsForWrites(
         tableConfig.getRecordMergeMode,
         tableConfig.getPayloadClass,
         tableConfig.getRecordMergeStrategyId,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDecimalTypeDataWorkflow.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDecimalTypeDataWorkflow.scala
index c4014bc5719a..8913e2a2df65 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDecimalTypeDataWorkflow.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDecimalTypeDataWorkflow.scala
@@ -26,7 +26,6 @@ import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, 
StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Row, SaveMode}
 import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
index 5dbe1faca544..a5b2b996d9f6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
@@ -126,7 +126,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
   void testRegularMerging(RecordMergeMode mergeMode, PartialUpdateMode 
updateMode, MergeStage stage) throws IOException {
     if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
       props.put(
-          HoodieTableConfig.MERGE_PROPERTIES_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER,
           IGNORE_MARKERS_VALUE);
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index ecd3dd3f45f9..bc749a0c8428 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -34,7 +34,6 @@ import 
org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.data.HoodiePairData;
@@ -88,7 +87,6 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
@@ -434,13 +432,8 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     validateMetadata(testTable, true);
 
     assertTrue(metadataWriter.isPresent());
-    Triple<RecordMergeMode, String, String> inferredMergeConfs =
-        HoodieTableConfig.inferCorrectMergingBehavior(
-            writeConfig.getRecordMergeMode(), writeConfig.getPayloadClass(),
-            writeConfig.getRecordMergeStrategyId(), String.join(",", 
writeConfig.getPreCombineFields()),
-            metaClient.getTableConfig().getTableVersion());
-    HoodieTableConfig hoodieTableConfig =
-        new HoodieTableConfig(this.storage, metaClient.getMetaPath(), 
inferredMergeConfs.getLeft(), inferredMergeConfs.getMiddle(), 
inferredMergeConfs.getRight());
+    HoodieTableConfig hoodieTableConfig = new HoodieTableConfig(
+        this.storage, metaClient.getMetaPath(), 
writeConfig.getRecordMergeMode(), writeConfig.getPayloadClass(), 
writeConfig.getRecordMergeStrategyId());
     assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
 
     // Turn off metadata table
@@ -457,16 +450,11 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     Option metadataWriter2 = table2.getMetadataWriter(instant2);
     assertFalse(metadataWriter2.isPresent());
 
-    Triple<RecordMergeMode, String, String> inferredMergeConfs2 =
-        HoodieTableConfig.inferCorrectMergingBehavior(
-            writeConfig2.getRecordMergeMode(), writeConfig2.getPayloadClass(),
-            writeConfig2.getRecordMergeStrategyId(), String.join(",", 
writeConfig2.getPreCombineFields()),
-            metaClient.getTableConfig().getTableVersion());
     HoodieTableConfig hoodieTableConfig2 =
         new HoodieTableConfig(this.storage, metaClient.getMetaPath(),
-            inferredMergeConfs2.getLeft(),
-            inferredMergeConfs2.getMiddle(),
-            inferredMergeConfs2.getRight());
+            writeConfig2.getRecordMergeMode(),
+            writeConfig2.getPayloadClass(),
+            writeConfig2.getRecordMergeStrategyId());
     assertEquals(Collections.emptySet(), 
hoodieTableConfig2.getMetadataPartitions());
     // Assert metadata table folder is deleted
     assertFalse(metaClient.getStorage().exists(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 9176369a4d67..b76b8a611c49 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1558,8 +1558,9 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     }
   }
 
-  @Test
-  def testMergerStrategySet(): Unit = {
+  @ParameterizedTest
+  @CsvSource(Array("8", "9"))
+  def testMergerStrategySet(tableVersion: String): Unit = {
     val (writeOpts, _) = getWriterReaderOpts()
     val input = recordsToStrings(dataGen.generateInserts("000", 1)).asScala
     val inputDf= spark.read.json(spark.sparkContext.parallelize(input.toSeq, 
1))
@@ -1570,6 +1571,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), 
mergerStrategyName)
       .option(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), 
RecordMergeMode.CUSTOM.name())
+      .option(HoodieWriteConfig.WRITE_TABLE_VERSION.key, tableVersion)
       .mode(SaveMode.Overwrite)
       .save(basePath)
     metaClient = createMetaClient(spark, basePath)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index 647ee6868df5..3f6840fc6d5a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -21,27 +21,29 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, 
RECORDKEY_FIELD, TABLE_TYPE}
-import org.apache.hudi.common.model.{AWSDmsAvroPayload, EventTimeAvroPayload, 
OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload, 
PartialUpdateAvroPayload}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.model.{AWSDmsAvroPayload, 
DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecordMerger, 
OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload, 
PartialUpdateAvroPayload}
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY, 
DELETE_MARKER}
+import org.apache.hudi.common.model.debezium.{MySqlDebeziumAvroPayload, 
PostgresDebeziumAvroPayload}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 
 import org.apache.spark.sql.SaveMode
-import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.scalatest.Assertions.assertThrows
 
 class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
   @ParameterizedTest
-  @MethodSource(Array("provideParams"))
+  @MethodSource(Array("providePayloadClassTestCases"))
   def testMergerBuiltinPayload(tableType: String,
-                               payloadClazz: String): Unit = {
+                               payloadClazz: String,
+                               expectedConfigs: Map[String, String]): Unit = {
     val opts: Map[String, String] = Map(
-      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
-      HoodieTableConfig.MERGE_PROPERTIES_PREFIX + 
"hoodie.payload.delete.field" -> "Op",
-      HoodieTableConfig.MERGE_PROPERTIES_PREFIX + 
"hoodie.payload.delete.marker" -> "d")
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz)
     val columns = Seq("ts", "key", "rider", "driver", "fare", "Op")
-
     // 1. Add an insert.
     val data = Seq(
       (10, "1", "rider-A", "driver-A", 19.10, "i"),
@@ -59,15 +61,30 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
       options(opts).
       mode(SaveMode.Overwrite).
       save(basePath)
+    // Verify table was created successfully
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
+    val tableConfig = metaClient.getTableConfig
+    // Verify table version is 9
+    assertEquals(9, tableConfig.getTableVersion.versionCode())
+    // Verify expected configs are set correctly
+    expectedConfigs.foreach { case (key, expectedValue) =>
+      if (expectedValue != null) {
+        assertEquals(expectedValue, tableConfig.getString(key), s"Config $key 
should be $expectedValue")
+      } else {
+        assertFalse(tableConfig.contains(key), s"Config $key should not be 
present")
+      }
+    }
     // 2. Add an update.
     val firstUpdateData = Seq(
-      (11, "1", "rider-X", "driver-X", 19.10, "d"),
+      (11, "1", "rider-X", "driver-X", 19.10, "D"),
       (11, "2", "rider-Y", "driver-Y", 27.70, "u"))
     val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
     firstUpdate.write.format("hudi").
       option(OPERATION.key(), "upsert").
       option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
-      options(opts).
       mode(SaveMode.Append).
       save(basePath)
     // 3. Add an update.
@@ -78,27 +95,41 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
     val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: 
_*)
     secondUpdate.write.format("hudi").
       option(OPERATION.key(), "upsert").
-      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
       option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
"1").
-      options(opts).
       mode(SaveMode.Append).
       save(basePath)
-    // 4. Validate.
-    val df = spark.read.format("hudi").options(opts).load(basePath)
+    // 4. Add a trivial update to trigger payload class mismatch.
+    val thirdUpdateData = Seq(
+      (12, "3", "rider-CC", "driver-CC", 33.90, "i"))
+    val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+    if (!payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      assertThrows[HoodieException] {
+        thirdUpdate.write.format("hudi").
+          option(OPERATION.key(), "upsert").
+          option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+          
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+          option(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
+            classOf[MySqlDebeziumAvroPayload].getName). // Position is 
important.
+          mode(SaveMode.Append).
+          save(basePath)
+      }
+    }
+    // 5. Validate.
+    val df = spark.read.format("hudi").load(basePath)
     val finalDf = df.select("ts", "key", "rider", "driver", "fare", 
"Op").sort("key")
 
     val expectedData = if 
(!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
-      if (payloadClazz.equals(classOf[PartialUpdateAvroPayload].getName)
-        || payloadClazz.equals(classOf[EventTimeAvroPayload].getName)) {
+      if 
(HoodieTableConfig.EVENT_TIME_ORDERING_PAYLOADS.contains(payloadClazz)) {
         Seq(
-          (11, "1", "rider-X", "driver-X", 19.10, "d"),
+          (11, "1", "rider-X", "driver-X", 19.10, "D"),
           (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
           (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
           (10, "4", "rider-D", "driver-D", 34.15, "i"),
           (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
       } else {
         Seq(
-          (11, "1", "rider-X", "driver-X", 19.10, "d"),
+          (11, "1", "rider-X", "driver-X", 19.10, "D"),
           (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
           (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
           (9, "4", "rider-DD", "driver-DD", 34.15, "i"),
@@ -120,13 +151,90 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
 
 // TODO: Add COPY_ON_WRITE table type tests when write path is updated accordingly.
 object TestPayloadDeprecationFlow {
-  def provideParams(): java.util.List[Arguments] = {
+  /**
+   * Supplies one argument set per payload class.
+   *
+   * Each set holds exactly three values: the table type, the fully qualified payload
+   * class name, and a map of table-config keys to values for that payload class
+   * (presumably the configs the consuming test expects -- confirm against that test).
+   *
+   * Every pair must stay inside `Map(...)`. `Arguments.of` is variadic, so a pair
+   * written after the map's closing parenthesis still compiles, but it is passed as a
+   * separate trailing argument rather than as a map entry.
+   *
+   * @return the argument sets, as a `java.util.List[Arguments]`
+   */
+  def providePayloadClassTestCases(): java.util.List[Arguments] = {
     java.util.Arrays.asList(
-      Arguments.of("MERGE_ON_READ", classOf[OverwriteWithLatestAvroPayload].getName),
-      Arguments.of("MERGE_ON_READ", classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName),
-      Arguments.of("MERGE_ON_READ", classOf[PartialUpdateAvroPayload].getName),
-      Arguments.of("MERGE_ON_READ", classOf[EventTimeAvroPayload].getName),
-      Arguments.of("MERGE_ON_READ", classOf[AWSDmsAvroPayload].getName)
+      Arguments.of(
+        "MERGE_ON_READ",
+        classOf[DefaultHoodieRecordPayload].getName,
+        Map(
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)),
+      Arguments.of(
+        "MERGE_ON_READ",
+        classOf[OverwriteWithLatestAvroPayload].getName,
+        Map(
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+        )
+      ),
+      Arguments.of(
+        "MERGE_ON_READ",
+        classOf[PartialUpdateAvroPayload].getName,
+        Map(
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")),
+      Arguments.of(
+        "MERGE_ON_READ",
+        classOf[PostgresDebeziumAvroPayload].getName,
+        Map(
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX
+            + HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER -> "__debezium_unavailable_value")),
+      Arguments.of(
+        "MERGE_ON_READ",
+        classOf[AWSDmsAvroPayload].getName,
+        Map(
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY -> "Op",
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER -> "D")),
+      Arguments.of(
+        "MERGE_ON_READ",
+        classOf[MySqlDebeziumAvroPayload].getName,
+        Map(
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)),
+      Arguments.of(
+        "MERGE_ON_READ",
+        classOf[EventTimeAvroPayload].getName,
+        Map(
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
+        )
+      ),
+      Arguments.of(
+        "MERGE_ON_READ",
+        classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
+        Map(
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
+        )
+      )
     )
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 61df5fc76d17..5127924ac08f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -267,7 +267,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
{
     deleteDf.cache()
     deleteDf.write.format("hudi")
       .options(hudiOpts)
-      .option("hoodie.datasource.write.payload.class", 
"org.apache.hudi.common.model.EmptyHoodieRecordPayload")
+      .option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
       .save(basePath)
     val prevDf = mergedDfList.last
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
index 2b359d483ba0..eac9a29e5d88 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
@@ -71,7 +71,6 @@ class TestMergeModeCommitTimeOrdering extends 
HoodieSparkSqlTestBase {
       Map(
         HoodieTableConfig.VERSION.key -> tableVersion,
         HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
-        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> 
classOf[OverwriteWithLatestAvroPayload].getName,
         HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
     }
     val nonExistentConfigs = if (tableVersion.toInt == 6) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
index 572eaa82744b..7122183e249c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
@@ -78,7 +78,6 @@ class TestMergeModeEventTimeOrdering extends 
HoodieSparkSqlTestBase {
         HoodieTableConfig.VERSION.key -> 
String.valueOf(HoodieTableVersion.current().versionCode()),
         HoodieTableConfig.PRECOMBINE_FIELDS.key -> "ts",
         HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name(),
-        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> 
classOf[DefaultHoodieRecordPayload].getName,
         HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
     }
     val nonExistentConfigs = if (tableVersion.toInt == 6) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index a5e9e56fb81d..0fc23f5af60b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -136,7 +136,6 @@ class TestRepairsProcedure extends 
HoodieSparkProcedureTestBase {
       val tableVersion = HoodieTableVersion.current().versionCode()
       val expectedOutput =s"""
           |[hoodie.archivelog.folder,archived,archive]
-          
|[hoodie.compaction.payload.class,org.apache.hudi.common.model.DefaultHoodieRecordPayload,null]
           |[hoodie.database.name,default,null]
           |[hoodie.datasource.write.drop.partition.columns,false,false]
           |[hoodie.datasource.write.hive_style_partitioning,true,null]
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 02b189e382f4..191308ad4ddc 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -157,7 +157,7 @@ public class HoodieStreamer implements Serializable {
                         Option<TypedProperties> propsOverride, 
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
     this.properties = combineProperties(cfg, propsOverride, 
jssc.hadoopConfiguration());
     Triple<RecordMergeMode, String, String> mergingConfigs =
-        HoodieTableConfig.inferCorrectMergingBehavior(
+        HoodieTableConfig.inferMergingConfigsForWrites(
             cfg.recordMergeMode, cfg.payloadClassName, 
cfg.recordMergeStrategyId, cfg.sourceOrderingFields,
             
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(this.properties,
 HoodieWriteConfig.WRITE_TABLE_VERSION)));
     cfg.recordMergeMode = mergingConfigs.getLeft();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 7ba953907280..a5a72e395aa7 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -650,7 +650,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       }
       cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
       Triple<RecordMergeMode, String, String> mergeCfgs =
-          HoodieTableConfig.inferCorrectMergingBehavior(
+          HoodieTableConfig.inferMergingConfigsForWrites(
               cfg.recordMergeMode, cfg.payloadClassName, 
cfg.recordMergeStrategyId, cfg.sourceOrderingFields,
               HoodieTableVersion.current());
       cfg.recordMergeMode = mergeCfgs.getLeft();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index a5e0ad7d6831..f5148f3fcd85 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1858,6 +1858,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         true, false, null, "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     assertRecordCount(1000, dataSetBasePath, sqlContext);
+    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, 
dataSetBasePath, false);
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), 
DefaultHoodieRecordPayload.class.getName());
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
@@ -1865,9 +1867,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
 
-    //now assert that hoodie.properties file now has updated payload class name
-    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, 
dataSetBasePath, false);
-    assertEquals(metaClient.getTableConfig().getPayloadClass(), 
DummyAvroPayload.class.getName());
+    // NOTE: Payload class cannot be updated, though the write can be executed 
using different payload classes in the runtime.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), 
DefaultHoodieRecordPayload.class.getName());
   }
 
   @Test
@@ -1884,6 +1886,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertEquals(metaClient.getTableConfig().getPayloadClass(), 
PartialUpdateAvroPayload.class.getName());
   }
 
+  @Disabled("To be fixed with HUDI-9714")
   @Test
   public void testPayloadClassUpdateWithCOWTable() throws Exception {
     String dataSetBasePath = basePath + "/test_dataset_cow";
@@ -1900,7 +1903,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       props.load(inputStream);
     }
 
-    assertTrue(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
+    assertFalse(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
     assertTrue(props.containsKey(HoodieTableConfig.RECORD_MERGE_MODE.key()));
     
assertTrue(props.containsKey(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key()));
 

Reply via email to