This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 6ab6d69b17af fix: Remove default record key and ordering fields values on the Flink side, consistent with Spark (#17994)
6ab6d69b17af is described below
commit 6ab6d69b17afef6866f7dc6f8b13f264894bfa42
Author: Geser Dugarov <[email protected]>
AuthorDate: Wed Jan 28 10:18:17 2026 +0700
    fix: Remove default record key and ordering fields values on the Flink side, consistent with Spark (#17994)
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 14 ++-
.../apache/hudi/config/HoodiePayloadConfig.java | 15 ++-
.../examples/quickstart/HoodieFlinkQuickstart.java | 2 +
.../apache/hudi/configuration/FlinkOptions.java | 4 +-
.../apache/hudi/configuration/OptionsResolver.java | 25 +++--
.../hudi/sink/append/AppendWriteFunctions.java | 14 ++-
.../org/apache/hudi/sink/bulk/RowDataKeyGens.java | 4 +-
.../hudi/sink/compact/HoodieFlinkCompactor.java | 7 ++
.../java/org/apache/hudi/sink/utils/Pipelines.java | 2 +-
.../hudi/source/prune/PrimaryKeyPruners.java | 3 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 29 +++--
.../org/apache/hudi/table/HoodieTableSource.java | 2 +-
.../apache/hudi/table/catalog/HoodieCatalog.java | 3 +-
.../hudi/table/catalog/HoodieCatalogUtil.java | 13 ++-
.../hudi/table/catalog/HoodieHiveCatalog.java | 2 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 13 ++-
.../sink/compact/ITTestHoodieFlinkCompactor.java | 26 ++++-
.../apache/hudi/table/ITTestHoodieDataSource.java | 124 +++++++++++++++++++--
.../apache/hudi/table/TestHoodieTableFactory.java | 8 +-
.../apache/hudi/table/format/TestInputFormat.java | 5 +
.../org/apache/hudi/utils/TestConfigurations.java | 4 +
21 files changed, 249 insertions(+), 70 deletions(-)
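In practice this means a Flink SQL table must now declare its record key explicitly, matching Spark. A minimal sketch of a DDL that stays valid after this change (table name, path, and schema are illustrative, not from the patch):

    // Hypothetical Flink SQL DDL: the record key comes from PRIMARY KEY
    // syntax, and the ordering field is set explicitly since 'ts' is no
    // longer a default.
    tableEnv.executeSql(
        "CREATE TABLE t1 ("
            + " uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,"
            + " name VARCHAR(10),"
            + " ts TIMESTAMP(3)"
            + ") WITH ("
            + " 'connector' = 'hudi',"
            + " 'path' = '/tmp/t1',"
            + " 'ordering.fields' = 'ts'"
            + ")");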
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 5c00d0d4bcb1..9cedac1be74d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -719,12 +719,22 @@ public class HoodieIndexConfig extends HoodieConfig {
}
public Builder withIndexKeyField(String keyField) {
- hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, keyField);
+ if (StringUtils.nonEmpty(keyField)) {
+ hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, keyField);
+ } else {
+ log.warn("'{}' wasn't set during Hoodie index building due to absent
key field passed.",
+ BUCKET_INDEX_HASH_FIELD.key());
+ }
return this;
}
public Builder withRecordKeyField(String keyField) {
-    hoodieIndexConfig.setValue(KeyGeneratorOptions.RECORDKEY_FIELD_NAME, keyField);
+    if (StringUtils.nonEmpty(keyField)) {
+      hoodieIndexConfig.setValue(KeyGeneratorOptions.RECORDKEY_FIELD_NAME, keyField);
+    } else {
+      log.warn("'{}' wasn't set during Hoodie index building due to absent key field passed.",
+ KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+ }
return this;
}
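With the guards above, the builder now skips empty keys instead of writing an empty config value. A rough usage sketch (index type and field names are illustrative):

    // Hypothetical: an empty index key is skipped with a warning,
    // while a non-empty record key is set as before.
    HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.BUCKET)
        .withIndexKeyField("")      // logs a warning, sets nothing
        .withRecordKeyField("uuid") // set normally
        .build();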
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 37106389af5d..e733fe6e2375 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -25,6 +25,8 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.StringUtils;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@@ -36,6 +38,7 @@ import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_F
/**
* Hoodie payload related configs.
*/
+@Slf4j
@ConfigClassProperty(name = "Payload Configurations",
groupName = ConfigGroups.Names.RECORD_PAYLOAD,
description = "Payload related configs, that can be leveraged to "
@@ -44,7 +47,7 @@ public class HoodiePayloadConfig extends HoodieConfig {
public static final ConfigProperty<String> EVENT_TIME_FIELD = ConfigProperty
.key(PAYLOAD_EVENT_TIME_FIELD_PROP_KEY)
- .defaultValue("ts")
+ .noDefaultValue()
.markAdvanced()
.withDocumentation("Table column/field name to derive timestamp
associated with the records. This can"
+ "be useful for e.g, determining the freshness of the table.");
@@ -97,12 +100,20 @@ public class HoodiePayloadConfig extends HoodieConfig {
public Builder withPayloadOrderingFields(String payloadOrderingFields) {
if (StringUtils.nonEmpty(payloadOrderingFields)) {
payloadConfig.setValue(ORDERING_FIELDS, payloadOrderingFields);
+ } else {
+ log.warn("'{}' wasn't set during Hoodie payload building due to absent
key field passed.",
+ ORDERING_FIELDS.key());
}
return this;
}
public Builder withPayloadEventTimeField(String payloadEventTimeField) {
-    payloadConfig.setValue(EVENT_TIME_FIELD, String.valueOf(payloadEventTimeField));
+ if (StringUtils.nonEmpty(payloadEventTimeField)) {
+ payloadConfig.setValue(EVENT_TIME_FIELD, payloadEventTimeField);
+ } else {
+      log.warn("'{}' wasn't set during Hoodie payload building due to absent key field passed.",
+ EVENT_TIME_FIELD.key());
+ }
return this;
}
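The payload builder follows the same pattern: with the "ts" default removed from EVENT_TIME_FIELD, an absent value is now logged rather than silently materialized. A small sketch (values illustrative):

    // Hypothetical: a null/empty event-time field is skipped with a
    // warning instead of being written into the config.
    HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder()
        .withPayloadOrderingFields("ts")  // set normally
        .withPayloadEventTimeField(null)  // logs a warning, sets nothing
        .build();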
diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
index b657430f8fea..1b527651de88 100644
--- a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
+++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
@@ -139,6 +139,8 @@ public final class HoodieFlinkQuickstart {
// create hudi table
String hoodieTableDDL = sql(tableName)
.option(FlinkOptions.PATH, tablePath)
+ .option(FlinkOptions.RECORD_KEY_FIELD, "uuid")
+ .option(FlinkOptions.ORDERING_FIELDS, "ts")
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 773c8721900f..d58cde528105 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -114,7 +114,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> ORDERING_FIELDS = ConfigOptions
.key("ordering.fields")
.stringType()
- .defaultValue("ts")
+ .noDefaultValue()
      .withFallbackKeys("precombine.field", "write.precombine.field", HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key())
      .withDescription("Comma separated list of fields used in records merging. When two records have the same\n"
          + "key value, we will pick the one with the largest value for the ordering field,\n"
@@ -539,7 +539,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
.key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
.stringType()
- .defaultValue("uuid")
+ .noDefaultValue()
.withDescription("Record key field. Value to be used as the `recordKey`
component of `HoodieKey`.\n"
+ "Actual value will be obtained by invoking .toString() on the
field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 6f083daac07a..fd69ed9e61e2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -151,13 +152,26 @@ public class OptionsResolver {
    return conf.get(FlinkOptions.PAYLOAD_CLASS_NAME).contains(DefaultHoodieRecordPayload.class.getSimpleName());
}
+  /**
+   * Returns the value of {@link FlinkOptions#RECORD_KEY_FIELD} if it was set,
+   * or throws an exception otherwise.
+   */
+  public static String getRecordKeyStr(Configuration conf) {
+    final String recordKeyStr = conf.get(FlinkOptions.RECORD_KEY_FIELD);
+    ValidationUtils.checkArgument(
+        recordKeyStr != null,
+        "Primary key definition is required, use either PRIMARY KEY syntax or option '"
+            + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify.");
+    return recordKeyStr;
+  }
+
/**
* Returns the ordering fields as comma separated string
* or null if the value is set as {@link FlinkOptions#NO_PRE_COMBINE}.
*/
public static String getOrderingFieldsStr(Configuration conf) {
final String orderingFields = conf.get(FlinkOptions.ORDERING_FIELDS);
-    return orderingFields.equals(FlinkOptions.NO_PRE_COMBINE) ? null : orderingFields;
+    return FlinkOptions.NO_PRE_COMBINE.equals(orderingFields) ? null : orderingFields;
}
/**
@@ -429,14 +443,7 @@ public class OptionsResolver {
* Returns the index key field.
*/
public static String getIndexKeyField(Configuration conf) {
-    return conf.getString(FlinkOptions.INDEX_KEY_FIELD.key(), conf.get(FlinkOptions.RECORD_KEY_FIELD));
-  }
-
-  /**
-   * Returns the index key field values.
-   */
-  public static String[] getIndexKeys(Configuration conf) {
-    return getIndexKeyField(conf).split(",");
+    return conf.getString(FlinkOptions.INDEX_KEY_FIELD.key(), getRecordKeyStr(conf));
}
/**
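Call sites that previously read the raw option are migrated to this validating accessor. A hedged usage sketch (the exception type from ValidationUtils.checkArgument is an assumption; the error text follows the message above):

    // Hypothetical: getRecordKeyStr fails fast when no key is configured,
    // instead of propagating a null into key generation.
    Configuration conf = new Configuration();
    try {
      OptionsResolver.getRecordKeyStr(conf);
    } catch (IllegalArgumentException e) {
      // "Primary key definition is required, use either PRIMARY KEY
      // syntax or option '...' to specify."
    }
    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
    String keys = OptionsResolver.getRecordKeyStr(conf); // "uuid"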
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java
index a73f923782a2..39cda29ab422 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java
@@ -21,14 +21,14 @@ package org.apache.hudi.sink.append;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.buffer.BufferType;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
@@ -38,11 +38,10 @@ import java.util.stream.Collectors;
 * Factory utilities for creating {@link AppendWriteFunction} instances based on configuration.
* Handles buffer type selection, sort key resolution, and rate limiting.
*/
+@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public abstract class AppendWriteFunctions {
-  private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctions.class);
-
/**
   * Creates a {@link AppendWriteFunction} instance based on the given configuration.
*/
@@ -71,7 +70,7 @@ public abstract class AppendWriteFunctions {
// Backward compatibility: write.buffer.sort.enabled=true → DISRUPTOR
if (conf.get(FlinkOptions.WRITE_BUFFER_SORT_ENABLED)) {
-      LOG.info("write.buffer.sort.enabled is deprecated. Use write.buffer.type=DISRUPTOR instead.");
+      log.info("write.buffer.sort.enabled is deprecated. Use write.buffer.type=DISRUPTOR instead.");
return BufferType.DISRUPTOR.name();
}
@@ -90,7 +89,10 @@ public abstract class AppendWriteFunctions {
String sortKeys = conf.get(FlinkOptions.WRITE_BUFFER_SORT_KEYS);
if (StringUtils.isNullOrEmpty(sortKeys)) {
// Default to record key field(s)
- sortKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD);
+ log.info("'{}' is not set, therefore '{}' value will be used as sort
keys instead",
+ FlinkOptions.WRITE_BUFFER_SORT_KEYS.key(),
+ FlinkOptions.RECORD_KEY_FIELD.key());
+ sortKeys = OptionsResolver.getRecordKeyStr(conf);
}
ValidationUtils.checkArgument(StringUtils.nonEmpty(sortKeys),
"Sort keys can't be null or empty for append write with buffer sort. "
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
index df0e553feb29..fc6128e6918d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
@@ -18,7 +18,7 @@
package org.apache.hudi.sink.bulk;
-import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.flink.configuration.Configuration;
@@ -37,7 +37,7 @@ public class RowDataKeyGens {
   * Creates {@link RowDataKeyGen} of corresponding type depending on table configuration.
*/
  public static RowDataKeyGen instance(Configuration conf, RowType rowType, @Nullable Integer taskId, @Nullable String instantTime) {
- String recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD);
+ String recordKeys = OptionsResolver.getRecordKeyStr(conf);
if (hasRecordKey(recordKeys, rowType.getFieldNames())) {
return RowDataKeyGen.instance(conf, rowType);
} else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 367ff9e5f1a8..34ac7e801570 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TableServiceUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
@@ -209,6 +210,12 @@ public class HoodieFlinkCompactor {
// get the table name
    conf.set(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+    // get the primary key if absent in conf, but present in the table config
+    if (!conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())
+        && StringUtils.nonEmpty(metaClient.getTableConfig().getRecordKeyFieldProp())) {
+      conf.set(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+ }
+
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
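For the standalone compactor there is no DDL to supply the key, so it is backfilled from the table's own config when missing. A condensed sketch of the logic added above:

    // Hypothetical standalone sketch of the backfill: prefer an explicit
    // job option, otherwise fall back to hoodie.properties.
    String recordKeyProp = metaClient.getTableConfig().getRecordKeyFieldProp();
    if (!conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())
        && StringUtils.nonEmpty(recordKeyProp)) {
      conf.set(FlinkOptions.RECORD_KEY_FIELD, recordKeyProp);
    }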
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index ae0dce254222..cc7b523385a9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -157,7 +157,7 @@ public class Pipelines {
if (conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
      final boolean isNeededSortInput = conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY);
      final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
-      final String[] recordKeyFields = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+      final String[] recordKeyFields = OptionsResolver.getRecordKeyStr(conf).split(",");
      // if sort input by record key is needed then add record keys to partition keys
String[] sortFields = isNeededSortInput
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
index 24c82fbffc6a..46a71c41518f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
@@ -20,7 +20,6 @@ package org.apache.hudi.source.prune;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.util.ExpressionUtils;
@@ -44,7 +43,7 @@ import java.util.stream.Collectors;
public class PrimaryKeyPruners {
public static Function<Integer, Integer>
getBucketIdFunc(List<ResolvedExpression> hashKeyFilters, Configuration conf) {
- List<String> pkFields =
Arrays.asList(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","));
+ List<String> pkFields =
Arrays.asList(OptionsResolver.getRecordKeyStr(conf).split(","));
// step1: resolve the hash key values
final boolean logicalTimestamp =
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
List<String> values = hashKeyFilters.stream()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 05d04553d0c0..c369434f604d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -214,16 +214,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
*/
private void checkRecordKey(Configuration conf, ResolvedSchema schema) {
List<String> fields = schema.getColumnNames();
-    if (!schema.getPrimaryKey().isPresent()) {
-      String[] recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
-      if (recordKeys.length == 1
-          && FlinkOptions.RECORD_KEY_FIELD.defaultValue().equals(recordKeys[0])
-          && !fields.contains(recordKeys[0])) {
-        throw new HoodieValidationException("Primary key definition is required, the default primary key field "
-            + "'" + FlinkOptions.RECORD_KEY_FIELD.defaultValue() + "' does not exist in the table schema, "
-            + "use either PRIMARY KEY syntax or option '" + FlinkOptions.RECORD_KEY_FIELD.key() + "' to speciy.");
-      }
-
+    if (schema.getPrimaryKey().isEmpty()) {
+      String[] recordKeys = OptionsResolver.getRecordKeyStr(conf).split(",");
Arrays.stream(recordKeys)
.filter(field -> !fields.contains(field))
.findAny()
@@ -232,8 +224,10 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
                + "'" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema.");
});
}
- if (schema.getPrimaryKey().isPresent() &&
conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
- log.warn("PRIMARY KEY syntax and option '{}' was used. Value of the
PRIMARY KEY will be used and option will be ignored!",
FlinkOptions.RECORD_KEY_FIELD.key());
+ if (schema.getPrimaryKey().isPresent() &&
conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
+ log.warn("PRIMARY KEY syntax and option '{}' was used. "
+ + "Value of the PRIMARY KEY will be used and option will be
ignored!",
+ FlinkOptions.RECORD_KEY_FIELD.key());
}
}
@@ -280,17 +274,20 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
conf.set(FlinkOptions.RECORD_KEY_FIELD, recordKey);
}
List<String> partitionKeys = table.getPartitionKeys();
- if (partitionKeys.size() > 0) {
+ if (!partitionKeys.isEmpty()) {
      // the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD
      conf.set(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
}
// set index key for bucket index if not defined
    if (conf.get(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) {
if (conf.get(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
-        conf.set(FlinkOptions.INDEX_KEY_FIELD, conf.get(FlinkOptions.RECORD_KEY_FIELD));
+        log.info("'{}' is not set, therefore '{}' value will be used as index key instead",
+            FlinkOptions.INDEX_KEY_FIELD.key(),
+            FlinkOptions.RECORD_KEY_FIELD.key());
+        conf.set(FlinkOptions.INDEX_KEY_FIELD, OptionsResolver.getRecordKeyStr(conf));
} else {
Set<String> recordKeySet =
-
Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",")).collect(Collectors.toSet());
+
Arrays.stream(OptionsResolver.getRecordKeyStr(conf).split(",")).collect(Collectors.toSet());
Set<String> indexKeySet =
Arrays.stream(conf.get(FlinkOptions.INDEX_KEY_FIELD).split(",")).collect(Collectors.toSet());
if (!recordKeySet.containsAll(indexKeySet)) {
@@ -302,7 +299,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
    // tweak the key gen class if possible
    final String[] partitions = conf.get(FlinkOptions.PARTITION_PATH_FIELD).split(",");
- final String[] pks = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+ final String[] pks = OptionsResolver.getRecordKeyStr(conf).split(",");
if (partitions.length == 1) {
final String partitionField = partitions[0];
if (partitionField.isEmpty()) {
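With the "uuid" default gone, a table created without any key definition now fails validation in the factory rather than later at write time. A hedged example of the failure mode (DDL and table name illustrative):

    // Hypothetical: no PRIMARY KEY and no record-key option -> rejected.
    tableEnv.executeSql(
        "CREATE TABLE no_key_t (f0 VARCHAR(20), ts TIMESTAMP(3)) WITH ("
            + " 'connector' = 'hudi', 'path' = '/tmp/no_key_t')");
    // a subsequent read/write against the table then surfaces:
    //   "Primary key definition is required, use either PRIMARY KEY
    //    syntax or option '...' to specify."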
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index b1bba78f0b07..32fe9e5d9f8f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -381,7 +381,7 @@ public class HoodieTableSource implements
if (!OptionsResolver.isBucketIndexType(conf) || dataFilters.isEmpty()) {
return Option.empty();
}
- Set<String> indexKeyFields =
Arrays.stream(OptionsResolver.getIndexKeys(conf)).collect(Collectors.toSet());
+ Set<String> indexKeyFields =
Arrays.stream(OptionsResolver.getIndexKeyField(conf).split(",")).collect(Collectors.toSet());
List<ResolvedExpression> indexKeyFilters =
dataFilters.stream().filter(expr -> ExpressionUtils.isEqualsLitExpr(expr,
indexKeyFields)).collect(Collectors.toList());
if (!ExpressionUtils.isFilteringByAllFields(indexKeyFilters,
indexKeyFields)) {
return Option.empty();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index cc8ed951f1c0..065a8a00c276 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -349,7 +350,7 @@ public class HoodieCatalog extends AbstractCatalog {
conf.set(FlinkOptions.PARTITION_PATH_FIELD, partitions);
options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
- final String[] pks = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+ final String[] pks = OptionsResolver.getRecordKeyStr(conf).split(",");
      boolean complexHoodieKey = pks.length > 1 || resolvedTable.getPartitionKeys().size() > 1;
StreamerUtil.checkKeygenGenerator(complexHoodieKey, conf);
} else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index ab54f3b393e0..7e37f67b18cd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -22,7 +22,9 @@ import org.apache.hudi.adapter.Utils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -256,13 +258,18 @@ public class HoodieCatalogUtil {
String tablePathStr,
ObjectPath tablePath,
org.apache.hadoop.conf.Configuration hadoopConf) {
-    return FlinkWriteClients.createWriteClientV2(
+    HoodieTableConfig tableConfig = StreamerUtil.createMetaClient(tablePathStr, hadoopConf).getTableConfig();
+    org.apache.flink.configuration.Configuration conf = org.apache.flink.configuration.Configuration.fromMap(options)
        .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
        .set(
            FlinkOptions.SOURCE_AVRO_SCHEMA,
-            StreamerUtil.createMetaClient(tablePathStr, hadoopConf)
-                .getTableConfig().getTableCreateSchema().get().toString()));
+            tableConfig.getTableCreateSchema().get().toString());
+    String recordKey = tableConfig.getRecordKeyFieldProp();
+    if (StringUtils.nonEmpty(recordKey)) {
+      conf.set(FlinkOptions.RECORD_KEY_FIELD, recordKey);
+    }
+    return FlinkWriteClients.createWriteClientV2(conf);
}
  private static boolean sameOptions(Map<String, String> parameters1, Map<String, String> parameters2, ConfigOption<String> option) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 31017d1ebf15..5b1b1767c772 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -513,7 +513,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
if (catalogTable.isPartitioned() &&
!flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
final String partitions = String.join(",",
catalogTable.getPartitionKeys());
flinkConf.set(FlinkOptions.PARTITION_PATH_FIELD, partitions);
- final String[] pks =
flinkConf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+ final String[] pks =
OptionsResolver.getRecordKeyStr(flinkConf).split(",");
boolean complexHoodieKey = pks.length > 1 ||
catalogTable.getPartitionKeys().size() > 1;
StreamerUtil.checkKeygenGenerator(complexHoodieKey, flinkConf);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index c13eda93667a..56e3c7646066 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -173,8 +173,8 @@ public class StreamerUtil {
public static HoodiePayloadConfig getPayloadConfig(Configuration conf) {
return HoodiePayloadConfig.newBuilder()
.withPayloadClass(conf.get(FlinkOptions.PAYLOAD_CLASS_NAME))
- .withPayloadOrderingFields(conf.get(FlinkOptions.ORDERING_FIELDS))
- .withPayloadEventTimeField(conf.get(FlinkOptions.ORDERING_FIELDS))
+ .withPayloadOrderingFields(OptionsResolver.getOrderingFieldsStr(conf))
+ .withPayloadEventTimeField(OptionsResolver.getOrderingFieldsStr(conf))
.build();
}
@@ -652,15 +652,18 @@ public class StreamerUtil {
*/
  public static void checkOrderingFields(Configuration conf, List<String> fields) {
String orderingFields = conf.get(FlinkOptions.ORDERING_FIELDS);
-    if (!fields.contains(orderingFields)) {
+    if (null == orderingFields || !fields.contains(orderingFields)) {
      if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
+        // the default payload forces setting some schema columns as ordering ones
        throw new HoodieValidationException("Option '" + FlinkOptions.ORDERING_FIELDS.key()
            + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
      }
-      if (orderingFields.equals(FlinkOptions.ORDERING_FIELDS.defaultValue())) {
+      if (null == orderingFields) {
+        // if there are no ordering fields we set them as no-precombine
        conf.set(FlinkOptions.ORDERING_FIELDS, FlinkOptions.NO_PRE_COMBINE);
      } else if (!orderingFields.equals(FlinkOptions.NO_PRE_COMBINE)) {
-        throw new HoodieValidationException("Field " + orderingFields + " does not exist in the table schema."
+        // but if no-precombine was passed initially then we shouldn't fail here on schema check
+        throw new HoodieValidationException("Field " + orderingFields + " does not exist in the table schema. "
            + "Please check '" + FlinkOptions.ORDERING_FIELDS.key() + "' option.");
}
}
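Net effect of the checkOrderingFields change: an absent value now means "no precombine" unless the default Hudi payload is configured, in which case it stays a hard error. A small sketch (schema fields illustrative):

    // Hypothetical: with no ordering fields configured and a non-default
    // payload, the option resolves to NO_PRE_COMBINE instead of failing
    // on a phantom 'ts' column.
    Configuration conf = new Configuration();
    StreamerUtil.checkOrderingFields(conf, Arrays.asList("uuid", "name"));
    // conf.get(FlinkOptions.ORDERING_FIELDS) == FlinkOptions.NO_PRE_COMBINE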
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index f2646aba5269..3ae8ea520f2c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -62,8 +62,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -90,8 +88,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(FlinkMiniCluster.class)
public class ITTestHoodieFlinkCompactor {
-  protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);
-
private static final Map<String, List<String>> EXPECTED1 = new HashMap<>();
private static final Map<String, List<String>> EXPECTED2 = new HashMap<>();
@@ -131,6 +127,8 @@ public class ITTestHoodieFlinkCompactor {
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+ options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    options.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logBlockFormat);
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
@@ -146,6 +144,8 @@ public class ITTestHoodieFlinkCompactor {
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
// create metaClient
@@ -206,6 +206,8 @@ public class ITTestHoodieFlinkCompactor {
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+ options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
@@ -219,6 +221,8 @@ public class ITTestHoodieFlinkCompactor {
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
// create metaClient
@@ -293,6 +297,8 @@ public class ITTestHoodieFlinkCompactor {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+ options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
@@ -309,6 +315,8 @@ public class ITTestHoodieFlinkCompactor {
cfg.minCompactionIntervalSeconds = 3;
cfg.schedule = true;
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
    conf.set(FlinkOptions.COMPACTION_TASKS, FlinkMiniCluster.DEFAULT_PARALLELISM);
@@ -334,6 +342,8 @@ public class ITTestHoodieFlinkCompactor {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+ options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
@@ -344,6 +354,8 @@ public class ITTestHoodieFlinkCompactor {
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
@@ -412,6 +424,8 @@ public class ITTestHoodieFlinkCompactor {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+ options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
@@ -429,6 +443,8 @@ public class ITTestHoodieFlinkCompactor {
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
assertDoesNotThrow(() -> runOfflineCompact(tableEnv, conf));
assertNoDuplicateFile(conf);
@@ -467,6 +483,8 @@ public class ITTestHoodieFlinkCompactor {
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+ options.put(FlinkOptions.ORDERING_FIELDS.key(), "ts");
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 60a55a74f1a5..ef0d57c1c683 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -83,6 +83,7 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -94,7 +95,6 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
-import static java.util.Arrays.stream;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
import static org.apache.hudi.utils.TestConfigurations.catalog;
@@ -107,6 +107,7 @@ import static org.apache.hudi.utils.TestData.map;
import static org.apache.hudi.utils.TestData.row;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertLinesMatch;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -148,6 +149,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
@@ -159,6 +161,7 @@ public class ITTestHoodieDataSource {
streamTableEnv.executeSql("drop table t1");
hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
@@ -192,6 +195,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.CDC_ENABLED, true)
@@ -226,6 +230,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
@@ -256,6 +261,7 @@ public class ITTestHoodieDataSource {
String createHoodieTable = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
@@ -271,6 +277,7 @@ public class ITTestHoodieDataSource {
// now we consume starting from the oldest commit
String createHoodieTable2 = sql("t2")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
@@ -291,6 +298,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.end();
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
@@ -309,6 +317,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
// read optimized is supported for both MOR and COR table,
// test MOR streaming write with compaction then reads as
@@ -343,6 +352,7 @@ public class ITTestHoodieDataSource {
void testStreamWriteBatchReadOptimizedWithoutCompaction() {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
        .option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
.end();
@@ -364,6 +374,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
@@ -390,6 +401,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.OPERATION, "insert")
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
@@ -418,6 +430,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.OPERATION, "insert")
.option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
.option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
@@ -452,6 +465,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
@@ -472,6 +486,7 @@ public class ITTestHoodieDataSource {
void testBatchWriteWithCleaning() {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
.end();
batchTableEnv.executeSql(hoodieTableDDL);
@@ -496,6 +511,8 @@ public class ITTestHoodieDataSource {
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
conf.set(FlinkOptions.CHANGELOG_ENABLED, true);
@@ -508,6 +525,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
@@ -533,6 +551,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, true)
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
.option(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), true)
@@ -563,6 +582,7 @@ public class ITTestHoodieDataSource {
  void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, true)
.option(FlinkOptions.READ_AS_STREAMING, true)
        .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), false)
@@ -572,6 +592,8 @@ public class ITTestHoodieDataSource {
.end();
streamTableEnv.executeSql(hoodieTableDDL);
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
    conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
@@ -614,6 +636,8 @@ public class ITTestHoodieDataSource {
  void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
@@ -622,6 +646,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
@@ -645,6 +670,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.option(FlinkOptions.READ_AS_STREAMING, true)
        .option(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST)
@@ -669,6 +695,7 @@ public class ITTestHoodieDataSource {
    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -698,16 +725,19 @@ public class ITTestHoodieDataSource {
.field("tss timestamp(3)") // use a different field with default
precombine field 'ts'
.field("`partition` varchar(10)")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.RECORD_KEY_FIELD, "uuid")
+ .option(FlinkOptions.ORDERING_FIELDS, "ts")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.end();
tableEnv.executeSql(hoodieTableDDL);
-    execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);
-
-    List<Row> result1 = CollectionUtil.iterableToList(
-        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
-    assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
+    ValidationException exception =
+        assertThrows(ValidationException.class, () -> execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1));
+    assertLinesMatch(
+        Collections.singletonList("Field ts does not exist in the table schema. Please check '"
+            + FlinkOptions.ORDERING_FIELDS.key() + "' option."),
+        Collections.singletonList(exception.getCause().getMessage()));
}
@ParameterizedTest
@@ -721,9 +751,10 @@ public class ITTestHoodieDataSource {
.field("ts timestamp(3)") // use the default precombine field 'ts'
.field("`partition` varchar(10)")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.RECORD_KEY_FIELD, "uuid")
+ .option(FlinkOptions.ORDERING_FIELDS, FlinkOptions.NO_PRE_COMBINE)
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
- .option(FlinkOptions.ORDERING_FIELDS, FlinkOptions.NO_PRE_COMBINE)
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -740,6 +771,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option("hoodie.parquet.small.file.limit", "0") // invalidate the
small file strategy
.option("hoodie.parquet.max.file.size", "0")
@@ -766,6 +798,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.end();
@@ -791,12 +824,14 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t1")
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
tableEnv.executeSql(hoodieTableDDL);
String hoodieTableDDL2 = sql("t2")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t2")
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
tableEnv.executeSql(hoodieTableDDL2);
@@ -878,6 +913,7 @@ public class ITTestHoodieDataSource {
.pkField("id")
.noPartition()
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.ORDERING_FIELDS, "ts")
.option(FlinkOptions.READ_AS_STREAMING, streaming)
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.end();
@@ -912,6 +948,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.INDEX_TYPE, indexType)
.option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
@@ -983,6 +1020,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
@@ -1005,6 +1043,7 @@ public class ITTestHoodieDataSource {
    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.WRITE_BATCH_SIZE, "0.001")
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
@@ -1034,6 +1073,7 @@ public class ITTestHoodieDataSource {
    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.WRITE_BATCH_SIZE, "0.001")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
@@ -1069,6 +1109,7 @@ public class ITTestHoodieDataSource {
.pkField("id")
.noPartition()
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.ORDERING_FIELDS, "ts")
        .option(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName())
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -1092,6 +1133,7 @@ public class ITTestHoodieDataSource {
    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.noPartition()
.end();
@@ -1124,6 +1166,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
.option(FlinkOptions.PRE_COMBINE, true)
.end();
@@ -1146,6 +1189,7 @@ public class ITTestHoodieDataSource {
streamTableEnv.executeSql(createSource);
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.INDEX_GLOBAL_ENABLED, false)
.option(FlinkOptions.PRE_COMBINE, true)
.end();
@@ -1171,6 +1215,7 @@ public class ITTestHoodieDataSource {
// create a flink source table
String createHoodieTable = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.READ_AS_STREAMING, "true")
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.end();
@@ -1194,6 +1239,7 @@ public class ITTestHoodieDataSource {
// create a flink source table
String createHoodieTable = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.end();
batchTableEnv.executeSql(createHoodieTable);
@@ -1275,6 +1321,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("hoodie_sink")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.OPERATION, "bulk_insert")
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
.option(FlinkOptions.INDEX_TYPE, indexType)
@@ -1316,6 +1363,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql(catalogName + ".hudi.hoodie_sink")
.option(FlinkOptions.PATH, basePath)
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.OPERATION, operationType)
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
@@ -1377,6 +1425,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql(catalogName + ".hudi.hoodie_sink")
.option(FlinkOptions.PATH, basePath)
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.OPERATION, operationType)
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
@@ -1405,6 +1454,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.OPERATION, "bulk_insert")
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
.option(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, true)
@@ -1430,6 +1480,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.OPERATION, "bulk_insert")
.noPartition()
.end();
@@ -1468,6 +1519,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("hoodie_sink")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.OPERATION, "insert")
.option(FlinkOptions.INSERT_CLUSTER, clustering)
.end();
@@ -1494,6 +1546,7 @@ public class ITTestHoodieDataSource {
    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.partitionField("ts") // use timestamp as partition path field
.end();
@@ -1519,6 +1572,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
.option(FlinkOptions.COMPACTION_TASKS, 1)
@@ -1543,6 +1597,7 @@ public class ITTestHoodieDataSource {
.field("age int")
.field("ts date")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.PARTITION_FORMAT, partitionFormat)
.partitionField("ts") // use date as partition path field
.end();
@@ -1596,6 +1651,8 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
// write 3 batches of data set
@@ -1607,6 +1664,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_START_COMMIT, latestCommit)
.end();
@@ -1623,6 +1681,8 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = streamTableEnv;
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled);
conf.set(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate the changes on the fly
@@ -1638,6 +1698,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled)
.option(FlinkOptions.READ_CDC_FROM_CHANGELOG, false)
@@ -1668,6 +1729,8 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = streamTableEnv;
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled);
// schedule compaction after 2 commits
@@ -1685,6 +1748,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, compactionEnabled)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 2)
@@ -1720,6 +1784,8 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
conf.set(FlinkOptions.ARCHIVE_MIN_COMMITS, 4);
conf.set(FlinkOptions.ARCHIVE_MAX_COMMITS, 5);
@@ -1736,6 +1802,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_START_COMMIT, secondArchived)
.end();
@@ -1753,6 +1820,8 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
// write a batch of data set
@@ -1768,6 +1837,7 @@ public class ITTestHoodieDataSource {
.pkField("uuid")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.ORDERING_FIELDS, "ts")
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -1947,6 +2017,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
@@ -2045,6 +2116,7 @@ public class ITTestHoodieDataSource {
String path = tempFile.getAbsolutePath();
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, path)
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
@@ -2066,6 +2138,7 @@ public class ITTestHoodieDataSource {
hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, path)
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
@@ -2096,6 +2169,7 @@ public class ITTestHoodieDataSource {
String path = tempFile.getAbsolutePath();
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, path)
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
.option(FlinkOptions.COMPACTION_TASKS, 1)
@@ -2110,6 +2184,7 @@ public class ITTestHoodieDataSource {
hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, path)
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, true)
.option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
.end();
@@ -2128,6 +2203,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet")
@@ -2169,6 +2245,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option("hoodie.metadata.index.column.stats.file.group.count", 2)
@@ -2192,6 +2269,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option("hoodie.metadata.index.column.stats.file.group.count", 2)
@@ -2215,6 +2293,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.METADATA_ENABLED, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
@@ -2249,6 +2328,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.INDEX_TYPE, "BUCKET")
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
@@ -2466,6 +2546,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
+ .option(FlinkOptions.ORDERING_FIELDS, "ts")
.end();
batchTableEnv.executeSql(hoodieTableDDL);
@@ -2527,6 +2608,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
+ .option(FlinkOptions.ORDERING_FIELDS, "ts")
.end();
batchTableEnv.executeSql(readHoodieTableDDL);
@@ -2549,6 +2631,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
+ .option(FlinkOptions.ORDERING_FIELDS, "ts")
.end();
batchTableEnv.executeSql(readHoodieTableDDL);
@@ -2571,6 +2654,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
+ .option(FlinkOptions.ORDERING_FIELDS, "ts")
.end();
batchTableEnv.executeSql(readHoodieTableDDL);
@@ -2584,6 +2668,8 @@ public class ITTestHoodieDataSource {
void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
@@ -2591,6 +2677,7 @@ public class ITTestHoodieDataSource {
TestData.writeData(TestData.DATA_SET_INSERT, conf);
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
@@ -2616,6 +2703,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.INDEX_TYPE, indexType)
.end();
@@ -2672,6 +2760,7 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false")
.end();
@@ -2682,6 +2771,7 @@ public class ITTestHoodieDataSource {
streamTableEnv.executeSql("drop table t1");
hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
@@ -2692,7 +2782,10 @@ public class ITTestHoodieDataSource {
@Test
void testReadWithParquetPredicatePushDown() {
TableEnvironment tableEnv = batchTableEnv;
- String hoodieTableDDL = sql("t1").option(FlinkOptions.PATH, tempFile.getAbsolutePath()).end();
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .end();
tableEnv.executeSql(hoodieTableDDL);
execInsertSql(tableEnv, TestSQL.INSERT_T1);
// apply filters to push down predicates
@@ -2713,6 +2806,7 @@ public class ITTestHoodieDataSource {
// insert first batch of data with parquet log block
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
.option(FlinkOptions.INDEX_TYPE, indexType)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
@@ -2728,6 +2822,7 @@ public class ITTestHoodieDataSource {
// insert second batch of data with avro log block
hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
.option(FlinkOptions.INDEX_TYPE, indexType)
.option(FlinkOptions.READ_AS_STREAMING, true)
@@ -2754,6 +2849,7 @@ public class ITTestHoodieDataSource {
// insert first batch of data with rowdata mode writing disabled
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
.option(FlinkOptions.INDEX_TYPE, index)
.option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet")
@@ -2775,6 +2871,7 @@ public class ITTestHoodieDataSource {
JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Default);
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
.end();
@@ -2795,6 +2892,7 @@ public class ITTestHoodieDataSource {
// init and write data with table version SIX
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
.option(FlinkOptions.WRITE_TABLE_VERSION, HoodieTableVersion.SIX.versionCode() + "")
.option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
@@ -2807,6 +2905,7 @@ public class ITTestHoodieDataSource {
hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
.option(FlinkOptions.WRITE_TABLE_VERSION, HoodieTableVersion.EIGHT.versionCode() + "")
.option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
@@ -2826,6 +2925,7 @@ public class ITTestHoodieDataSource {
void testWriteWithTimelineServerBasedMarker(HoodieTableType tableType) {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.TIMELINE_SERVER_BASED.name())
.end();
@@ -2909,6 +3009,7 @@ public class ITTestHoodieDataSource {
TableEnvironment tableEnv = streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
.option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
.option(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE)
@@ -2928,6 +3029,13 @@ public class ITTestHoodieDataSource {
BATCH, STREAM
}
+ public static Map<String, String> getDefaultKeys() {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
+ return conf.toMap();
+ }
+
/**
* Return test params => (execution mode, table type).
*/
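Note on the ITTestHoodieDataSource changes above: with the Flink-side defaults removed, the record key ("uuid") and ordering fields ("ts") must be declared on every test table, either via .options(getDefaultKeys()) on the DDL builder or via conf.set(...). A minimal sketch of the now-explicit writer configuration (option constants as in the hunks above; the class, path, and table name are placeholders, not part of the commit):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    class ExplicitKeysSketch {
      // Hypothetical helper illustrating the pattern the tests now follow.
      static Configuration explicitKeysConf(String basePath) {
        Configuration conf = new Configuration();
        conf.set(FlinkOptions.PATH, basePath);
        conf.set(FlinkOptions.TABLE_NAME, "t1");
        // Former implicit defaults, now mandatory to spell out:
        conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
        conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
        return conf;
      }
    }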
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 9558250b89c6..bd38b0d7e100 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -88,6 +88,7 @@ public class TestHoodieTableFactory {
this.conf = new Configuration();
this.conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
this.conf.set(FlinkOptions.TABLE_NAME, "t1");
+ this.conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
StreamerUtil.initTableIfNotExists(this.conf);
}
@@ -110,12 +111,9 @@ public class TestHoodieTableFactory {
final MockContext sourceContext11 = MockContext.getInstance(this.conf, schema1, "f2");
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext11));
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext11));
- //miss the pre combine key will be ok
- HoodieTableSink tableSink11 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext11);
- assertThat(tableSink11.getConf().get(FlinkOptions.ORDERING_FIELDS), is(FlinkOptions.NO_PRE_COMBINE));
- this.conf.set(FlinkOptions.OPERATION, FlinkOptions.OPERATION.defaultValue());
// a non-exists precombine key will throw exception
+ this.conf.set(FlinkOptions.OPERATION, "upsert");
ResolvedSchema schema2 = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
@@ -126,7 +124,6 @@ public class TestHoodieTableFactory {
// createDynamicTableSource doesn't call sanity check, will not throw exception
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
- this.conf.set(FlinkOptions.ORDERING_FIELDS, FlinkOptions.ORDERING_FIELDS.defaultValue());
// given the pk but miss the pre combine key will be ok
ResolvedSchema schema3 = SchemaBuilder.instance()
@@ -135,6 +132,7 @@ public class TestHoodieTableFactory {
.field("f2", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
+ this.conf.removeConfig(FlinkOptions.ORDERING_FIELDS);
final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2");
HoodieTableSource tableSource = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
HoodieTableSink tableSink = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext3);
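The reworked factory test pins down the new contract: leaving the ordering fields unset is legal, but configuring one that does not exist in the schema fails sink creation for upsert. An illustrative sketch of that rule, not the factory's actual implementation (exception type and option constant as in the hunk above; assumes ORDERING_FIELDS is a comma-separated string option with no default):

    import java.util.List;

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.exception.HoodieValidationException;

    class OrderingFieldCheckSketch {
      static void validateOrderingFields(Configuration conf, List<String> schemaFields) {
        String orderingFields = conf.get(FlinkOptions.ORDERING_FIELDS);
        if (orderingFields == null) {
          return; // unset is acceptable now that no default is injected
        }
        for (String field : orderingFields.split(",")) {
          if (!schemaFields.contains(field.trim())) {
            // matches the assertThrows(HoodieValidationException.class, ...) expectation
            throw new HoodieValidationException(
                "Ordering field '" + field.trim() + "' does not exist in the table schema");
          }
        }
      }
    }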
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 8431a8e0490b..7d3dcc64d08c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -117,6 +117,9 @@ public class TestInputFormat {
void beforeEach(HoodieTableType tableType, Map<String, String> options) throws IOException {
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ // all test cases here expect former default values for record key and ordering fields
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
if (!conf.contains(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // by default close the async compaction
@@ -1222,6 +1225,8 @@ public class TestInputFormat {
conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_DECIMAL_ORDERING).toString());
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // by default close the async compaction
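Pinning the former defaults in beforeEach keeps the existing input-format fixtures meaningful. A hypothetical unit check of the new behavior, not part of the commit (assumes RECORD_KEY_FIELD is a string option whose default was removed, so an empty Configuration returns null for it):

    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.junit.jupiter.api.Assertions.assertNull;

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.junit.jupiter.api.Test;

    class NoDefaultRecordKeySketch {
      @Test
      void recordKeyHasNoDefault() {
        Configuration conf = new Configuration();
        assertNull(conf.get(FlinkOptions.RECORD_KEY_FIELD)); // default removed
        conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");     // former default, now explicit
        assertEquals("uuid", conf.get(FlinkOptions.RECORD_KEY_FIELD));
      }
    }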
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 46c90afa7a10..e2a4558e0ee3 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -363,6 +363,8 @@ public class TestConfigurations {
public static Configuration getDefaultConf(String tablePath) {
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, tablePath);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_read_schema.avsc")).toString());
@@ -374,6 +376,8 @@ public class TestConfigurations {
public static Configuration getDefaultConf(String tablePath, DataType dataType) {
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, tablePath);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, HoodieSchemaConverter.convertToSchema(dataType.getLogicalType()).toString());
conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");
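Since getDefaultConf(...) now bakes in "uuid" and "ts", a test that needs a keyless table can drop them again with removeConfig, the same API used in the TestHoodieTableFactory hunk above. A sketch under those assumptions (class and path are placeholders):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.utils.TestConfigurations;

    class KeylessConfSketch {
      static Configuration keylessConf() {
        Configuration conf = TestConfigurations.getDefaultConf("/tmp/hudi/t1");
        conf.removeConfig(FlinkOptions.RECORD_KEY_FIELD); // exercise the no-key path
        conf.removeConfig(FlinkOptions.ORDERING_FIELDS);
        return conf;
      }
    }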