This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c967bac6 [HUDI-3504] Support bootstrap command based on Call Produce Command (#5977)
26c967bac6 is described below

commit 26c967bac60de394ca78e79d27647067680ed630
Author: ForwardXu <[email protected]>
AuthorDate: Mon Jun 27 13:06:50 2022 +0800

    [HUDI-3504] Support bootstrap command based on Call Produce Command (#5977)
---
 .../apache/hudi/cli/BootstrapExecutorUtils.java    | 254 +++++++++++++++++++++
 .../java/org/apache/hudi/cli/SchemaProvider.java   |  58 +++++
 .../hudi/command/procedures/HoodieProcedures.scala |   3 +
 .../command/procedures/RunBootstrapProcedure.scala | 144 ++++++++++++
 .../procedures/ShowBootstrapMappingProcedure.scala | 117 ++++++++++
 .../ShowBootstrapPartitionsProcedure.scala         |  75 ++++++
 .../hudi/procedure/TestBootstrapProcedure.scala    |  89 ++++++++
 7 files changed, 740 insertions(+)
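
For orientation, the new procedures are invoked through Spark SQL's CALL syntax. The following is a minimal sketch distilled from the test added below; the table name and paths are placeholders, spark is assumed to be an active SparkSession, and the show_* calls assume the table has been registered in the catalog (as the test does with CREATE TABLE):

    // Bootstrap an existing parquet dataset into a Hudi table, then inspect the bootstrap index.
    spark.sql(
      """call run_bootstrap(
        |table => 'hudi_table',
        |tableType => 'COPY_ON_WRITE',
        |bootstrapPath => '/data/source_parquet',
        |basePath => '/data/hudi/hudi_table',
        |rowKeyField => '_row_key',
        |partitionPathField => 'datestr',
        |bootstrapOverwrite => true)""".stripMargin).show()  // returns status 0 on success, -1 on failure
    spark.sql("call show_bootstrap_partitions(table => 'hudi_table')").show()
    spark.sql("call show_bootstrap_mapping(table => 'hudi_table', limit => 10)").show()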

diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
new file mode 100644
index 0000000000..0cae022967
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+
+/**
+ * Performs bootstrap from a non-Hudi source.
+ */
+public class BootstrapExecutorUtils implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(BootstrapExecutorUtils.class);
+
+  /**
+   * Config.
+   */
+  private final Config cfg;
+
+  /**
+   * Spark context.
+   */
+  private final transient JavaSparkContext jssc;
+
+  /**
+   * Bag of properties with source, hoodie client, key generator etc.
+   */
+  private final TypedProperties props;
+
+  /**
+   * Hadoop Configuration.
+   */
+  private final Configuration configuration;
+
+  /**
+   * Bootstrap Configuration.
+   */
+  private final HoodieWriteConfig bootstrapConfig;
+
+  /**
+   * FileSystem instance.
+   */
+  private final transient FileSystem fs;
+
+  private final String bootstrapBasePath;
+
+  public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY;
+
+  /**
+   * Bootstrap Executor.
+   *
+   * @param cfg        DeltaStreamer Config
+   * @param jssc       Java Spark Context
+   * @param fs         File System
+   * @param conf       Hadoop Configuration
+   * @param properties Bootstrap Writer Properties
+   * @throws IOException if the bootstrap source path is missing or table initialization fails
+   */
+  public BootstrapExecutorUtils(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
+                                TypedProperties properties) throws IOException {
+    this.cfg = cfg;
+    this.jssc = jssc;
+    this.fs = fs;
+    this.configuration = conf;
+    this.props = properties;
+
+    ValidationUtils.checkArgument(properties.containsKey(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key()),
+        HoodieTableConfig.BOOTSTRAP_BASE_PATH.key() + " must be specified.");
+    this.bootstrapBasePath = properties.getString(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key());
+
+    // Add more defaults if full bootstrap requested
+    this.props.putIfAbsent(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(),
+        DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue());
+    /**
+     * Schema provider that supplies the schema for reading the input and writing out the target table.
+     */
+    SchemaProvider schemaProvider = createSchemaProvider(cfg.schemaProviderClass, props, jssc);
+    HoodieWriteConfig.Builder builder =
+        HoodieWriteConfig.newBuilder().withPath(cfg.basePath)
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
+            .forTable(cfg.tableName)
+            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+            .withAutoCommit(true)
+            .withProps(props);
+
+    if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
+      builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
+    }
+    this.bootstrapConfig = builder.build();
+    LOG.info("Created bootstrap executor with configs : " + bootstrapConfig.getProps());
+  }
+
+  public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg,
+                                                    JavaSparkContext jssc) throws IOException {
+    try {
+      return StringUtils.isNullOrEmpty(schemaProviderClass) ? null
+          : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
+    } catch (Throwable e) {
+      throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
+    }
+  }
+
+  /**
+   * Executes Bootstrap.
+   */
+  public void execute() throws IOException {
+    initializeTable();
+
+    try (SparkRDDWriteClient bootstrapClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jssc), bootstrapConfig)) {
+      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+      checkpointCommitMetadata.put(CHECKPOINT_KEY, Config.checkpoint);
+      bootstrapClient.bootstrap(Option.of(checkpointCommitMetadata));
+      syncHive();
+    }
+  }
+
+  /**
+   * Sync to Hive.
+   */
+  private void syncHive() {
+    if (cfg.enableHiveSync) {
+      TypedProperties metaProps = new TypedProperties();
+      metaProps.putAll(props);
+      metaProps.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), cfg.basePath);
+      metaProps.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);
+      if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key(), HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
+        metaProps.put(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
+            props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
+      }
+      }
+
+      new HiveSyncTool(metaProps, configuration, fs).syncHoodieTable();
+    }
+  }
+
+  private void initializeTable() throws IOException {
+    Path basePath = new Path(cfg.basePath);
+    if (fs.exists(basePath)) {
+      if (cfg.bootstrapOverwrite) {
+        LOG.warn("Target base path already exists, overwriting it");
+        fs.delete(basePath, true);
+      } else {
+        throw new HoodieException("Target base path already exists at " + cfg.basePath
+            + ". Cannot bootstrap data on top of an existing table");
+      }
+    }
+    HoodieTableMetaClient.withPropertyBuilder()
+        .fromProperties(props)
+        .setTableType(cfg.tableType)
+        .setTableName(cfg.tableName)
+        .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+        .setPayloadClassName(cfg.payloadClass)
+        .setBaseFileFormat(cfg.baseFileFormat)
+        .setBootstrapIndexClass(cfg.bootstrapIndexClass)
+        .setBootstrapBasePath(bootstrapBasePath)
+        .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.basePath);
+  }
+
+  public static class Config {
+    private String tableName;
+    private String tableType;
+
+    private String basePath;
+
+    private String baseFileFormat;
+    private String bootstrapIndexClass;
+    private String schemaProviderClass;
+    private String payloadClass;
+    private Boolean enableHiveSync;
+
+    private Boolean bootstrapOverwrite;
+
+    public static String checkpoint = null;
+
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+
+    public void setTableType(String tableType) {
+      this.tableType = tableType;
+    }
+
+    public void setBasePath(String basePath) {
+      this.basePath = basePath;
+    }
+
+    public void setBaseFileFormat(String baseFileFormat) {
+      this.baseFileFormat = baseFileFormat;
+    }
+
+    public void setBootstrapIndexClass(String bootstrapIndexClass) {
+      this.bootstrapIndexClass = bootstrapIndexClass;
+    }
+
+    public void setSchemaProviderClass(String schemaProviderClass) {
+      this.schemaProviderClass = schemaProviderClass;
+    }
+
+    public void setPayloadClass(String payloadClass) {
+      this.payloadClass = payloadClass;
+    }
+
+    public void setEnableHiveSync(Boolean enableHiveSync) {
+      this.enableHiveSync = enableHiveSync;
+    }
+
+    public void setBootstrapOverwrite(Boolean bootstrapOverwrite) {
+      this.bootstrapOverwrite = bootstrapOverwrite;
+    }
+  }
+}
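
For reference, BootstrapExecutorUtils can also be driven programmatically. The following is a minimal sketch, assuming an active SparkSession named spark and illustrative paths and field names; RunBootstrapProcedure below does essentially this wiring, with defaults filled in:

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.cli.BootstrapExecutorUtils
    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.common.fs.FSUtils
    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.spark.api.java.JavaSparkContext

    val jsc = new JavaSparkContext(spark.sparkContext)
    val props = new TypedProperties()
    // The constructor validates that the bootstrap source path is set.
    props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key, "/data/source_parquet")
    props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key")
    props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")

    val cfg = new BootstrapExecutorUtils.Config()
    cfg.setTableName("hudi_table")                     // illustrative values from here on
    cfg.setTableType("COPY_ON_WRITE")
    cfg.setBasePath("/data/hudi/hudi_table")
    cfg.setBaseFileFormat("PARQUET")
    cfg.setPayloadClass("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")
    cfg.setBootstrapIndexClass("org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex")
    cfg.setEnableHiveSync(false)
    cfg.setBootstrapOverwrite(true)

    val fs = FSUtils.getFs("/data/hudi/hudi_table", jsc.hadoopConfiguration)
    new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, props).execute()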
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java
new file mode 100644
index 0000000000..de6770bf30
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+
+/**
+ * Class to provide schema for reading data and also writing into a Hoodie table,
+ * used by deltastreamer (runs over Spark).
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
+public abstract class SchemaProvider implements Serializable {
+
+  protected TypedProperties config;
+
+  protected JavaSparkContext jssc;
+
+  public SchemaProvider(TypedProperties props) {
+    this(props, null);
+  }
+
+  protected SchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+    this.config = props;
+    this.jssc = jssc;
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  public abstract Schema getSourceSchema();
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  public Schema getTargetSchema() {
+    // by default, use source schema as target for hoodie table as well
+    return getSourceSchema();
+  }
+}
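
As a usage note, the class named by schemaProviderClass is loaded reflectively (see createSchemaProvider above), so a concrete implementation needs a (TypedProperties, JavaSparkContext) constructor. A minimal sketch with an illustrative inline Avro schema; this class is not part of the commit:

    import org.apache.avro.Schema
    import org.apache.hudi.cli.SchemaProvider
    import org.apache.hudi.common.config.TypedProperties
    import org.apache.spark.api.java.JavaSparkContext

    // Hypothetical provider; a real one would typically read the schema
    // location from the supplied properties.
    class InlineSchemaProvider(props: TypedProperties, jssc: JavaSparkContext)
      extends SchemaProvider(props, jssc) {

      override def getSourceSchema: Schema = new Schema.Parser().parse(
        """{"type":"record","name":"trip","fields":[
          |  {"name":"_row_key","type":"string"},
          |  {"name":"datestr","type":"string"}
          |]}""".stripMargin)
      // getTargetSchema defaults to getSourceSchema.
    }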
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 33ca211b03..e3f05389a9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -55,6 +55,9 @@ object HoodieProcedures {
     mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
     mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
     mapBuilder.put(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
+    mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
+    mapBuilder.put(ShowBootstrapMappingProcedure.NAME, ShowBootstrapMappingProcedure.builder)
+    mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
new file mode 100644
index 0000000000..8e6fd36a8f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.cli.BootstrapExecutorUtils
+import org.apache.hudi.cli.HDFSParquetImporterUtils.{buildProperties, readConfig}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.config.HoodieBootstrapConfig
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.Locale
+import java.util.function.Supplier
+
+class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "bootstrapPath", DataTypes.StringType, None),
+    ProcedureParameter.required(3, "basePath", DataTypes.StringType, None),
+    ProcedureParameter.required(4, "rowKeyField", DataTypes.StringType, None),
+    ProcedureParameter.optional(5, "baseFileFormat", DataTypes.StringType, "PARQUET"),
+    ProcedureParameter.optional(6, "partitionPathField", DataTypes.StringType, ""),
+    ProcedureParameter.optional(7, "bootstrapIndexClass", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
+    ProcedureParameter.optional(8, "selectorClass", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
+    ProcedureParameter.optional(9, "keyGeneratorClass", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
+    ProcedureParameter.optional(10, "fullBootstrapInputProvider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
+    ProcedureParameter.optional(11, "schemaProviderClass", DataTypes.StringType, ""),
+    ProcedureParameter.optional(12, "payloadClass", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
+    ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
+    ProcedureParameter.optional(14, "enableHiveSync", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(15, "propsFilePath", DataTypes.StringType, ""),
+    ProcedureParameter.optional(16, "bootstrapOverwrite", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("status", DataTypes.IntegerType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tableType = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val bootstrapPath = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val basePath = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+    val rowKeyField = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val baseFileFormat = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val partitionPathField = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+    val bootstrapIndexClass = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String]
+    val selectorClass = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[String]
+    val keyGeneratorClass = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[String]
+    val fullBootstrapInputProvider = getArgValueOrDefault(args, PARAMETERS(10)).get.asInstanceOf[String]
+    val schemaProviderClass = getArgValueOrDefault(args, PARAMETERS(11)).get.asInstanceOf[String]
+    val payloadClass = getArgValueOrDefault(args, PARAMETERS(12)).get.asInstanceOf[String]
+    val parallelism = getArgValueOrDefault(args, PARAMETERS(13)).get.asInstanceOf[Int]
+    val enableHiveSync = getArgValueOrDefault(args, PARAMETERS(14)).get.asInstanceOf[Boolean]
+    val propsFilePath = getArgValueOrDefault(args, PARAMETERS(15)).get.asInstanceOf[String]
+    val bootstrapOverwrite = getArgValueOrDefault(args, PARAMETERS(16)).get.asInstanceOf[Boolean]
+
+    val configs: util.List[String] = new util.ArrayList[String]
+
+    val properties: TypedProperties = if (propsFilePath == null || propsFilePath.isEmpty) buildProperties(configs)
+    else readConfig(jsc.hadoopConfiguration, new Path(propsFilePath), configs).getProps(true)
+
+    properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key, bootstrapPath)
+
+    if (!StringUtils.isNullOrEmpty(keyGeneratorClass) && KeyGeneratorType.getNames.contains(keyGeneratorClass.toUpperCase(Locale.ROOT))) {
+      properties.setProperty(HoodieBootstrapConfig.KEYGEN_TYPE.key, keyGeneratorClass.toUpperCase(Locale.ROOT))
+    } else {
+      properties.setProperty(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, keyGeneratorClass)
+    }
+
+    properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME.key, fullBootstrapInputProvider)
+    properties.setProperty(HoodieBootstrapConfig.PARALLELISM_VALUE.key, parallelism.toString)
+    properties.setProperty(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key, selectorClass)
+    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, rowKeyField)
+    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionPathField)
+
+    val fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration)
+
+    val cfg = new BootstrapExecutorUtils.Config()
+    cfg.setTableName(tableName.get.asInstanceOf[String])
+    cfg.setTableType(tableType)
+    cfg.setBasePath(basePath)
+    cfg.setBaseFileFormat(baseFileFormat)
+    cfg.setBootstrapIndexClass(bootstrapIndexClass)
+    cfg.setSchemaProviderClass(schemaProviderClass)
+    cfg.setPayloadClass(payloadClass)
+    cfg.setEnableHiveSync(enableHiveSync)
+    cfg.setBootstrapOverwrite(bootstrapOverwrite)
+
+    try {
+      new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, properties).execute()
+      Seq(Row(0))
+    } catch {
+      case e: Exception =>
+        logWarning("Run bootstrap failed", e)
+        Seq(Row(-1))
+    }
+  }
+
+  override def build = new RunBootstrapProcedure()
+}
+
+object RunBootstrapProcedure {
+  val NAME = "run_bootstrap"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new RunBootstrapProcedure
+  }
+}
+
+
+
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
new file mode 100644
index 0000000000..dab3891686
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.google.common.collect.Lists
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
+import org.apache.hudi.common.model.{BootstrapFileMapping, HoodieFileGroupId}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class ShowBootstrapMappingProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "partitionPath", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "fileIds", DataTypes.StringType, ""),
+    ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.optional(4, "sortBy", DataTypes.StringType, "partition"),
+    ProcedureParameter.optional(5, "desc", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("fileid", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_basepath", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_file", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val partitionPath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val fileIds = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Int]
+    val sortBy = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val desc = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Boolean]
+
+    val basePath: String = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+
+    if (partitionPath.isEmpty && fileIds.nonEmpty) throw new IllegalStateException("PartitionPath is mandatory when passing fileIds.")
+
+    val indexReader = createBootstrapIndexReader(metaClient)
+    val indexedPartitions = indexReader.getIndexedPartitionPaths
+
+    if (partitionPath.nonEmpty && !indexedPartitions.contains(partitionPath)) throw new HoodieException(partitionPath + " is not a valid indexed partition")
+
+    val mappingList: util.ArrayList[BootstrapFileMapping] = new util.ArrayList[BootstrapFileMapping]
+    if (fileIds.nonEmpty) {
+      val fileGroupIds = fileIds.split(",").toList.map((fileId: String) => new HoodieFileGroupId(partitionPath, fileId)).asJava
+      mappingList.addAll(indexReader.getSourceFileMappingForFileIds(fileGroupIds).values)
+    } else if (partitionPath.nonEmpty) {
+      mappingList.addAll(indexReader.getSourceFileMappingForPartition(partitionPath))
+    } else {
+      for (part <- indexedPartitions) {
+        mappingList.addAll(indexReader.getSourceFileMappingForPartition(part))
+      }
+    }
+
+    val rows: java.util.List[Row] = mappingList
+      .map(mapping => Row(mapping.getPartitionPath, mapping.getFileId, mapping.getBootstrapBasePath,
+        mapping.getBootstrapPartitionPath, mapping.getBootstrapFileStatus.getPath.getUri)).toList
+
+    val df = spark.createDataFrame(rows, OUTPUT_TYPE)
+
+    if (desc) {
+      df.orderBy(df(sortBy).desc).limit(limit).collect()
+    } else {
+      df.orderBy(df(sortBy).asc).limit(limit).collect()
+    }
+  }
+
+  private def createBootstrapIndexReader(metaClient: HoodieTableMetaClient) = {
+    val index = BootstrapIndex.getBootstrapIndex(metaClient)
+    if (!index.useIndex) throw new HoodieException("This is not a bootstrapped Hudi table. No bootstrap index information found")
+    index.createReader
+  }
+
+  override def build: Procedure = new ShowBootstrapMappingProcedure()
+}
+
+object ShowBootstrapMappingProcedure {
+  val NAME = "show_bootstrap_mapping"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowBootstrapMappingProcedure
+  }
+}
+
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
new file mode 100644
index 0000000000..b3bebd7f22
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+class ShowBootstrapPartitionsProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("indexed_partitions", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+
+    val basePath: String = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+
+    val indexReader = createBootstrapIndexReader(metaClient)
+    val indexedPartitions = indexReader.getIndexedPartitionPaths
+
+    indexedPartitions.stream().toArray.map(r => Row(r)).toList
+  }
+
+  private def createBootstrapIndexReader(metaClient: HoodieTableMetaClient) = {
+    val index = BootstrapIndex.getBootstrapIndex(metaClient)
+    if (!index.useIndex) throw new HoodieException("This is not a bootstrapped Hudi table. No bootstrap index information found")
+    index.createReader
+  }
+
+  override def build = new ShowBootstrapPartitionsProcedure()
+}
+
+object ShowBootstrapPartitionsProcedure {
+  val NAME = "show_bootstrap_partitions"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowBootstrapPartitionsProcedure
+  }
+}
+
+
+
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
new file mode 100644
index 0000000000..931d313013
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.functional.TestBootstrap
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.apache.spark.sql.{Dataset, Row}
+
+import java.time.Instant
+import java.util
+
+class TestBootstrapProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call run_bootstrap Procedure") {
+    withTempDir { tmp =>
+      val NUM_OF_RECORDS = 100
+      val PARTITION_FIELD = "datestr"
+      val RECORD_KEY_FIELD = "_row_key"
+
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}"
+
+      val srcName: String = "source"
+      val sourcePath = basePath + Path.SEPARATOR + srcName
+      val tablePath = basePath + Path.SEPARATOR + tableName
+      val jsc = new JavaSparkContext(spark.sparkContext)
+
+      // generate test data
+      val partitions = util.Arrays.asList("2018", "2019", "2020")
+      val timestamp: Long = Instant.now.toEpochMilli
+      for (i <- 0 until partitions.size) {
+        val df: Dataset[Row] = TestBootstrap.generateTestRawTripDataset(timestamp, i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, spark.sqlContext)
+        df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + partitions.get(i))
+      }
+
+      // run bootstrap
+      checkAnswer(
+        s"""call run_bootstrap(
+           |table => '$tableName',
+           |basePath => '$tablePath',
+           |tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
+           |bootstrapPath => '$sourcePath',
+           |rowKeyField => '$RECORD_KEY_FIELD',
+           |partitionPathField => '$PARTITION_FIELD',
+           |bootstrapOverwrite => true)""".stripMargin) {
+        Seq(0)
+      }
+
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName using hudi
+           |location '$tablePath'
+           |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
+           |""".stripMargin)
+
+      // show bootstrap's index partitions
+      var result = spark.sql(s"""call show_bootstrap_partitions(table => '$tableName')""".stripMargin).collect()
+      assertResult(3) {
+        result.length
+      }
+
+      // show bootstrap's index mapping
+      result = spark.sql(
+        s"""call show_bootstrap_mapping(table => 
'$tableName')""".stripMargin).collect()
+      assertResult(3) {
+        result.length
+      }
+    }
+  }
+}
