This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ab6d69b17af fix: Remove default record key and ordering fields values on the Flink side, consistent with Spark (#17994)
6ab6d69b17af is described below

commit 6ab6d69b17afef6866f7dc6f8b13f264894bfa42
Author: Geser Dugarov <[email protected]>
AuthorDate: Wed Jan 28 10:18:17 2026 +0700

    fix: Remove default record key and ordering fields values on the Flink side, consistent with Spark (#17994)
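
    With the defaults gone, a Flink table has to declare its key columns
    explicitly, either via PRIMARY KEY syntax or via the write options.
    A minimal sketch of a conforming DDL, assuming a TableEnvironment
    `tableEnv` and illustrative field names `uuid` and `ts` (the option
    keys are the ones defined in FlinkOptions):

        tableEnv.executeSql(
            "CREATE TABLE t1 ("
                + " uuid VARCHAR(20),"
                + " name VARCHAR(10),"
                + " ts TIMESTAMP(3)"
                + ") WITH ("
                + " 'connector' = 'hudi',"
                + " 'path' = '/tmp/t1',"
                // record key, formerly defaulted to 'uuid'
                + " 'hoodie.datasource.write.recordkey.field' = 'uuid',"
                // ordering fields, formerly defaulted to 'ts'
                + " 'ordering.fields' = 'ts'"
                + ")");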
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  |  14 ++-
 .../apache/hudi/config/HoodiePayloadConfig.java    |  15 ++-
 .../examples/quickstart/HoodieFlinkQuickstart.java |   2 +
 .../apache/hudi/configuration/FlinkOptions.java    |   4 +-
 .../apache/hudi/configuration/OptionsResolver.java |  25 +++--
 .../hudi/sink/append/AppendWriteFunctions.java     |  14 ++-
 .../org/apache/hudi/sink/bulk/RowDataKeyGens.java  |   4 +-
 .../hudi/sink/compact/HoodieFlinkCompactor.java    |   7 ++
 .../java/org/apache/hudi/sink/utils/Pipelines.java |   2 +-
 .../hudi/source/prune/PrimaryKeyPruners.java       |   3 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  |  29 +++--
 .../org/apache/hudi/table/HoodieTableSource.java   |   2 +-
 .../apache/hudi/table/catalog/HoodieCatalog.java   |   3 +-
 .../hudi/table/catalog/HoodieCatalogUtil.java      |  13 ++-
 .../hudi/table/catalog/HoodieHiveCatalog.java      |   2 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  13 ++-
 .../sink/compact/ITTestHoodieFlinkCompactor.java   |  26 ++++-
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 124 +++++++++++++++++++--
 .../apache/hudi/table/TestHoodieTableFactory.java  |   8 +-
 .../apache/hudi/table/format/TestInputFormat.java  |   5 +
 .../org/apache/hudi/utils/TestConfigurations.java  |   4 +
 21 files changed, 249 insertions(+), 70 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 5c00d0d4bcb1..9cedac1be74d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -719,12 +719,22 @@ public class HoodieIndexConfig extends HoodieConfig {
     }
 
     public Builder withIndexKeyField(String keyField) {
-      hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, keyField);
+      if (StringUtils.nonEmpty(keyField)) {
+        hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, keyField);
+      } else {
+        log.warn("'{}' wasn't set during Hoodie index building due to absent key field passed.",
+            BUCKET_INDEX_HASH_FIELD.key());
+      }
       return this;
     }
 
     public Builder withRecordKeyField(String keyField) {
-      hoodieIndexConfig.setValue(KeyGeneratorOptions.RECORDKEY_FIELD_NAME, keyField);
+      if (StringUtils.nonEmpty(keyField)) {
+        hoodieIndexConfig.setValue(KeyGeneratorOptions.RECORDKEY_FIELD_NAME, keyField);
+      } else {
+        log.warn("'{}' wasn't set during Hoodie index building due to absent key field passed.",
+            KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+      }
       return this;
     }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 37106389af5d..e733fe6e2375 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -25,6 +25,8 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -36,6 +38,7 @@ import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_F
 /**
  * Hoodie payload related configs.
  */
+@Slf4j
 @ConfigClassProperty(name = "Payload Configurations",
     groupName = ConfigGroups.Names.RECORD_PAYLOAD,
     description = "Payload related configs, that can be leveraged to "
@@ -44,7 +47,7 @@ public class HoodiePayloadConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> EVENT_TIME_FIELD = ConfigProperty
       .key(PAYLOAD_EVENT_TIME_FIELD_PROP_KEY)
-      .defaultValue("ts")
+      .noDefaultValue()
       .markAdvanced()
       .withDocumentation("Table column/field name to derive timestamp associated with the records. This can"
           + "be useful for e.g, determining the freshness of the table.");
@@ -97,12 +100,20 @@ public class HoodiePayloadConfig extends HoodieConfig {
     public Builder withPayloadOrderingFields(String payloadOrderingFields) {
       if (StringUtils.nonEmpty(payloadOrderingFields)) {
         payloadConfig.setValue(ORDERING_FIELDS, payloadOrderingFields);
+      } else {
+        log.warn("'{}' wasn't set during Hoodie payload building due to absent key field passed.",
+            ORDERING_FIELDS.key());
       }
       return this;
     }
 
     public Builder withPayloadEventTimeField(String payloadEventTimeField) {
-      payloadConfig.setValue(EVENT_TIME_FIELD, String.valueOf(payloadEventTimeField));
+      if (StringUtils.nonEmpty(payloadEventTimeField)) {
+        payloadConfig.setValue(EVENT_TIME_FIELD, payloadEventTimeField);
+      } else {
+        log.warn("'{}' wasn't set during Hoodie payload building due to absent key field passed.",
+            EVENT_TIME_FIELD.key());
+      }
       return this;
     }
 
diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
index b657430f8fea..1b527651de88 100644
--- a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
+++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
@@ -139,6 +139,8 @@ public final class HoodieFlinkQuickstart {
     // create hudi table
     String hoodieTableDDL = sql(tableName)
         .option(FlinkOptions.PATH, tablePath)
+        .option(FlinkOptions.RECORD_KEY_FIELD, "uuid")
+        .option(FlinkOptions.ORDERING_FIELDS, "ts")
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 773c8721900f..d58cde528105 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -114,7 +114,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> ORDERING_FIELDS = ConfigOptions
       .key("ordering.fields")
       .stringType()
-      .defaultValue("ts")
+      .noDefaultValue()
       .withFallbackKeys("precombine.field", "write.precombine.field", HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key())
       .withDescription("Comma separated list of fields used in records merging. When two records have the same\n"
           + "key value, we will pick the one with the largest value for the ordering field,\n"
@@ -539,7 +539,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
       .key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
       .stringType()
-      .defaultValue("uuid")
+      .noDefaultValue()
       .withDescription("Record key field. Value to be used as the `recordKey` 
component of `HoodieKey`.\n"
           + "Actual value will be obtained by invoking .toString() on the 
field value. Nested fields can be specified using "
           + "the dot notation eg: `a.b.c`");
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 6f083daac07a..fd69ed9e61e2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -151,13 +152,26 @@ public class OptionsResolver {
    return conf.get(FlinkOptions.PAYLOAD_CLASS_NAME).contains(DefaultHoodieRecordPayload.class.getSimpleName());
   }
 
+  /**
+   * Return value of {@link FlinkOptions#RECORD_KEY_FIELD} if it was set,
+   * or throw exception otherwise.
+   */
+  public static String getRecordKeyStr(Configuration conf) {
+    final String recordKeyStr = conf.get(FlinkOptions.RECORD_KEY_FIELD);
+    ValidationUtils.checkArgument(
+        recordKeyStr != null,
+        "Primary key definition is required, use either PRIMARY KEY syntax or 
option '"
+            + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify.");
+    return recordKeyStr;
+  }
+
   /**
    * Returns the ordering fields as comma separated string
    * or null if the value is set as {@link FlinkOptions#NO_PRE_COMBINE}.
    */
   public static String getOrderingFieldsStr(Configuration conf) {
     final String orderingFields = conf.get(FlinkOptions.ORDERING_FIELDS);
-    return orderingFields.equals(FlinkOptions.NO_PRE_COMBINE) ? null : orderingFields;
+    return FlinkOptions.NO_PRE_COMBINE.equals(orderingFields) ? null : orderingFields;
   }
 
   /**
@@ -429,14 +443,7 @@ public class OptionsResolver {
    * Returns the index key field.
    */
   public static String getIndexKeyField(Configuration conf) {
-    return conf.getString(FlinkOptions.INDEX_KEY_FIELD.key(), conf.get(FlinkOptions.RECORD_KEY_FIELD));
-  }
-
-  /**
-   * Returns the index key field values.
-   */
-  public static String[] getIndexKeys(Configuration conf) {
-    return getIndexKeyField(conf).split(",");
+    return conf.getString(FlinkOptions.INDEX_KEY_FIELD.key(), getRecordKeyStr(conf));
   }
 
   /**
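
With the "uuid" fallback removed, record key lookups funnel through the new
OptionsResolver#getRecordKeyStr guard. A minimal sketch of the resulting
contract (the configuration values are illustrative):

    Configuration conf = new Configuration();
    // an unset key now fails fast instead of silently falling back to "uuid":
    // OptionsResolver.getRecordKeyStr(conf)
    //   -> IllegalArgumentException: "Primary key definition is required, ..."
    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
    String recordKeys = OptionsResolver.getRecordKeyStr(conf); // "uuid"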
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java
index a73f923782a2..39cda29ab422 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java
@@ -21,14 +21,14 @@ package org.apache.hudi.sink.append;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.buffer.BufferType;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.types.logical.RowType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
@@ -38,11 +38,10 @@ import java.util.stream.Collectors;
 * Factory utilities for creating {@link AppendWriteFunction} instances based on configuration.
  * Handles buffer type selection, sort key resolution, and rate limiting.
  */
+@Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public abstract class AppendWriteFunctions {
 
-  private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctions.class);
-
   /**
   * Creates a {@link AppendWriteFunction} instance based on the given configuration.
    */
@@ -71,7 +70,7 @@ public abstract class AppendWriteFunctions {
 
     // Backward compatibility: write.buffer.sort.enabled=true → DISRUPTOR
     if (conf.get(FlinkOptions.WRITE_BUFFER_SORT_ENABLED)) {
-      LOG.info("write.buffer.sort.enabled is deprecated. Use write.buffer.type=DISRUPTOR instead.");
+      log.info("write.buffer.sort.enabled is deprecated. Use write.buffer.type=DISRUPTOR instead.");
       return BufferType.DISRUPTOR.name();
     }
 
@@ -90,7 +89,10 @@ public abstract class AppendWriteFunctions {
     String sortKeys = conf.get(FlinkOptions.WRITE_BUFFER_SORT_KEYS);
     if (StringUtils.isNullOrEmpty(sortKeys)) {
       // Default to record key field(s)
-      sortKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD);
+      log.info("'{}' is not set, therefore '{}' value will be used as sort keys instead",
+          FlinkOptions.WRITE_BUFFER_SORT_KEYS.key(),
+          FlinkOptions.RECORD_KEY_FIELD.key());
+      sortKeys = OptionsResolver.getRecordKeyStr(conf);
     }
     ValidationUtils.checkArgument(StringUtils.nonEmpty(sortKeys),
         "Sort keys can't be null or empty for append write with buffer sort. "
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
index df0e553feb29..fc6128e6918d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.sink.bulk;
 
-import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieValidationException;
 
 import org.apache.flink.configuration.Configuration;
@@ -37,7 +37,7 @@ public class RowDataKeyGens {
   * Creates {@link RowDataKeyGen} of corresponding type depending on table configuration.
    */
  public static RowDataKeyGen instance(Configuration conf, RowType rowType, @Nullable Integer taskId, @Nullable String instantTime) {
-    String recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD);
+    String recordKeys = OptionsResolver.getRecordKeyStr(conf);
     if (hasRecordKey(recordKeys, rowType.getFieldNames())) {
       return RowDataKeyGen.instance(conf, rowType);
     } else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 367ff9e5f1a8..34ac7e801570 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.InstantGenerator;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.TableServiceUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -209,6 +210,12 @@ public class HoodieFlinkCompactor {
       // get the table name
      conf.set(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
 
+      // get the primary key if absent in conf, but present in table configs
+      if (!conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())
+          && StringUtils.nonEmpty(metaClient.getTableConfig().getRecordKeyFieldProp())) {
+        conf.set(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+      }
+
       // set table schema
       CompactionUtil.setAvroSchema(conf, metaClient);
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index ae0dce254222..cc7b523385a9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -157,7 +157,7 @@ public class Pipelines {
       if (conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         final boolean isNeededSortInput = conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY);
         final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
-        final String[] recordKeyFields = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+        final String[] recordKeyFields = OptionsResolver.getRecordKeyStr(conf).split(",");
 
        // if sort input by record key is needed then add record keys to partition keys
         String[] sortFields = isNeededSortInput
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
index 24c82fbffc6a..46a71c41518f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
@@ -20,7 +20,6 @@ package org.apache.hudi.source.prune;
 
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.util.ExpressionUtils;
@@ -44,7 +43,7 @@ import java.util.stream.Collectors;
 public class PrimaryKeyPruners {
 
  public static Function<Integer, Integer> getBucketIdFunc(List<ResolvedExpression> hashKeyFilters, Configuration conf) {
-    List<String> pkFields = Arrays.asList(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","));
+    List<String> pkFields = Arrays.asList(OptionsResolver.getRecordKeyStr(conf).split(","));
     // step1: resolve the hash key values
    final boolean logicalTimestamp = OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
     List<String> values = hashKeyFilters.stream()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 05d04553d0c0..c369434f604d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -214,16 +214,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
    */
   private void checkRecordKey(Configuration conf, ResolvedSchema schema) {
     List<String> fields = schema.getColumnNames();
-    if (!schema.getPrimaryKey().isPresent()) {
-      String[] recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
-      if (recordKeys.length == 1
-          && FlinkOptions.RECORD_KEY_FIELD.defaultValue().equals(recordKeys[0])
-          && !fields.contains(recordKeys[0])) {
-        throw new HoodieValidationException("Primary key definition is required, the default primary key field "
-            + "'" + FlinkOptions.RECORD_KEY_FIELD.defaultValue() + "' does not exist in the table schema, "
-            + "use either PRIMARY KEY syntax or option '" + FlinkOptions.RECORD_KEY_FIELD.key() + "' to speciy.");
-      }
-
+    if (schema.getPrimaryKey().isEmpty()) {
+      String[] recordKeys = OptionsResolver.getRecordKeyStr(conf).split(",");
       Arrays.stream(recordKeys)
           .filter(field -> !fields.contains(field))
           .findAny()
@@ -232,8 +224,10 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
                + "'" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema.");
           });
     }
-    if (schema.getPrimaryKey().isPresent() && conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key()))   {
-      log.warn("PRIMARY KEY syntax and option '{}' was used. Value of the PRIMARY KEY will be used and option will be ignored!", FlinkOptions.RECORD_KEY_FIELD.key());
+    if (schema.getPrimaryKey().isPresent() && conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
+      log.warn("PRIMARY KEY syntax and option '{}' was used. "
+          + "Value of the PRIMARY KEY will be used and option will be ignored!",
+          FlinkOptions.RECORD_KEY_FIELD.key());
     }
   }
 
@@ -280,17 +274,20 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       conf.set(FlinkOptions.RECORD_KEY_FIELD, recordKey);
     }
     List<String> partitionKeys = table.getPartitionKeys();
-    if (partitionKeys.size() > 0) {
+    if (!partitionKeys.isEmpty()) {
       // the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD
       conf.set(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
     }
     // set index key for bucket index if not defined
    if (conf.get(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) {
       if (conf.get(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
-        conf.set(FlinkOptions.INDEX_KEY_FIELD, conf.get(FlinkOptions.RECORD_KEY_FIELD));
+        log.info("'{}' is not set, therefore '{}' value will be used as index key instead",
+            FlinkOptions.INDEX_KEY_FIELD.key(),
+            FlinkOptions.RECORD_KEY_FIELD.key());
+        conf.set(FlinkOptions.INDEX_KEY_FIELD, OptionsResolver.getRecordKeyStr(conf));
       } else {
         Set<String> recordKeySet =
-            Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",")).collect(Collectors.toSet());
+            Arrays.stream(OptionsResolver.getRecordKeyStr(conf).split(",")).collect(Collectors.toSet());
         Set<String> indexKeySet =
             Arrays.stream(conf.get(FlinkOptions.INDEX_KEY_FIELD).split(",")).collect(Collectors.toSet());
         if (!recordKeySet.containsAll(indexKeySet)) {
@@ -302,7 +299,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
 
     // tweak the key gen class if possible
    final String[] partitions = conf.get(FlinkOptions.PARTITION_PATH_FIELD).split(",");
-    final String[] pks = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+    final String[] pks = OptionsResolver.getRecordKeyStr(conf).split(",");
     if (partitions.length == 1) {
       final String partitionField = partitions[0];
       if (partitionField.isEmpty()) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index b1bba78f0b07..32fe9e5d9f8f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -381,7 +381,7 @@ public class HoodieTableSource implements
     if (!OptionsResolver.isBucketIndexType(conf) || dataFilters.isEmpty()) {
       return Option.empty();
     }
-    Set<String> indexKeyFields = Arrays.stream(OptionsResolver.getIndexKeys(conf)).collect(Collectors.toSet());
+    Set<String> indexKeyFields = Arrays.stream(OptionsResolver.getIndexKeyField(conf).split(",")).collect(Collectors.toSet());
     List<ResolvedExpression> indexKeyFilters = dataFilters.stream().filter(expr -> ExpressionUtils.isEqualsLitExpr(expr, indexKeyFields)).collect(Collectors.toList());
     if (!ExpressionUtils.isFilteringByAllFields(indexKeyFilters, indexKeyFields)) {
       return Option.empty();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index cc8ed951f1c0..065a8a00c276 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -349,7 +350,7 @@ public class HoodieCatalog extends AbstractCatalog {
       conf.set(FlinkOptions.PARTITION_PATH_FIELD, partitions);
       options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
 
-      final String[] pks = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+      final String[] pks = OptionsResolver.getRecordKeyStr(conf).split(",");
      boolean complexHoodieKey = pks.length > 1 || resolvedTable.getPartitionKeys().size() > 1;
       StreamerUtil.checkKeygenGenerator(complexHoodieKey, conf);
     } else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index ab54f3b393e0..7e37f67b18cd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -22,7 +22,9 @@ import org.apache.hudi.adapter.Utils;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -256,13 +258,18 @@ public class HoodieCatalogUtil {
       String tablePathStr,
       ObjectPath tablePath,
       org.apache.hadoop.conf.Configuration hadoopConf) {
-    return FlinkWriteClients.createWriteClientV2(
+    HoodieTableConfig tableConfig = StreamerUtil.createMetaClient(tablePathStr, hadoopConf).getTableConfig();
+    org.apache.flink.configuration.Configuration conf =
         org.apache.flink.configuration.Configuration.fromMap(options)
             .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
             .set(
                 FlinkOptions.SOURCE_AVRO_SCHEMA,
-                StreamerUtil.createMetaClient(tablePathStr, hadoopConf)
-                    .getTableConfig().getTableCreateSchema().get().toString()));
+                tableConfig.getTableCreateSchema().get().toString());
+    String recordKey = tableConfig.getRecordKeyFieldProp();
+    if (StringUtils.nonEmpty(recordKey)) {
+      conf.set(FlinkOptions.RECORD_KEY_FIELD, recordKey);
+    }
+    return FlinkWriteClients.createWriteClientV2(conf);
   }
 
  private static boolean sameOptions(Map<String, String> parameters1, Map<String, String> parameters2, ConfigOption<String> option) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 31017d1ebf15..5b1b1767c772 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -513,7 +513,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     if (catalogTable.isPartitioned() && !flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
       final String partitions = String.join(",", catalogTable.getPartitionKeys());
       flinkConf.set(FlinkOptions.PARTITION_PATH_FIELD, partitions);
-      final String[] pks = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+      final String[] pks = OptionsResolver.getRecordKeyStr(flinkConf).split(",");
       boolean complexHoodieKey = pks.length > 1 || catalogTable.getPartitionKeys().size() > 1;
       StreamerUtil.checkKeygenGenerator(complexHoodieKey, flinkConf);
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index c13eda93667a..56e3c7646066 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -173,8 +173,8 @@ public class StreamerUtil {
   public static HoodiePayloadConfig getPayloadConfig(Configuration conf) {
     return HoodiePayloadConfig.newBuilder()
         .withPayloadClass(conf.get(FlinkOptions.PAYLOAD_CLASS_NAME))
-        .withPayloadOrderingFields(conf.get(FlinkOptions.ORDERING_FIELDS))
-        .withPayloadEventTimeField(conf.get(FlinkOptions.ORDERING_FIELDS))
+        .withPayloadOrderingFields(OptionsResolver.getOrderingFieldsStr(conf))
+        .withPayloadEventTimeField(OptionsResolver.getOrderingFieldsStr(conf))
         .build();
   }
 
@@ -652,15 +652,18 @@ public class StreamerUtil {
    */
  public static void checkOrderingFields(Configuration conf, List<String> fields) {
     String orderingFields = conf.get(FlinkOptions.ORDERING_FIELDS);
-    if (!fields.contains(orderingFields)) {
+    if (null == orderingFields || !fields.contains(orderingFields)) {
       if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
+        // the default payload requires ordering fields that exist in the table schema
         throw new HoodieValidationException("Option '" + FlinkOptions.ORDERING_FIELDS.key()
                 + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
       }
-      if (orderingFields.equals(FlinkOptions.ORDERING_FIELDS.defaultValue())) {
+      if (null == orderingFields) {
+        // if there are no ordering fields we set them as no-precombine
         conf.set(FlinkOptions.ORDERING_FIELDS, FlinkOptions.NO_PRE_COMBINE);
       } else if (!orderingFields.equals(FlinkOptions.NO_PRE_COMBINE)) {
-        throw new HoodieValidationException("Field " + orderingFields + " does not exist in the table schema."
+        // but if no-precombine was passed initially then we shouldn't fail here on schema check
+        throw new HoodieValidationException("Field " + orderingFields + " does not exist in the table schema. "
                 + "Please check '" + FlinkOptions.ORDERING_FIELDS.key() + "' option.");
       }
     }
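
The reworked checkOrderingFields now distinguishes an unset option from an
invalid one; roughly, under the patched semantics (the schema columns below
are illustrative):

    Configuration conf = new Configuration(); // ORDERING_FIELDS left unset
    StreamerUtil.checkOrderingFields(conf, Arrays.asList("uuid", "name", "ts"));
    // -> conf now carries FlinkOptions.NO_PRE_COMBINE, i.e. no merging field

    conf.set(FlinkOptions.ORDERING_FIELDS, "tss"); // field absent from schema
    // StreamerUtil.checkOrderingFields(conf, Arrays.asList("uuid", "name", "ts"))
    //   -> HoodieValidationException: "Field tss does not exist in the table schema. ..."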
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index f2646aba5269..3ae8ea520f2c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -62,8 +62,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -90,8 +88,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @ExtendWith(FlinkMiniCluster.class)
 public class ITTestHoodieFlinkCompactor {
 
-  protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);
-
   private static final Map<String, List<String>> EXPECTED1 = new HashMap<>();
 
   private static final Map<String, List<String>> EXPECTED2 = new HashMap<>();
@@ -131,6 +127,8 @@ public class ITTestHoodieFlinkCompactor {
     options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    options.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logBlockFormat);
     options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
@@ -146,6 +144,8 @@ public class ITTestHoodieFlinkCompactor {
     FlinkCompactionConfig cfg = new FlinkCompactionConfig();
     cfg.path = tempFile.getAbsolutePath();
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
 
     // create metaClient
@@ -206,6 +206,8 @@ public class ITTestHoodieFlinkCompactor {
     options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     tableEnv.executeSql(hoodieTableDDL);
@@ -219,6 +221,8 @@ public class ITTestHoodieFlinkCompactor {
     FlinkCompactionConfig cfg = new FlinkCompactionConfig();
     cfg.path = tempFile.getAbsolutePath();
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
 
     // create metaClient
@@ -293,6 +297,8 @@ public class ITTestHoodieFlinkCompactor {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
     options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
@@ -309,6 +315,8 @@ public class ITTestHoodieFlinkCompactor {
     cfg.minCompactionIntervalSeconds = 3;
     cfg.schedule = true;
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
    conf.set(FlinkOptions.COMPACTION_TASKS, FlinkMiniCluster.DEFAULT_PARALLELISM);
 
@@ -334,6 +342,8 @@ public class ITTestHoodieFlinkCompactor {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
     options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
@@ -344,6 +354,8 @@ public class ITTestHoodieFlinkCompactor {
     FlinkCompactionConfig cfg = new FlinkCompactionConfig();
     cfg.path = tempFile.getAbsolutePath();
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
 
     HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
@@ -412,6 +424,8 @@ public class ITTestHoodieFlinkCompactor {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     tableEnv.executeSql(hoodieTableDDL);
@@ -429,6 +443,8 @@ public class ITTestHoodieFlinkCompactor {
     FlinkCompactionConfig cfg = new FlinkCompactionConfig();
     cfg.path = tempFile.getAbsolutePath();
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
 
     assertDoesNotThrow(() -> runOfflineCompact(tableEnv, conf));
     assertNoDuplicateFile(conf);
@@ -467,6 +483,8 @@ public class ITTestHoodieFlinkCompactor {
     options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     tableEnv.executeSql(hoodieTableDDL);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 60a55a74f1a5..ef0d57c1c683 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -83,6 +83,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -94,7 +95,6 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
-import static java.util.Arrays.stream;
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
 import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
 import static org.apache.hudi.utils.TestConfigurations.catalog;
@@ -107,6 +107,7 @@ import static org.apache.hudi.utils.TestData.map;
 import static org.apache.hudi.utils.TestData.row;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertLinesMatch;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -148,6 +149,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .end();
@@ -159,6 +161,7 @@ public class ITTestHoodieDataSource {
     streamTableEnv.executeSql("drop table t1");
     hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .option(FlinkOptions.TABLE_TYPE, tableType)
@@ -192,6 +195,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .option(FlinkOptions.CDC_ENABLED, true)
@@ -226,6 +230,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .option(FlinkOptions.TABLE_TYPE, tableType)
@@ -256,6 +261,7 @@ public class ITTestHoodieDataSource {
 
     String createHoodieTable = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .end();
@@ -271,6 +277,7 @@ public class ITTestHoodieDataSource {
     // now we consume starting from the oldest commit
     String createHoodieTable2 = sql("t2")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .option(FlinkOptions.TABLE_TYPE, tableType)
@@ -291,6 +298,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
@@ -309,6 +317,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         // read optimized is supported for both MOR and COR table,
         // test MOR streaming write with compaction then reads as
@@ -343,6 +352,7 @@ public class ITTestHoodieDataSource {
   void testStreamWriteBatchReadOptimizedWithoutCompaction() {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
        .option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
         .end();
@@ -364,6 +374,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
@@ -390,6 +401,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.OPERATION, "insert")
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
@@ -418,6 +430,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.OPERATION, "insert")
         .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
         .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
@@ -452,6 +465,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
@@ -472,6 +486,7 @@ public class ITTestHoodieDataSource {
   void testBatchWriteWithCleaning() {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
         .end();
     batchTableEnv.executeSql(hoodieTableDDL);
@@ -496,6 +511,8 @@ public class ITTestHoodieDataSource {
 
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
     conf.set(FlinkOptions.CHANGELOG_ENABLED, true);
 
@@ -508,6 +525,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
@@ -533,6 +551,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, true)
        .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
        .option(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), true)
@@ -563,6 +582,7 @@ public class ITTestHoodieDataSource {
  void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, true)
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), 
false)
@@ -572,6 +592,8 @@ public class ITTestHoodieDataSource {
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
     conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
@@ -614,6 +636,8 @@ public class ITTestHoodieDataSource {
  void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
     conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
 
@@ -622,6 +646,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
@@ -645,6 +670,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         .option(FlinkOptions.READ_AS_STREAMING, true)
        .option(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST)
@@ -669,6 +695,7 @@ public class ITTestHoodieDataSource {
    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .end();
     tableEnv.executeSql(hoodieTableDDL);
@@ -698,16 +725,19 @@ public class ITTestHoodieDataSource {
         .field("tss timestamp(3)") // use a different field with default 
precombine field 'ts'
         .field("`partition` varchar(10)")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.RECORD_KEY_FIELD, "uuid")
+        .option(FlinkOptions.ORDERING_FIELDS, "ts")
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .end();
     tableEnv.executeSql(hoodieTableDDL);
 
-    execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);
-
-    List<Row> result1 = CollectionUtil.iterableToList(
-        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
-    assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
+    ValidationException exception =
+        assertThrows(ValidationException.class, () -> execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1));
+    assertLinesMatch(
+        Collections.singletonList("Field ts does not exist in the table schema. Please check '"
+            + FlinkOptions.ORDERING_FIELDS.key() + "' option."),
+        Collections.singletonList(exception.getCause().getMessage()));
   }
 
   @ParameterizedTest
@@ -721,9 +751,10 @@ public class ITTestHoodieDataSource {
         .field("ts timestamp(3)") // use the default precombine field 'ts'
         .field("`partition` varchar(10)")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.RECORD_KEY_FIELD, "uuid")
+        .option(FlinkOptions.ORDERING_FIELDS, FlinkOptions.NO_PRE_COMBINE)
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
-        .option(FlinkOptions.ORDERING_FIELDS, FlinkOptions.NO_PRE_COMBINE)
         .end();
     tableEnv.executeSql(hoodieTableDDL);
 
@@ -740,6 +771,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option("hoodie.parquet.small.file.limit", "0") // invalidate the 
small file strategy
         .option("hoodie.parquet.max.file.size", "0")
@@ -766,6 +798,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .end();
@@ -791,12 +824,14 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t1")
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .end();
     tableEnv.executeSql(hoodieTableDDL);
 
     String hoodieTableDDL2 = sql("t2")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t2")
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .end();
     tableEnv.executeSql(hoodieTableDDL2);
@@ -878,6 +913,7 @@ public class ITTestHoodieDataSource {
         .pkField("id")
         .noPartition()
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.ORDERING_FIELDS, "ts")
         .option(FlinkOptions.READ_AS_STREAMING, streaming)
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .end();
@@ -912,6 +948,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.INDEX_TYPE, indexType)
         .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
@@ -983,6 +1020,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .option(FlinkOptions.TABLE_TYPE, tableType)
@@ -1005,6 +1043,7 @@ public class ITTestHoodieDataSource {
    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.WRITE_BATCH_SIZE, "0.001")
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .end();
@@ -1034,6 +1073,7 @@ public class ITTestHoodieDataSource {
    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.WRITE_BATCH_SIZE, "0.001")
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
@@ -1069,6 +1109,7 @@ public class ITTestHoodieDataSource {
         .pkField("id")
         .noPartition()
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.ORDERING_FIELDS, "ts")
        .option(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName())
         .end();
     tableEnv.executeSql(hoodieTableDDL);
@@ -1092,6 +1133,7 @@ public class ITTestHoodieDataSource {
    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .noPartition()
         .end();
@@ -1124,6 +1166,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
         .option(FlinkOptions.PRE_COMBINE, true)
         .end();
@@ -1146,6 +1189,7 @@ public class ITTestHoodieDataSource {
     streamTableEnv.executeSql(createSource);
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.INDEX_GLOBAL_ENABLED, false)
         .option(FlinkOptions.PRE_COMBINE, true)
         .end();
@@ -1171,6 +1215,7 @@ public class ITTestHoodieDataSource {
     // create a flink source table
     String createHoodieTable = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.READ_AS_STREAMING, "true")
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         .end();
@@ -1194,6 +1239,7 @@ public class ITTestHoodieDataSource {
     // create a flink source table
     String createHoodieTable = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         .end();
     batchTableEnv.executeSql(createHoodieTable);
@@ -1275,6 +1321,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("hoodie_sink")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.OPERATION, "bulk_insert")
         .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
         .option(FlinkOptions.INDEX_TYPE, indexType)
@@ -1316,6 +1363,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql(catalogName + ".hudi.hoodie_sink")
         .option(FlinkOptions.PATH, basePath)
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.OPERATION, operationType)
         .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
@@ -1377,6 +1425,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql(catalogName + ".hudi.hoodie_sink")
         .option(FlinkOptions.PATH, basePath)
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.OPERATION, operationType)
         .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
@@ -1405,6 +1454,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.OPERATION, "bulk_insert")
         .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
         .option(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, true)
@@ -1430,6 +1480,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.OPERATION, "bulk_insert")
         .noPartition()
         .end();
@@ -1468,6 +1519,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("hoodie_sink")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.OPERATION, "insert")
         .option(FlinkOptions.INSERT_CLUSTER, clustering)
         .end();
@@ -1494,6 +1546,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .partitionField("ts") // use timestamp as partition path field
         .end();
@@ -1519,6 +1572,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
         .option(FlinkOptions.COMPACTION_TASKS, 1)
@@ -1543,6 +1597,7 @@ public class ITTestHoodieDataSource {
         .field("age int")
         .field("ts date")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.PARTITION_FORMAT, partitionFormat)
         .partitionField("ts") // use date as partition path field
         .end();
@@ -1596,6 +1651,8 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
 
     // write 3 batches of data set
@@ -1607,6 +1664,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.READ_START_COMMIT, latestCommit)
         .end();
@@ -1623,6 +1681,8 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = streamTableEnv;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
     conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled);
     conf.set(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate the changes on the fly
@@ -1638,6 +1698,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled)
         .option(FlinkOptions.READ_CDC_FROM_CHANGELOG, false)
@@ -1668,6 +1729,8 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = streamTableEnv;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
     conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled);
     // schedule compaction after 2 commits
@@ -1685,6 +1748,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
         .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 2)
@@ -1720,6 +1784,8 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
     conf.set(FlinkOptions.ARCHIVE_MIN_COMMITS, 4);
     conf.set(FlinkOptions.ARCHIVE_MAX_COMMITS, 5);
@@ -1736,6 +1802,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.READ_START_COMMIT, secondArchived)
         .end();
@@ -1753,6 +1820,8 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
 
     // write a batch of data set
@@ -1768,6 +1837,7 @@ public class ITTestHoodieDataSource {
         .pkField("uuid")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.ORDERING_FIELDS, "ts")
         .end();
     tableEnv.executeSql(hoodieTableDDL);
 
@@ -1947,6 +2017,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, true)
         .option("hoodie.metadata.index.column.stats.enable", true)
         .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
@@ -2045,6 +2116,7 @@ public class ITTestHoodieDataSource {
     String path = tempFile.getAbsolutePath();
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, path)
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, true)
         .option("hoodie.metadata.index.column.stats.enable", true)
         .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
@@ -2066,6 +2138,7 @@ public class ITTestHoodieDataSource {
 
     hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, path)
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, true)
         .option("hoodie.metadata.index.column.stats.enable", true)
         .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
@@ -2096,6 +2169,7 @@ public class ITTestHoodieDataSource {
     String path = tempFile.getAbsolutePath();
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, path)
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
         .option(FlinkOptions.COMPACTION_TASKS, 1)
@@ -2110,6 +2184,7 @@ public class ITTestHoodieDataSource {
 
     hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, path)
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, true)
         .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
         .end();
@@ -2128,6 +2203,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, true)
         .option("hoodie.metadata.index.column.stats.enable", true)
         .option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet")
@@ -2169,6 +2245,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, true)
         .option("hoodie.metadata.index.column.stats.enable", true)
         .option("hoodie.metadata.index.column.stats.file.group.count", 2)
@@ -2192,6 +2269,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, true)
         .option("hoodie.metadata.index.column.stats.enable", true)
         .option("hoodie.metadata.index.column.stats.file.group.count", 2)
@@ -2215,6 +2293,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.METADATA_ENABLED, false)
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .end();
@@ -2249,6 +2328,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.INDEX_TYPE, "BUCKET")
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .end();
@@ -2466,6 +2546,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
         .option(FlinkOptions.PATH, path)
+        .option(FlinkOptions.ORDERING_FIELDS, "ts")
         .end();
     batchTableEnv.executeSql(hoodieTableDDL);
 
@@ -2527,6 +2608,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
         .option(FlinkOptions.PATH, path)
+        .option(FlinkOptions.ORDERING_FIELDS, "ts")
         .end();
     batchTableEnv.executeSql(readHoodieTableDDL);
 
@@ -2549,6 +2631,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
         .option(FlinkOptions.PATH, path)
+        .option(FlinkOptions.ORDERING_FIELDS, "ts")
         .end();
     batchTableEnv.executeSql(readHoodieTableDDL);
 
@@ -2571,6 +2654,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
         .option(FlinkOptions.PATH, path)
+        .option(FlinkOptions.ORDERING_FIELDS, "ts")
         .end();
     batchTableEnv.executeSql(readHoodieTableDDL);
 
@@ -2584,6 +2668,8 @@ public class ITTestHoodieDataSource {
   void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
     conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
 
@@ -2591,6 +2677,7 @@ public class ITTestHoodieDataSource {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
@@ -2616,6 +2703,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.INDEX_TYPE, indexType)
         .end();
@@ -2672,6 +2760,7 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false")
         .end();
@@ -2682,6 +2771,7 @@ public class ITTestHoodieDataSource {
     streamTableEnv.executeSql("drop table t1");
     hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
@@ -2692,7 +2782,10 @@ public class ITTestHoodieDataSource {
   @Test
   void testReadWithParquetPredicatePushDown() {
     TableEnvironment tableEnv = batchTableEnv;
-    String hoodieTableDDL = sql("t1").option(FlinkOptions.PATH, tempFile.getAbsolutePath()).end();
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .end();
     tableEnv.executeSql(hoodieTableDDL);
     execInsertSql(tableEnv, TestSQL.INSERT_T1);
     // apply filters to push down predicates
@@ -2713,6 +2806,7 @@ public class ITTestHoodieDataSource {
     // insert first batch of data with parquet log block
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
         .option(FlinkOptions.INDEX_TYPE, indexType)
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
@@ -2728,6 +2822,7 @@ public class ITTestHoodieDataSource {
     // insert second batch of data with avro log block
     hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
         .option(FlinkOptions.INDEX_TYPE, indexType)
         .option(FlinkOptions.READ_AS_STREAMING, true)
@@ -2754,6 +2849,7 @@ public class ITTestHoodieDataSource {
     // insert first batch of data with rowdata mode writing disabled
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
         .option(FlinkOptions.INDEX_TYPE, index)
         .option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet")
@@ -2775,6 +2871,7 @@ public class ITTestHoodieDataSource {
         JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Default);
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
         .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
         .end();
@@ -2795,6 +2892,7 @@ public class ITTestHoodieDataSource {
     // init and write data with table version SIX
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
        .option(FlinkOptions.WRITE_TABLE_VERSION, HoodieTableVersion.SIX.versionCode() + "")
         .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
@@ -2807,6 +2905,7 @@ public class ITTestHoodieDataSource {
 
     hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
        .option(FlinkOptions.WRITE_TABLE_VERSION, HoodieTableVersion.EIGHT.versionCode() + "")
         .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
@@ -2826,6 +2925,7 @@ public class ITTestHoodieDataSource {
   void testWriteWithTimelineServerBasedMarker(HoodieTableType tableType) {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, tableType)
        .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.TIMELINE_SERVER_BASED.name())
         .end();
@@ -2909,6 +3009,7 @@ public class ITTestHoodieDataSource {
     TableEnvironment tableEnv = streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
        .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
         .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
         .option(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE)
@@ -2928,6 +3029,13 @@ public class ITTestHoodieDataSource {
     BATCH, STREAM
   }
 
+  public static Map<String, String> getDefaultKeys() {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
+    return conf.toMap();
+  }
+
   /**
    * Return test params => (execution mode, table type).
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 9558250b89c6..bd38b0d7e100 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -88,6 +88,7 @@ public class TestHoodieTableFactory {
     this.conf = new Configuration();
     this.conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
     this.conf.set(FlinkOptions.TABLE_NAME, "t1");
+    this.conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
     StreamerUtil.initTableIfNotExists(this.conf);
   }
 
@@ -110,12 +111,9 @@ public class TestHoodieTableFactory {
     final MockContext sourceContext11 = MockContext.getInstance(this.conf, schema1, "f2");
     assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext11));
     assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext11));
-    //miss the pre combine key will be ok
-    HoodieTableSink tableSink11 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext11);
-    assertThat(tableSink11.getConf().get(FlinkOptions.ORDERING_FIELDS), is(FlinkOptions.NO_PRE_COMBINE));
-    this.conf.set(FlinkOptions.OPERATION, FlinkOptions.OPERATION.defaultValue());
 
     // a non-exists precombine key will throw exception
+    this.conf.set(FlinkOptions.OPERATION, "upsert");
     ResolvedSchema schema2 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
@@ -126,7 +124,6 @@ public class TestHoodieTableFactory {
     // createDynamicTableSource doesn't call sanity check, will not throw exception
     assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
     assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
-    this.conf.set(FlinkOptions.ORDERING_FIELDS, FlinkOptions.ORDERING_FIELDS.defaultValue());
 
     // given the pk but miss the pre combine key will be ok
     ResolvedSchema schema3 = SchemaBuilder.instance()
@@ -135,6 +132,7 @@ public class TestHoodieTableFactory {
         .field("f2", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
+    this.conf.removeConfig(FlinkOptions.ORDERING_FIELDS);
     final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2");
     HoodieTableSource tableSource = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
     HoodieTableSink tableSink = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext3);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 8431a8e0490b..7d3dcc64d08c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -117,6 +117,9 @@ public class TestInputFormat {
 
   void beforeEach(HoodieTableType tableType, Map<String, String> options) throws IOException {
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    // all test cases here expect former default values for record key and ordering fields
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
     if (!conf.contains(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
       conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // by default close the async compaction
@@ -1222,6 +1225,8 @@ public class TestInputFormat {
     conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
     conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
     conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");
     conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_DECIMAL_ORDERING).toString());
     conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // by default close the async compaction
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 46c90afa7a10..e2a4558e0ee3 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -363,6 +363,8 @@ public class TestConfigurations {
   public static Configuration getDefaultConf(String tablePath) {
     Configuration conf = new Configuration();
     conf.set(FlinkOptions.PATH, tablePath);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH,
         Objects.requireNonNull(Thread.currentThread()
             .getContextClassLoader().getResource("test_read_schema.avsc")).toString());
@@ -374,6 +376,8 @@ public class TestConfigurations {
   public static Configuration getDefaultConf(String tablePath, DataType dataType) {
     Configuration conf = new Configuration();
     conf.set(FlinkOptions.PATH, tablePath);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
     conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, HoodieSchemaConverter.convertToSchema(dataType.getLogicalType()).toString());
     conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
     conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");

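With the Flink-side defaults removed, a writer configuration or DDL must name the record key and ordering fields explicitly, which is the pattern the test changes above apply throughout. A minimal sketch of that pattern (the table path and the "uuid"/"ts" field names are illustrative, mirroring the test fixtures, not required values):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    Configuration conf = new Configuration();
    conf.set(FlinkOptions.PATH, "/tmp/hoodie/t1");    // illustrative table path
    conf.set(FlinkOptions.TABLE_NAME, "t1");
    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");  // formerly defaulted on the Flink side
    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");     // formerly defaulted on the Flink side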