yihua commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2263549244
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,192 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.table.HoodieTable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static
org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES_PREFIX;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
+/**
+ * Version 8 presents Hudi version from 1.0.0 to 1.0.2.
+ * Version 9 presents Hudi version >= 1.1.0.
+ * Major upgrade logic:
+ * Deprecate a given set of payload classes to prefer merge mode. That is,
+ * for table with payload class defined in RFC-97,
+ * remove hoodie.compaction.payload.class from table_configs
+ * add hoodie.legacy.payload.class=payload to table_configs
+ * set hoodie.table.partial.update.mode based on RFC-97
+ * set hoodie.table.merge.properties based on RFC-97
+ * set hoodie.record.merge.mode based on RFC-97
+ * set hoodie.record.merge.strategy.id based on RFC-97
+ * for table with event_time/commit_time merge mode,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ * for table with custom merger or payload,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ */
public class EightToNineUpgradeHandler implements UpgradeHandler {
+ private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE =
new HashSet<>(Arrays.asList(
+ EventTimeAvroPayload.class.getName(),
+ MySqlDebeziumAvroPayload.class.getName(),
+ PartialUpdateAvroPayload.class.getName(),
+ PostgresDebeziumAvroPayload.class.getName()));
+ private static final Set<String> PAYLOADS_UPGRADE_TO_COMMIT_TIME_MERGE_MODE
= new HashSet<>(Arrays.asList(
+ AWSDmsAvroPayload.class.getName(),
+ OverwriteNonDefaultsWithLatestAvroPayload.class.getName()));
@Override
- public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
- HoodieEngineContext context,
- String instantTime,
- SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public UpgradeDowngrade.TableConfigChangeSet upgrade(HoodieWriteConfig
config,
+ HoodieEngineContext
context,
+ String instantTime,
+
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
- // Rollback and run compaction in one step
- rollbackFailedWritesAndCompact(
- table, context, config, upgradeDowngradeHelper,
-
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
- HoodieTableVersion.NINE);
-
- // If auto upgrade is disabled, set writer version to 8 and return
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+ // If auto upgrade is disabled, set writer version to 8 and return.
if (!config.autoUpgrade()) {
- config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION,
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
- return Collections.emptyMap();
+ config.setValue(
+ HoodieWriteConfig.WRITE_TABLE_VERSION,
+ String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+ return new UpgradeDowngrade.TableConfigChangeSet(tablePropsToAdd,
Collections.emptyList());
}
HoodieTableMetaClient metaClient = table.getMetaClient();
-
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
Review Comment:
If the table is upgraded from version 6 to 9, then when the eight-to-nine
upgrade handler executes, the table config fetched here is still the one from
the original version 6, because the table config is only rewritten after all
`upgrade` calls have completed. Does the logic here assume that some table
configs are already updated to table version 8? I think this assumption makes
sense for maintaining the upgrade logic, but it requires changing the logic to
follow that, i.e., passing in the updated and deleted configs from the 6->8
upgrades.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,192 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.table.HoodieTable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static
org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES_PREFIX;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
+/**
+ * Version 8 presents Hudi version from 1.0.0 to 1.0.2.
+ * Version 9 presents Hudi version >= 1.1.0.
+ * Major upgrade logic:
+ * Deprecate a given set of payload classes to prefer merge mode. That is,
+ * for table with payload class defined in RFC-97,
+ * remove hoodie.compaction.payload.class from table_configs
+ * add hoodie.legacy.payload.class=payload to table_configs
+ * set hoodie.table.partial.update.mode based on RFC-97
+ * set hoodie.table.merge.properties based on RFC-97
+ * set hoodie.record.merge.mode based on RFC-97
+ * set hoodie.record.merge.strategy.id based on RFC-97
+ * for table with event_time/commit_time merge mode,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ * for table with custom merger or payload,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ */
public class EightToNineUpgradeHandler implements UpgradeHandler {
+ private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE =
new HashSet<>(Arrays.asList(
+ EventTimeAvroPayload.class.getName(),
+ MySqlDebeziumAvroPayload.class.getName(),
+ PartialUpdateAvroPayload.class.getName(),
+ PostgresDebeziumAvroPayload.class.getName()));
+ private static final Set<String> PAYLOADS_UPGRADE_TO_COMMIT_TIME_MERGE_MODE
= new HashSet<>(Arrays.asList(
+ AWSDmsAvroPayload.class.getName(),
+ OverwriteNonDefaultsWithLatestAvroPayload.class.getName()));
@Override
- public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
- HoodieEngineContext context,
- String instantTime,
- SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public UpgradeDowngrade.TableConfigChangeSet upgrade(HoodieWriteConfig
config,
+ HoodieEngineContext
context,
+ String instantTime,
+
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
- // Rollback and run compaction in one step
- rollbackFailedWritesAndCompact(
- table, context, config, upgradeDowngradeHelper,
-
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
- HoodieTableVersion.NINE);
Review Comment:
#13642 has fixed the rollback and compaction to be idempotent.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,192 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.table.HoodieTable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static
org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES_PREFIX;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
+/**
+ * Version 8 presents Hudi version from 1.0.0 to 1.0.2.
+ * Version 9 presents Hudi version >= 1.1.0.
+ * Major upgrade logic:
+ * Deprecate a given set of payload classes to prefer merge mode. That is,
+ * for table with payload class defined in RFC-97,
+ * remove hoodie.compaction.payload.class from table_configs
+ * add hoodie.legacy.payload.class=payload to table_configs
+ * set hoodie.table.partial.update.mode based on RFC-97
+ * set hoodie.table.merge.properties based on RFC-97
+ * set hoodie.record.merge.mode based on RFC-97
+ * set hoodie.record.merge.strategy.id based on RFC-97
+ * for table with event_time/commit_time merge mode,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ * for table with custom merger or payload,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ */
public class EightToNineUpgradeHandler implements UpgradeHandler {
+ private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE =
new HashSet<>(Arrays.asList(
+ EventTimeAvroPayload.class.getName(),
+ MySqlDebeziumAvroPayload.class.getName(),
+ PartialUpdateAvroPayload.class.getName(),
+ PostgresDebeziumAvroPayload.class.getName()));
Review Comment:
I assume that `DefaultHoodieRecordPayload` is not needed since it should
already use `EVENT_TIME_ORDERING` as the merge mode.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]