watermelon12138 commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r815720738



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws 
IOException {
   private static String resetTarget(Config configuration, String database, 
String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' 
? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + 
database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + 
database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + 
tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does 
incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same 
target.

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws 
IOException {
   private static String resetTarget(Config configuration, String database, 
String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' 
? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + 
database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + 
database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + 
tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does 
incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same 
target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, 
Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), 
jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when 
multiple sources update the same target.
+        if 
(!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND)))
 {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations 
have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the 
same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws 
IOException {
   private static String resetTarget(Config configuration, String database, 
String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' 
? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + 
database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + 
database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + 
tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does 
incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same 
target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, 
Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), 
jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when 
multiple sources update the same target.
+        if 
(!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND)))
 {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations 
have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the 
same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple 
sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same 
target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          
successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: 
"
+              + tableExecutionContexts.get(i).getTableName(), e);
+          
failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to 
set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = 
"hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to 
set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the 
CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = 
"hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = 
"hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = 
"hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+
     private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+
     private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = 
"hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
+
     private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = 
"hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
+
     private static final String TABLES_TO_BE_INGESTED_PROP = 
"hoodie.deltastreamer.ingestion.tablesToBeIngested";
+
+    // This configuration item specifies the database name and table name of 
the source. The format is "database.table".
+    // It is recommended that table name be the same as the alias of the 
source. If there are multiple sources, separate them with commas.
+    public static final String SOURCES_TO_BE_BOUND = 
"hoodie.deltastreamer.source.sourcesToBeBound";
+
+    private static final String SOURCE_PREFIX = "hoodie.deltastreamer.source.";
+
     private static final String INGESTION_PREFIX = 
"hoodie.deltastreamer.ingestion.";
+
     private static final String INGESTION_CONFIG_SUFFIX = ".configFile";
+
     private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = 
"_config.properties";
+
     private static final String TARGET_BASE_PATH_PROP = 
"hoodie.deltastreamer.ingestion.targetBasePath";
+
     private static final String LOCAL_SPARK_MASTER = "local[2]";
-    private static final String FILE_DELIMITER = "/";
-    private static final String DELIMITER = ".";
+
+    public static final String PATH_SEPARATOR = "/";
+
+    public static final String PATH_CUR_DIR = ".";

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String 
configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file 
mapping
+  // commonProps are passed as parameter which contain table to config file 
mapping
   private void populateTableExecutionContextList(TypedProperties properties, 
String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + 
tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : 
"default";
-      String currentTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + 
Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, 
Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), 
new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = 
tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) 
? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && 
StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(),
 ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update 
the same target.
+    String sourcesToBeBound = 
properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + 
sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? 
targetTableWithDataBase[0] : "default";
+      String targetTable = targetTableWithDataBase.length > 1 ? 
targetTableWithDataBase[1] : targetTableWithDataBase[0];
+      String targetBasePath = resetTarget(config, targetDataBase, targetTable);
+
+      for (String source : sourcesToBeBonded) {
+        String[] tableWithDatabase = source.split("\\.");
+        String currentSourceDataBase = tableWithDatabase.length > 1 ? 
tableWithDatabase[0] : "default";
+        String currentSourceTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : source;
+        String configProp = Constants.SOURCE_PREFIX + currentSourceDataBase + 
Constants.PATH_CUR_DIR + currentSourceTable + Constants.INGESTION_CONFIG_SUFFIX;
+        TableExecutionContext executionContext = 
populateTableExecutionContext(properties, configFolder, fs, config, configProp, 
currentSourceDataBase, currentSourceTable, targetBasePath);
+        this.tableExecutionContexts.add(executionContext);
+      }
+    } else {
+      // populate the table execution context by traversing target tables
+      List<String> tablesToBeIngested = getTablesToBeIngested(properties);
+      logger.info("Tables to be ingested via MultiTableDeltaStreamer : " + 
tablesToBeIngested);
+
+      for (String table : tablesToBeIngested) {
+        String[] tableWithDatabase = table.split("\\.");
+        String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] 
: "default";
+        String currentTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : table;
+        String configProp = Constants.INGESTION_PREFIX + database + 
Constants.PATH_CUR_DIR + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
+        TableExecutionContext executionContext = 
populateTableExecutionContext(properties, configFolder, fs, config, configProp, 
database, currentTable, null);
+        this.tableExecutionContexts.add(executionContext);
       }
-      populateSchemaProviderProps(cfg, tableProperties);
-      executionContext = new TableExecutionContext();
-      executionContext.setProperties(tableProperties);
-      executionContext.setConfig(cfg);
-      executionContext.setDatabase(database);
-      executionContext.setTableName(currentTable);
-      this.tableExecutionContexts.add(executionContext);
     }
   }
 
+  private TableExecutionContext populateTableExecutionContext(TypedProperties 
properties, String configFolder,
+      FileSystem fs, Config config, String configProp, String database, String 
currentTable, String targetBasePath) throws IOException {
+    // copy all common properties to current table properties
+    TypedProperties currentTableProperties = 
getCurrentTableProperties(properties, configFolder, fs, configProp, database,
+        currentTable);
+
+    // copy all the values from config to cfg
+    final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+    Helpers.deepCopyConfigs(config, cfg);
+
+    // calculate the value of targetBasePath which is a property of cfg
+    calculateTargetBasePath(config, database, currentTable, targetBasePath, 
currentTableProperties, cfg);
+
+    if (cfg.enableHiveSync && 
StringUtils.isNullOrEmpty(currentTableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key())))
 {
+      throw new HoodieException("Hive sync table field not provided!");
+    }
+
+    populateSchemaProviderProps(cfg, currentTableProperties);
+    TableExecutionContext executionContext = new TableExecutionContext();
+    executionContext.setProperties(currentTableProperties);
+    executionContext.setConfig(cfg);
+    executionContext.setDatabase(database);
+    executionContext.setTableName(currentTable);
+    return executionContext;
+  }
+
+  private TypedProperties getCurrentTableProperties(TypedProperties 
properties, String configFolder, FileSystem fs,
+      String configProp, String database, String currentTable) throws 
IOException {
+
+    String configFilePath = properties.getString(configProp, 
Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
+    checkIfTableConfigFileExists(configFolder, fs, configFilePath);
+
+    TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new 
Path(configFilePath), new ArrayList<>()).getProps();
+    properties.forEach((k, v) -> {
+      if (tableProperties.get(k) == null) {
+        tableProperties.setProperty(k.toString(), v.toString());
+      }
+    });
+
+    return tableProperties;
+  }
+
+  private void calculateTargetBasePath(Config config, String database, String 
currentTable, String targetBasePath,
+      TypedProperties currentTableProperties, HoodieDeltaStreamer.Config cfg) {
+
+    String overriddenTargetBasePath = 
currentTableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
+
+    if (StringUtils.isNullOrEmpty(targetBasePath)) {
+      targetBasePath = resetTarget(config, database, currentTable);
+    }
+
+    cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath)
+      ? targetBasePath
+      : overriddenTargetBasePath;
+  }
+
+  private List<String> getSourcesToBeBound(TypedProperties properties) {
+    String combinedSourcesString = 
properties.getString(Constants.SOURCES_TO_BE_BOUND, null);
+    return StringUtils.isNullOrEmpty(combinedSourcesString)
+      ? new ArrayList<>()
+      : Arrays.asList(combinedSourcesString.split(Constants.COMMA_SEPARATOR));
+  }
+
+  private String getTableToBeIngested(TypedProperties properties) {

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String 
configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file 
mapping
+  // commonProps are passed as parameter which contain table to config file 
mapping
   private void populateTableExecutionContextList(TypedProperties properties, 
String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + 
tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : 
"default";
-      String currentTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + 
Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, 
Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), 
new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = 
tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) 
? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && 
StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(),
 ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update 
the same target.
+    String sourcesToBeBound = 
properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + 
sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? 
targetTableWithDataBase[0] : "default";

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String 
configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file 
mapping
+  // commonProps are passed as parameter which contain table to config file 
mapping
   private void populateTableExecutionContextList(TypedProperties properties, 
String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + 
tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : 
"default";
-      String currentTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + 
Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, 
Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), 
new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = 
tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) 
? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && 
StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(),
 ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update 
the same target.
+    String sourcesToBeBound = 
properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws 
IOException {
   private static String resetTarget(Config configuration, String database, 
String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' 
? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + 
database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + 
database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + 
tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does 
incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same 
target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, 
Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), 
jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when 
multiple sources update the same target.
+        if 
(!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND)))
 {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations 
have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the 
same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple 
sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same 
target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          
successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: 
"
+              + tableExecutionContexts.get(i).getTableName(), e);
+          
failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws 
IOException {
   private static String resetTarget(Config configuration, String database, 
String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' 
? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + 
database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + 
database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + 
tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does 
incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same 
target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, 
Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), 
jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when 
multiple sources update the same target.
+        if 
(!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND)))
 {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations 
have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the 
same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple 
sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same 
target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          
successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: 
"
+              + tableExecutionContexts.get(i).getTableName(), e);
+          
failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to 
set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = 
"hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to 
set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the 
CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = 
"hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = 
"hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = 
"hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+

Review comment:
       Thank you for your advice. I have changed the corresponding content in the
code.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws 
IOException {
   private static String resetTarget(Config configuration, String database, 
String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' 
? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + 
database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + 
database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + 
tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does 
incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same 
target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, 
Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), 
jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when 
multiple sources update the same target.
+        if 
(!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND)))
 {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations 
have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the 
same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple 
sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same 
target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          
successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: 
"
+              + tableExecutionContexts.get(i).getTableName(), e);
+          
failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to 
set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = 
"hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to 
set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the 
CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = 
"hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = 
"hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = 
"hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+
     private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+
     private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = 
"hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
+
     private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = 
"hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
+
     private static final String TABLES_TO_BE_INGESTED_PROP = 
"hoodie.deltastreamer.ingestion.tablesToBeIngested";
+
+    // This configuration item specifies the database name and table name of 
the source. The format is "database.table".
+    // It is recommended that table name be the same as the alias of the 
source. If there are multiple sources, separate them with commas.
+    public static final String SOURCES_TO_BE_BOUND = 
"hoodie.deltastreamer.source.sourcesToBeBound";

Review comment:
       Thank you for your advice. I have changed the corresponding content in the
code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to