vinothchandar commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2226892658
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,198 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.table.HoodieTable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
+/**
+ * Version 8 presents Hudi version from 1.0.0 to 1.0.2.
+ * Version 9 presents Hudi version >= 1.1.0.
+ * Major upgrade logic:
+ * Deprecate a given set of payload classes to prefer merge mode. That is,
+ * for table with payload class defined in RFC-97,
+ * remove hoodie.compaction.payload.class from table_configs
+ * add hoodie.legacy.payload.class=payload to table_configs
+ * set hoodie.table.partial.update.mode based on RFC-97
+ * set hoodie.table.merge.properties based on RFC-97
+ * set hoodie.record.merge.mode based on RFC-97
+ * set hoodie.record.merge.strategy.id based on RFC-97
+ * for table with event_time/commit_time merge mode,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ * for table with custom merger or payload,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ */
public class EightToNineUpgradeHandler implements UpgradeHandler {
+ private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE =
new HashSet<>(Arrays.asList(
+ EventTimeAvroPayload.class.getName(),
+ MySqlDebeziumAvroPayload.class.getName(),
+ PartialUpdateAvroPayload.class.getName(),
+ PostgresDebeziumAvroPayload.class.getName()));
+ private static final Set<String> PAYLOADS_UPGRADE_TO_COMMIT_TIME_MERGE_MODE
= new HashSet<>(Arrays.asList(
+ AWSDmsAvroPayload.class.getName(),
+ OverwriteNonDefaultsWithLatestAvroPayload.class.getName()));
@Override
- public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
- HoodieEngineContext context,
- String instantTime,
- SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
upgrade(HoodieWriteConfig config,
+
HoodieEngineContext context,
+
String instantTime,
+
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
- // Rollback and run compaction in one step
- rollbackFailedWritesAndCompact(
- table, context, config, upgradeDowngradeHelper,
-
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
- HoodieTableVersion.NINE);
-
- // If auto upgrade is disabled, set writer version to 8 and return
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+ // If auto upgrade is disabled, set writer version to 8 and return.
if (!config.autoUpgrade()) {
- config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION,
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
- return Collections.emptyMap();
+ config.setValue(
+ HoodieWriteConfig.WRITE_TABLE_VERSION,
+ String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+ return Pair.of(tablePropsToAdd, Collections.emptyList());
}
HoodieTableMetaClient metaClient = table.getMetaClient();
-
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
// Populate missing index versions indexes
Option<HoodieIndexMetadata> indexMetadataOpt =
metaClient.getIndexMetadata();
if (indexMetadataOpt.isPresent()) {
populateIndexVersionIfMissing(indexMetadataOpt);
-
// Write the updated index metadata back to storage
HoodieTableMetaClient.writeIndexMetadataToStorage(
metaClient.getStorage(),
metaClient.getIndexDefinitionPath(),
indexMetadataOpt.get(),
metaClient.getTableConfig().getTableVersion());
}
- return Collections.emptyMap();
+ // If metadata is enabled for the data table, and
+ // existing metadata table is behind the data table, then delete it.
+ checkAndHandleMetadataTable(context, table, config, metaClient);
+ // Rollback and run compaction in one step.
+ rollbackFailedWritesAndCompact(
+ table, context, config, upgradeDowngradeHelper,
+
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+ HoodieTableVersion.EIGHT);
+ // Handle merge mode config.
+ upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
Review Comment:
Have we thought about idempotency/recovery if the upgrade fails partway through —
what happens then?
We should make a single write to `hoodie.properties`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,198 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.table.HoodieTable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
+/**
+ * Version 8 presents Hudi version from 1.0.0 to 1.0.2.
+ * Version 9 presents Hudi version >= 1.1.0.
+ * Major upgrade logic:
+ * Deprecate a given set of payload classes to prefer merge mode. That is,
+ * for table with payload class defined in RFC-97,
+ * remove hoodie.compaction.payload.class from table_configs
+ * add hoodie.legacy.payload.class=payload to table_configs
+ * set hoodie.table.partial.update.mode based on RFC-97
+ * set hoodie.table.merge.properties based on RFC-97
+ * set hoodie.record.merge.mode based on RFC-97
+ * set hoodie.record.merge.strategy.id based on RFC-97
+ * for table with event_time/commit_time merge mode,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ * for table with custom merger or payload,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ */
public class EightToNineUpgradeHandler implements UpgradeHandler {
+ private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE =
new HashSet<>(Arrays.asList(
+ EventTimeAvroPayload.class.getName(),
+ MySqlDebeziumAvroPayload.class.getName(),
+ PartialUpdateAvroPayload.class.getName(),
+ PostgresDebeziumAvroPayload.class.getName()));
+ private static final Set<String> PAYLOADS_UPGRADE_TO_COMMIT_TIME_MERGE_MODE
= new HashSet<>(Arrays.asList(
+ AWSDmsAvroPayload.class.getName(),
+ OverwriteNonDefaultsWithLatestAvroPayload.class.getName()));
@Override
- public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
- HoodieEngineContext context,
- String instantTime,
- SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
upgrade(HoodieWriteConfig config,
+
HoodieEngineContext context,
+
String instantTime,
+
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
- // Rollback and run compaction in one step
- rollbackFailedWritesAndCompact(
- table, context, config, upgradeDowngradeHelper,
-
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
- HoodieTableVersion.NINE);
-
- // If auto upgrade is disabled, set writer version to 8 and return
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+ // If auto upgrade is disabled, set writer version to 8 and return.
if (!config.autoUpgrade()) {
- config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION,
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
- return Collections.emptyMap();
+ config.setValue(
+ HoodieWriteConfig.WRITE_TABLE_VERSION,
+ String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+ return Pair.of(tablePropsToAdd, Collections.emptyList());
}
HoodieTableMetaClient metaClient = table.getMetaClient();
-
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
// Populate missing index versions indexes
Option<HoodieIndexMetadata> indexMetadataOpt =
metaClient.getIndexMetadata();
if (indexMetadataOpt.isPresent()) {
populateIndexVersionIfMissing(indexMetadataOpt);
-
// Write the updated index metadata back to storage
HoodieTableMetaClient.writeIndexMetadataToStorage(
metaClient.getStorage(),
metaClient.getIndexDefinitionPath(),
indexMetadataOpt.get(),
metaClient.getTableConfig().getTableVersion());
}
- return Collections.emptyMap();
+ // If metadata is enabled for the data table, and
+ // existing metadata table is behind the data table, then delete it.
+ checkAndHandleMetadataTable(context, table, config, metaClient);
+ // Rollback and run compaction in one step.
+ rollbackFailedWritesAndCompact(
+ table, context, config, upgradeDowngradeHelper,
+
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+ HoodieTableVersion.EIGHT);
+ // Handle merge mode config.
+ upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
+ // Handle partial update mode config.
+ upgradePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+ // Handle merge properties config.
+ upgradeMergePropertiesConfig(tablePropsToAdd, tableConfig);
+ // Handle payload class configs.
+ List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+ upgradePayloadClassConfig(tablePropsToAdd, tablePropsToRemove,
tableConfig);
+ return Pair.of(tablePropsToAdd, tablePropsToRemove);
+ }
+
+ private void upgradePayloadClassConfig(Map<ConfigProperty, String>
tablePropsToAdd,
Review Comment:
The `upgradeXXXX` naming is misleading; it reads as though the method is actually
changing storage.
Rename all of these to `changeXXX`, e.g. `changePayloadClassConfigs`.
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java:
##########
@@ -35,85 +46,200 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mock;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.MockedStatic;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
+import static
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static org.apache.hudi.common.table.PartialUpdateMode.IGNORE_MARKERS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
class TestEightToNineUpgradeHandler {
-
- @TempDir
- private Path tempDir;
-
- @Mock
- private HoodieWriteConfig config;
- @Mock
- private HoodieEngineContext context;
- @Mock
- private SupportsUpgradeDowngrade upgradeDowngradeHelper;
- @Mock
- private HoodieTable table;
- @Mock
- private HoodieTableMetaClient metaClient;
- @Mock
- private HoodieTableConfig tableConfig;
- @Mock
- private HoodieStorage storage;
-
- private EightToNineUpgradeHandler upgradeHandler;
+ private final EightToNineUpgradeHandler handler = new
EightToNineUpgradeHandler();
Review Comment:
I want to ensure we cover all major payloads in use, with a functional test
as well, where we verify the table is still writable/readable afterwards.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,198 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.table.HoodieTable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
+/**
+ * Version 8 presents Hudi version from 1.0.0 to 1.0.2.
+ * Version 9 presents Hudi version >= 1.1.0.
+ * Major upgrade logic:
+ * Deprecate a given set of payload classes to prefer merge mode. That is,
+ * for table with payload class defined in RFC-97,
+ * remove hoodie.compaction.payload.class from table_configs
+ * add hoodie.legacy.payload.class=payload to table_configs
+ * set hoodie.table.partial.update.mode based on RFC-97
+ * set hoodie.table.merge.properties based on RFC-97
+ * set hoodie.record.merge.mode based on RFC-97
+ * set hoodie.record.merge.strategy.id based on RFC-97
+ * for table with event_time/commit_time merge mode,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ * for table with custom merger or payload,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ */
public class EightToNineUpgradeHandler implements UpgradeHandler {
+ private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE =
new HashSet<>(Arrays.asList(
+ EventTimeAvroPayload.class.getName(),
+ MySqlDebeziumAvroPayload.class.getName(),
+ PartialUpdateAvroPayload.class.getName(),
+ PostgresDebeziumAvroPayload.class.getName()));
+ private static final Set<String> PAYLOADS_UPGRADE_TO_COMMIT_TIME_MERGE_MODE
= new HashSet<>(Arrays.asList(
+ AWSDmsAvroPayload.class.getName(),
+ OverwriteNonDefaultsWithLatestAvroPayload.class.getName()));
@Override
- public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
- HoodieEngineContext context,
- String instantTime,
- SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
upgrade(HoodieWriteConfig config,
+
HoodieEngineContext context,
+
String instantTime,
+
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
- // Rollback and run compaction in one step
- rollbackFailedWritesAndCompact(
- table, context, config, upgradeDowngradeHelper,
-
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
- HoodieTableVersion.NINE);
-
- // If auto upgrade is disabled, set writer version to 8 and return
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+ // If auto upgrade is disabled, set writer version to 8 and return.
if (!config.autoUpgrade()) {
- config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION,
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
- return Collections.emptyMap();
+ config.setValue(
+ HoodieWriteConfig.WRITE_TABLE_VERSION,
+ String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+ return Pair.of(tablePropsToAdd, Collections.emptyList());
}
HoodieTableMetaClient metaClient = table.getMetaClient();
-
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
// Populate missing index versions indexes
Option<HoodieIndexMetadata> indexMetadataOpt =
metaClient.getIndexMetadata();
if (indexMetadataOpt.isPresent()) {
populateIndexVersionIfMissing(indexMetadataOpt);
-
// Write the updated index metadata back to storage
HoodieTableMetaClient.writeIndexMetadataToStorage(
metaClient.getStorage(),
metaClient.getIndexDefinitionPath(),
indexMetadataOpt.get(),
metaClient.getTableConfig().getTableVersion());
}
- return Collections.emptyMap();
+ // If metadata is enabled for the data table, and
+ // existing metadata table is behind the data table, then delete it.
+ checkAndHandleMetadataTable(context, table, config, metaClient);
Review Comment:
What exactly do we mean by
> existing metadata table is behind the data table
? We cannot blow away the metadata table (MT) for this change. I'd like to
understand why this is absolutely needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]