This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 84064a9  [HUDI-3772] Fixing auto adjustment of lock configs for deltastreamer (#5207)
84064a9 is described below

commit 84064a9b081c246f306855ae125f0dae5eb8f6d0
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Sat Apr 2 23:44:10 2022 -0700

    [HUDI-3772] Fixing auto adjustment of lock configs for deltastreamer (#5207)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 79 +++++++++++++---------
 .../apache/hudi/config/TestHoodieWriteConfig.java  | 44 ++++++++++++
 .../org/apache/hudi/HoodieStreamingSink.scala      |  4 +-
 .../org/apache/hudi/utilities/UtilHelpers.java     |  3 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  9 ++-
 5 files changed, 104 insertions(+), 35 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 23f1f38..87dd56b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -480,6 +480,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .sinceVersion("0.11.0")
       .withDocumentation("Control to enable release all persist rdds when the 
spark job finish.");
 
+  public static final ConfigProperty<Boolean> AUTO_ADJUST_LOCK_CONFIGS = 
ConfigProperty
+      .key("hoodie.auto.adjust.lock.configs")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Auto adjust lock configurations when metadata table 
is enabled and for async table services.");
+
   private ConsistencyGuardConfig consistencyGuardConfig;
   private FileSystemRetryConfig fileSystemRetryConfig;
 
@@ -1968,6 +1974,9 @@ public class HoodieWriteConfig extends HoodieConfig {
    * Hoodie Client Lock Configs.
    * @return
    */
+  public boolean isAutoAdjustLockConfigs() {
+    return getBooleanOrDefault(AUTO_ADJUST_LOCK_CONFIGS);
+  }
 
   public String getLockProviderClass() {
     return getString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME);
@@ -2443,6 +2452,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) {
+      writeConfig.setValue(AUTO_ADJUST_LOCK_CONFIGS, 
String.valueOf(autoAdjustLockConfigs));
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, 
getDefaultMarkersType(engineType));
       // Check for mandatory properties
@@ -2480,41 +2494,42 @@ public class HoodieWriteConfig extends HoodieConfig {
           
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
       writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, 
String.valueOf(TimelineLayoutVersion.CURR_VERSION));
 
-      autoAdjustConfigsForConcurrencyMode();
-    }
-
-    private void autoAdjustConfigsForConcurrencyMode() {
-      boolean isMetadataTableEnabled = 
writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
+      // isLockProviderPropertySet must be fetched before setting defaults of 
HoodieLockConfig
       final TypedProperties writeConfigProperties = writeConfig.getProps();
       final boolean isLockProviderPropertySet = 
writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
           || 
writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
-      
-      if (!isLockConfigSet) {
-        HoodieLockConfig.Builder lockConfigBuilder = 
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps());
-        writeConfig.setDefault(lockConfigBuilder.build());
-      }
-
-      if (isMetadataTableEnabled) {
-        // When metadata table is enabled, optimistic concurrency control must 
be used for
-        // single writer with async table services.
-        // Async table services can update the metadata table and a lock 
provider is
-        // needed to guard against any concurrent table write operations. If 
user has
-        // not configured any lock provider, let's use the InProcess lock 
provider.
-        boolean areTableServicesEnabled = 
writeConfig.areTableServicesEnabled();
-        boolean areAsyncTableServicesEnabled = 
writeConfig.areAnyTableServicesAsync();
-
-        if (!isLockProviderPropertySet && areTableServicesEnabled && 
areAsyncTableServicesEnabled) {
-          // This is targeted at Single writer with async table services
-          // If user does not set the lock provider, likely that the 
concurrency mode is not set either
-          // Override the configs for metadata table
-          writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
-              WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
-          writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
-              InProcessLockProvider.class.getName());
-          LOG.info(String.format("Automatically set %s=%s and %s=%s since user 
has not set the "
-                  + "lock provider for single writer with async table 
services",
-              WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
-              HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
InProcessLockProvider.class.getName()));
+      writeConfig.setDefaultOnCondition(!isLockConfigSet,
+          
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+
+      autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet);
+    }
+
+    private void autoAdjustConfigsForConcurrencyMode(boolean 
isLockProviderPropertySet) {
+      if (writeConfig.isAutoAdjustLockConfigs()) {
+        // auto adjustment is required only for deltastreamer and spark 
streaming where async table services can be executed in the same JVM.
+        boolean isMetadataTableEnabled = 
writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
+
+        if (isMetadataTableEnabled) {
+          // When metadata table is enabled, optimistic concurrency control 
must be used for
+          // single writer with async table services.
+          // Async table services can update the metadata table and a lock 
provider is
+          // needed to guard against any concurrent table write operations. If 
user has
+          // not configured any lock provider, let's use the InProcess lock 
provider.
+          boolean areTableServicesEnabled = 
writeConfig.areTableServicesEnabled();
+          boolean areAsyncTableServicesEnabled = 
writeConfig.areAnyTableServicesAsync();
+          if (!isLockProviderPropertySet && areTableServicesEnabled && 
areAsyncTableServicesEnabled) {
+            // This is targeted at Single writer with async table services
+            // If user does not set the lock provider, likely that the 
concurrency mode is not set either
+            // Override the configs for metadata table
+            writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
+                WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+            
writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+                InProcessLockProvider.class.getName());
+            LOG.info(String.format("Automatically set %s=%s and %s=%s since 
user has not set the "
+                    + "lock provider for single writer with async table 
services",
+                WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
+                HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
InProcessLockProvider.class.getName()));
+          }
         }
       }
 
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 778bef7..85d4096 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -136,6 +136,7 @@ public class TestHoodieWriteConfig {
             put(INLINE_COMPACT.key(), "true");
             put(AUTO_CLEAN.key(), "true");
             put(ASYNC_CLEAN.key(), "false");
+            put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
         HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
@@ -148,6 +149,7 @@ public class TestHoodieWriteConfig {
             put(INLINE_COMPACT.key(), "true");
             put(AUTO_CLEAN.key(), "true");
             put(ASYNC_CLEAN.key(), "true");
+            put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
         HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
@@ -160,6 +162,7 @@ public class TestHoodieWriteConfig {
             put(INLINE_COMPACT.key(), "false");
             put(AUTO_CLEAN.key(), "true");
             put(ASYNC_CLEAN.key(), "false");
+            put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), true,
         tableType == HoodieTableType.MERGE_ON_READ,
@@ -181,6 +184,7 @@ public class TestHoodieWriteConfig {
             put(INLINE_COMPACT.key(), "true");
             put(AUTO_CLEAN.key(), "true");
             put(ASYNC_CLEAN.key(), "false");
+            put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), Option.of(true), Option.of(false), Option.of(true),
         WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -190,6 +194,38 @@ public class TestHoodieWriteConfig {
 
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
+  public void testAutoAdjustLockConfigs(HoodieTableType tableType) {
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name());
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withAutoAdjustLockConfigs(false)
+        .withClusteringConfig(new 
HoodieClusteringConfig.Builder().withAsyncClustering(true).build())
+        .withProperties(properties)
+        .build();
+
+    verifyConcurrencyControlRelatedConfigs(writeConfig,
+        true, true,
+        WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+        
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+        HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
+
+    writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withAutoAdjustLockConfigs(false)
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withClusteringConfig(new 
HoodieClusteringConfig.Builder().withAsyncClustering(true).build())
+        .withProperties(properties)
+        .build();
+
+    verifyConcurrencyControlRelatedConfigs(writeConfig,
+        true, true,
+        WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, 
HoodieFailedWritesCleaningPolicy.LAZY,
+        HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
+  }
+
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
   public void 
testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType tableType) {
     // 1. User override for the lock provider should always take the precedence
     TypedProperties properties = new TypedProperties();
@@ -199,8 +235,10 @@ public class TestHoodieWriteConfig {
         .withLockConfig(HoodieLockConfig.newBuilder()
             .withLockProvider(FileSystemBasedLockProviderTestClass.class)
             .build())
+        .withAutoAdjustLockConfigs(true)
         .withProperties(properties)
         .build();
+
     verifyConcurrencyControlRelatedConfigs(writeConfig,
         true, tableType == HoodieTableType.MERGE_ON_READ,
         WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -217,6 +255,7 @@ public class TestHoodieWriteConfig {
             put(ASYNC_CLEAN.key(), "true");
             put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
                 ZookeeperBasedLockProvider.class.getName());
+            put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), true, true,
         WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -227,6 +266,7 @@ public class TestHoodieWriteConfig {
     writeConfig = createWriteConfig(new HashMap<String, String>() {
       {
         put(HoodieTableConfig.TYPE.key(), tableType.name());
+        put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
       }
     });
     if (writeConfig.areAnyTableServicesAsync()) {
@@ -252,6 +292,7 @@ public class TestHoodieWriteConfig {
           {
             put(HoodieTableConfig.TYPE.key(), tableType.name());
             put(TABLE_SERVICES_ENABLED.key(), "false");
+            put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), false, tableType == HoodieTableType.MERGE_ON_READ,
         WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -268,6 +309,7 @@ public class TestHoodieWriteConfig {
                 WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
             put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
                 FileSystemBasedLockProviderTestClass.class.getName());
+            put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), false, tableType == HoodieTableType.MERGE_ON_READ,
         WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
@@ -288,6 +330,7 @@ public class TestHoodieWriteConfig {
             put(INLINE_COMPACT.key(), "true");
             put(AUTO_CLEAN.key(), "true");
             put(ASYNC_CLEAN.key(), "false");
+            put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), true, true,
         WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -306,6 +349,7 @@ public class TestHoodieWriteConfig {
                 WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
             put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
                 FileSystemBasedLockProviderTestClass.class.getName());
+            put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
         HoodieFailedWritesCleaningPolicy.LAZY, 
FileSystemBasedLockProviderTestClass.class.getName());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 93580de..2befb47 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -81,7 +81,9 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     // Override to use direct markers. In Structured streaming, timeline 
server is closed after
     // first micro-batch and subsequent micro-batches do not have timeline 
server running.
     // Thus, we can't use timeline-server-based markers.
-    val updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
+    var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
+    // we need auto adjustment enabled for streaming sink since async table 
services are feasible within the same JVM.
+    updatedOptions = 
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
 
     retry(retryCnt, retryIntervalMs)(
       Try(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 92e123b..5d1fd19 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -43,8 +43,8 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
-import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
+import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.schema.ChainedSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
@@ -272,6 +272,7 @@ public class UtilHelpers {
       sparkConf.set("spark.eventLog.overwrite", "true");
       sparkConf.set("spark.eventLog.enabled", "true");
     }
+    sparkConf.set("spark.ui.port", "8090");
     sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
     sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
     sparkConf.set("spark.hadoop.mapred.output.compress", "true");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 2e83233..4b0f148 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -127,7 +128,6 @@ public class HoodieDeltaStreamer implements Serializable {
   public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, 
Configuration conf,
                              Option<TypedProperties> propsOverride) throws 
IOException {
     this.properties = combineProperties(cfg, propsOverride, 
jssc.hadoopConfiguration());
-
     if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
       InitialCheckPointProvider checkPointProvider =
           
UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, 
this.properties);
@@ -156,7 +156,14 @@ public class HoodieDeltaStreamer implements Serializable {
       hoodieConfig.setAll(UtilHelpers.readConfig(hadoopConf, new 
Path(cfg.propsFilePath), cfg.configs).getProps());
     }
 
+    // set any configs that Deltastreamer has to override explicitly
     hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
+    // we need auto adjustment enabled for deltastreamer since async table 
services are feasible within the same JVM.
+    hoodieConfig.setValue(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), 
"true");
+    if (cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      // Explicitly set the table type
+      hoodieConfig.setValue(HoodieTableConfig.TYPE.key(), 
HoodieTableType.MERGE_ON_READ.name());
+    }
 
     return hoodieConfig.getProps(true);
   }

Reply via email to