This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 84064a9 [HUDI-3772] Fixing auto adjustment of lock configs for deltastreamer (#5207) 84064a9 is described below commit 84064a9b081c246f306855ae125f0dae5eb8f6d0 Author: Sivabalan Narayanan <n.siv...@gmail.com> AuthorDate: Sat Apr 2 23:44:10 2022 -0700 [HUDI-3772] Fixing auto adjustment of lock configs for deltastreamer (#5207) --- .../org/apache/hudi/config/HoodieWriteConfig.java | 79 +++++++++++++--------- .../apache/hudi/config/TestHoodieWriteConfig.java | 44 ++++++++++++ .../org/apache/hudi/HoodieStreamingSink.scala | 4 +- .../org/apache/hudi/utilities/UtilHelpers.java | 3 +- .../deltastreamer/HoodieDeltaStreamer.java | 9 ++- 5 files changed, 104 insertions(+), 35 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 23f1f38..87dd56b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -480,6 +480,12 @@ public class HoodieWriteConfig extends HoodieConfig { .sinceVersion("0.11.0") .withDocumentation("Control to enable release all persist rdds when the spark job finish."); + public static final ConfigProperty<Boolean> AUTO_ADJUST_LOCK_CONFIGS = ConfigProperty + .key("hoodie.auto.adjust.lock.configs") + .defaultValue(false) + .sinceVersion("0.11.0") + .withDocumentation("Auto adjust lock configurations when metadata table is enabled and for async table services."); + private ConsistencyGuardConfig consistencyGuardConfig; private FileSystemRetryConfig fileSystemRetryConfig; @@ -1968,6 +1974,9 @@ public class HoodieWriteConfig extends HoodieConfig { * Hoodie Client Lock Configs. 
* @return */ + public boolean isAutoAdjustLockConfigs() { + return getBooleanOrDefault(AUTO_ADJUST_LOCK_CONFIGS); + } public String getLockProviderClass() { return getString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME); @@ -2443,6 +2452,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) { + writeConfig.setValue(AUTO_ADJUST_LOCK_CONFIGS, String.valueOf(autoAdjustLockConfigs)); + return this; + } + protected void setDefaults() { writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType)); // Check for mandatory properties @@ -2480,41 +2494,42 @@ public class HoodieWriteConfig extends HoodieConfig { HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION)); - autoAdjustConfigsForConcurrencyMode(); - } - - private void autoAdjustConfigsForConcurrencyMode() { - boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE); + // isLockProviderPropertySet must be fetched before setting defaults of HoodieLockConfig final TypedProperties writeConfigProperties = writeConfig.getProps(); final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME) || writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP); - - if (!isLockConfigSet) { - HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()); - writeConfig.setDefault(lockConfigBuilder.build()); - } - - if (isMetadataTableEnabled) { - // When metadata table is enabled, optimistic concurrency control must be used for - // single writer with async table services. - // Async table services can update the metadata table and a lock provider is - // needed to guard against any concurrent table write operations. 
If user has - // not configured any lock provider, let's use the InProcess lock provider. - boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled(); - boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync(); - - if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) { - // This is targeted at Single writer with async table services - // If user does not set the lock provider, likely that the concurrency mode is not set either - // Override the configs for metadata table - writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(), - WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()); - writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), - InProcessLockProvider.class.getName()); - LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the " - + "lock provider for single writer with async table services", - WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(), - HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName())); + writeConfig.setDefaultOnCondition(!isLockConfigSet, + HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); + + autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet); + } + + private void autoAdjustConfigsForConcurrencyMode(boolean isLockProviderPropertySet) { + if (writeConfig.isAutoAdjustLockConfigs()) { + // auto adjustment is required only for deltastreamer and spark streaming where async table services can be executed in the same JVM. + boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE); + + if (isMetadataTableEnabled) { + // When metadata table is enabled, optimistic concurrency control must be used for + // single writer with async table services. + // Async table services can update the metadata table and a lock provider is + // needed to guard against any concurrent table write operations. 
If user has + // not configured any lock provider, let's use the InProcess lock provider. + boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled(); + boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync(); + if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) { + // This is targeted at Single writer with async table services + // If user does not set the lock provider, likely that the concurrency mode is not set either + // Override the configs for metadata table + writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(), + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()); + writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + InProcessLockProvider.class.getName()); + LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the " + + "lock provider for single writer with async table services", + WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(), + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName())); + } } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java index 778bef7..85d4096 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java @@ -136,6 +136,7 @@ public class TestHoodieWriteConfig { put(INLINE_COMPACT.key(), "true"); put(AUTO_CLEAN.key(), "true"); put(ASYNC_CLEAN.key(), "false"); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName); @@ -148,6 +149,7 @@ public class TestHoodieWriteConfig { 
put(INLINE_COMPACT.key(), "true"); put(AUTO_CLEAN.key(), "true"); put(ASYNC_CLEAN.key(), "true"); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName); @@ -160,6 +162,7 @@ public class TestHoodieWriteConfig { put(INLINE_COMPACT.key(), "false"); put(AUTO_CLEAN.key(), "true"); put(ASYNC_CLEAN.key(), "false"); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), true, tableType == HoodieTableType.MERGE_ON_READ, @@ -181,6 +184,7 @@ public class TestHoodieWriteConfig { put(INLINE_COMPACT.key(), "true"); put(AUTO_CLEAN.key(), "true"); put(ASYNC_CLEAN.key(), "false"); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), Option.of(true), Option.of(false), Option.of(true), WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), @@ -190,6 +194,38 @@ public class TestHoodieWriteConfig { @ParameterizedTest @EnumSource(HoodieTableType.class) + public void testAutoAdjustLockConfigs(HoodieTableType tableType) { + TypedProperties properties = new TypedProperties(); + properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name()); + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp") + .withAutoAdjustLockConfigs(false) + .withClusteringConfig(new HoodieClusteringConfig.Builder().withAsyncClustering(true).build()) + .withProperties(properties) + .build(); + + verifyConcurrencyControlRelatedConfigs(writeConfig, + true, true, + WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), + HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); + + writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp") + .withAutoAdjustLockConfigs(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + 
.withClusteringConfig(new HoodieClusteringConfig.Builder().withAsyncClustering(true).build()) + .withProperties(properties) + .build(); + + verifyConcurrencyControlRelatedConfigs(writeConfig, + true, true, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); + } + + @ParameterizedTest + @EnumSource(HoodieTableType.class) public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType tableType) { // 1. User override for the lock provider should always take the precedence TypedProperties properties = new TypedProperties(); @@ -199,8 +235,10 @@ public class TestHoodieWriteConfig { .withLockConfig(HoodieLockConfig.newBuilder() .withLockProvider(FileSystemBasedLockProviderTestClass.class) .build()) + .withAutoAdjustLockConfigs(true) .withProperties(properties) .build(); + verifyConcurrencyControlRelatedConfigs(writeConfig, true, tableType == HoodieTableType.MERGE_ON_READ, WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), @@ -217,6 +255,7 @@ public class TestHoodieWriteConfig { put(ASYNC_CLEAN.key(), "true"); put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), ZookeeperBasedLockProvider.class.getName()); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), true, true, WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), @@ -227,6 +266,7 @@ public class TestHoodieWriteConfig { writeConfig = createWriteConfig(new HashMap<String, String>() { { put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }); if (writeConfig.areAnyTableServicesAsync()) { @@ -252,6 +292,7 @@ public class TestHoodieWriteConfig { { put(HoodieTableConfig.TYPE.key(), tableType.name()); put(TABLE_SERVICES_ENABLED.key(), "false"); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), false, tableType == HoodieTableType.MERGE_ON_READ, 
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()), @@ -268,6 +309,7 @@ public class TestHoodieWriteConfig { WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()); put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), FileSystemBasedLockProviderTestClass.class.getName()); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), false, tableType == HoodieTableType.MERGE_ON_READ, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, @@ -288,6 +330,7 @@ public class TestHoodieWriteConfig { put(INLINE_COMPACT.key(), "true"); put(AUTO_CLEAN.key(), "true"); put(ASYNC_CLEAN.key(), "false"); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), true, true, WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()), @@ -306,6 +349,7 @@ public class TestHoodieWriteConfig { WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()); put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), FileSystemBasedLockProviderTestClass.class.getName()); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName()); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 93580de..2befb47 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -81,7 +81,9 @@ class HoodieStreamingSink(sqlContext: SQLContext, // Override to use direct markers. In Structured streaming, timeline server is closed after // first micro-batch and subsequent micro-batches do not have timeline server running. // Thus, we can't use timeline-server-based markers. 
- val updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name()) + var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name()) + // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM. + updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true") retry(retryCnt, retryIntervalMs)( Try( diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 92e123b..5d1fd19 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -43,8 +43,8 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; -import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException; import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException; +import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException; import org.apache.hudi.utilities.schema.ChainedSchemaPostProcessor; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; @@ -272,6 +272,7 @@ public class UtilHelpers { sparkConf.set("spark.eventLog.overwrite", "true"); sparkConf.set("spark.eventLog.enabled", "true"); } + sparkConf.set("spark.ui.port", "8090"); sparkConf.setIfMissing("spark.driver.maxResultSize", "2g"); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set("spark.hadoop.mapred.output.compress", "true"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 2e83233..4b0f148 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; @@ -127,7 +128,6 @@ public class HoodieDeltaStreamer implements Serializable { public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option<TypedProperties> propsOverride) throws IOException { this.properties = combineProperties(cfg, propsOverride, jssc.hadoopConfiguration()); - if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) { InitialCheckPointProvider checkPointProvider = UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, this.properties); @@ -156,7 +156,14 @@ public class HoodieDeltaStreamer implements Serializable { hoodieConfig.setAll(UtilHelpers.readConfig(hadoopConf, new Path(cfg.propsFilePath), cfg.configs).getProps()); } + // set any configs that Deltastreamer has to override explicitly hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA()); + // we need auto adjustment enabled for deltastreamer since async table services are feasible within the same JVM. 
+ hoodieConfig.setValue(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); + if (cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { + // Explicitly set the table type + hoodieConfig.setValue(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); + } return hoodieConfig.getProps(true); }