nsivabalan commented on code in PR #18295:
URL: https://github.com/apache/hudi/pull/18295#discussion_r2960435888


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -350,6 +383,28 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     }
 
     HoodieWriteConfig metadataWriteConfig = builder.build();
+    if (mergeMetdataLockConfigAtEnd) {
+      // We need to update the MDT write config to have the same lock related 
configs as the data table.
+      // All data table props with the lock prefix are always copied (to 
override MDT defaults with
+      // user-configured values). Other data table props not present in MDT 
config are also copied to
+      // support custom lock providers that may use non-standard config keys.
+      Properties lockProps = new Properties();
+      TypedProperties dataTableProps = writeConfig.getProps();
+      TypedProperties mdtProps = metadataWriteConfig.getProps();
+      for (String key : dataTableProps.stringPropertyNames()) {
+        if (key.startsWith(LockConfiguration.LOCK_PREFIX) || 
!mdtProps.containsKey(key)) {
+          lockProps.setProperty(key, dataTableProps.getProperty(key));
+        }
+      }
+      lockProps.setProperty(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
metadataWriteConcurrencyMode.name());
+      String lockProviderClass = writeConfig.getLockProviderClass();
+      checkState(lockProviderClass != null, "Lock provider class must be set 
for metadata table");

Review Comment:
   minor. `Lock provider class must be set for data table to enable async 
executions of table services in metadata table`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -147,14 +150,39 @@ public static HoodieWriteConfig createMetadataWriteConfig(
       HoodieTableVersion datatableVersion) {
     String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
     boolean isStreamingWritesToMetadataEnabled = 
writeConfig.isMetadataStreamingWritesEnabled(datatableVersion);
-    WriteConcurrencyMode concurrencyMode = isStreamingWritesToMetadataEnabled
-        ? WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL : 
WriteConcurrencyMode.SINGLE_WRITER;
-    HoodieLockConfig lockConfig = isStreamingWritesToMetadataEnabled
-        ? 
HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()
 : HoodieLockConfig.newBuilder().build();
-    // HUDI-9407 tracks adding support for separate lock configuration for 
MDT. Until then, all writes to MDT will happen within data table lock.
-
-    if (isStreamingWritesToMetadataEnabled) {
+    WriteConcurrencyMode metadataWriteConcurrencyMode =
+        
WriteConcurrencyMode.valueOf(writeConfig.getMetadataConfig().getWriteConcurrencyMode());
+
+    WriteConcurrencyMode concurrencyMode;
+    HoodieLockConfig lockConfig;
+    final boolean mergeMetdataLockConfigAtEnd;
+    if (metadataWriteConcurrencyMode.supportsMultiWriter()) {
+      // Configuring Multi-writer directly on metadata table is intended for 
executing table service plans, not for writes.
+      checkState(!isStreamingWritesToMetadataEnabled,
+          "Streaming writes to metadata table must be disabled when using 
multi-writer concurrency mode "
+              + metadataWriteConcurrencyMode + ". Disable " + 
HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key());
+      checkState(metadataWriteConcurrencyMode == 
writeConfig.getWriteConcurrencyMode(),
+          "If multiwriter is used on metadata table, its concurrency mode (" + 
metadataWriteConcurrencyMode
+              + ") must match the data table concurrency mode (" + 
writeConfig.getWriteConcurrencyMode() + ")");
+      // First lets create te MDT write config with default single writer lock 
configs.
+      // Then, once all MDT-specific write configs are set, we can create a 
lock config 
+      // containing all write config raw props in data table that aren't in 
the raw props
+      // for MDT write config. And then, re-build the MDT write config with 
this merged lock config.
+      concurrencyMode = WriteConcurrencyMode.SINGLE_WRITER;
+      lockConfig = HoodieLockConfig.newBuilder().build();
       failedWritesCleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
+      mergeMetdataLockConfigAtEnd = true;

Review Comment:
   Minor: can we validate the lock provider class alone here and fail fast?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -147,14 +150,39 @@ public static HoodieWriteConfig createMetadataWriteConfig(
       HoodieTableVersion datatableVersion) {
     String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
     boolean isStreamingWritesToMetadataEnabled = 
writeConfig.isMetadataStreamingWritesEnabled(datatableVersion);
-    WriteConcurrencyMode concurrencyMode = isStreamingWritesToMetadataEnabled
-        ? WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL : 
WriteConcurrencyMode.SINGLE_WRITER;
-    HoodieLockConfig lockConfig = isStreamingWritesToMetadataEnabled
-        ? 
HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()
 : HoodieLockConfig.newBuilder().build();
-    // HUDI-9407 tracks adding support for separate lock configuration for 
MDT. Until then, all writes to MDT will happen within data table lock.
-
-    if (isStreamingWritesToMetadataEnabled) {
+    WriteConcurrencyMode metadataWriteConcurrencyMode =
+        
WriteConcurrencyMode.valueOf(writeConfig.getMetadataConfig().getWriteConcurrencyMode());
+
+    WriteConcurrencyMode concurrencyMode;
+    HoodieLockConfig lockConfig;
+    final boolean mergeMetdataLockConfigAtEnd;

Review Comment:
   `deriveMetadataLockConfigsFromDataTableConfigs`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -350,6 +383,28 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     }
 
     HoodieWriteConfig metadataWriteConfig = builder.build();
+    if (mergeMetdataLockConfigAtEnd) {
+      // We need to update the MDT write config to have the same lock related 
configs as the data table.
+      // All data table props with the lock prefix are always copied (to 
override MDT defaults with
+      // user-configured values). Other data table props not present in MDT 
config are also copied to
+      // support custom lock providers that may use non-standard config keys.
+      Properties lockProps = new Properties();
+      TypedProperties dataTableProps = writeConfig.getProps();
+      TypedProperties mdtProps = metadataWriteConfig.getProps();
+      for (String key : dataTableProps.stringPropertyNames()) {
+        if (key.startsWith(LockConfiguration.LOCK_PREFIX) || 
!mdtProps.containsKey(key)) {
+          lockProps.setProperty(key, dataTableProps.getProperty(key));
+        }
+      }
+      lockProps.setProperty(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
metadataWriteConcurrencyMode.name());
+      String lockProviderClass = writeConfig.getLockProviderClass();
+      checkState(lockProviderClass != null, "Lock provider class must be set 
for metadata table");

Review Comment:
   Also, wondering if we should check that it's not `InProcessLockProvider` as 
well.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -350,6 +383,28 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     }
 
     HoodieWriteConfig metadataWriteConfig = builder.build();
+    if (mergeMetdataLockConfigAtEnd) {
+      // We need to update the MDT write config to have the same lock related 
configs as the data table.
+      // All data table props with the lock prefix are always copied (to 
override MDT defaults with
+      // user-configured values). Other data table props not present in MDT 
config are also copied to
+      // support custom lock providers that may use non-standard config keys.
+      Properties lockProps = new Properties();
+      TypedProperties dataTableProps = writeConfig.getProps();
+      TypedProperties mdtProps = metadataWriteConfig.getProps();
+      for (String key : dataTableProps.stringPropertyNames()) {
+        if (key.startsWith(LockConfiguration.LOCK_PREFIX) || 
!mdtProps.containsKey(key)) {
+          lockProps.setProperty(key, dataTableProps.getProperty(key));
+        }
+      }
+      lockProps.setProperty(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
metadataWriteConcurrencyMode.name());
+      String lockProviderClass = writeConfig.getLockProviderClass();
+      checkState(lockProviderClass != null, "Lock provider class must be set 
for metadata table");
+      lockProps.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
lockProviderClass);
+      metadataWriteConfig = HoodieWriteConfig.newBuilder()
+          .withProperties(metadataWriteConfig.getProps())

Review Comment:
   minor. we could use `mdtProps`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -350,6 +383,28 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     }
 
     HoodieWriteConfig metadataWriteConfig = builder.build();
+    if (mergeMetdataLockConfigAtEnd) {
+      // We need to update the MDT write config to have the same lock related 
configs as the data table.
+      // All data table props with the lock prefix are always copied (to 
override MDT defaults with
+      // user-configured values). Other data table props not present in MDT 
config are also copied to
+      // support custom lock providers that may use non-standard config keys.
+      Properties lockProps = new Properties();
+      TypedProperties dataTableProps = writeConfig.getProps();
+      TypedProperties mdtProps = metadataWriteConfig.getProps();
+      for (String key : dataTableProps.stringPropertyNames()) {
+        if (key.startsWith(LockConfiguration.LOCK_PREFIX) || 
!mdtProps.containsKey(key)) {

Review Comment:
   Oh, something that struck me just now: don't we need reentrancy 
support for all lock providers if we are going with this solution? 
   If not, how can we support this for any lock provider?
   Because the writes to the MDT happen within the data table lock, irrespective 
of whether the MDT is single-writer or not. If it's not single-writer, then writes 
to the MDT will take the new set of locks we are configuring. 
   
   So, if a given lock provider does not support re-entrancy, this may go into a 
deadlock, right? 



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -81,6 +82,28 @@ public final class HoodieMetadataConfig extends HoodieConfig 
{
           + "in streaming manner rather than two disjoint writes. By default "
           + "streaming writes to metadata table is enabled for SPARK engine 
for incremental operations and disabled for all other cases.");
 
+  public static final ConfigProperty<String> METADATA_WRITE_CONCURRENCY_MODE = 
ConfigProperty
+      .key(METADATA_PREFIX + ".write.concurrency.mode")
+      .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
+      .markAdvanced()
+      .withDocumentation("Change this to OPTIMISTIC_CONCURRENCY_CONTROL when 
MDT operations are being performed "
+          + "from an external concurrent writer (such as a table service 
platform) so that appropriate locks are taken.");
+
+  public static final ConfigProperty<Boolean> TABLE_SERVICE_MANAGER_ENABLED = 
ConfigProperty
+      .key(METADATA_PREFIX + ".table.service.manager.enabled")
+      .defaultValue(false)
+      .markAdvanced()
+      .withDocumentation("If true, delegate specified table service actions on 
the metadata table to the table service manager "
+          + "instead of executing them inline. This prevents the current 
writer from executing compaction/logcompaction "
+          + "on the metadata table, allowing a separate async pipeline to 
handle them.");
+
+  public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_ACTIONS = 
ConfigProperty
+      .key(METADATA_PREFIX + ".table.service.manager.actions")
+      .defaultValue("")
+      .markAdvanced()
+      .withDocumentation("Comma-separated list of table service actions on the 
metadata table "
+          + "that should be delegated to the table service manager. Currently 
supported actions are: compaction, logcompaction.");

Review Comment:
   should we throw if someone sets any other actions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to