This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 26c967bac6 [HUDI-3504] Support bootstrap command based on Call Produce
Command (#5977)
26c967bac6 is described below
commit 26c967bac60de394ca78e79d27647067680ed630
Author: ForwardXu <[email protected]>
AuthorDate: Mon Jun 27 13:06:50 2022 +0800
[HUDI-3504] Support bootstrap command based on Call Produce Command (#5977)
---
.../apache/hudi/cli/BootstrapExecutorUtils.java | 254 +++++++++++++++++++++
.../java/org/apache/hudi/cli/SchemaProvider.java | 58 +++++
.../hudi/command/procedures/HoodieProcedures.scala | 3 +
.../command/procedures/RunBootstrapProcedure.scala | 144 ++++++++++++
.../procedures/ShowBootstrapMappingProcedure.scala | 117 ++++++++++
.../ShowBootstrapPartitionsProcedure.scala | 75 ++++++
.../hudi/procedure/TestBootstrapProcedure.scala | 89 ++++++++
7 files changed, 740 insertions(+)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
new file mode 100644
index 0000000000..0cae022967
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+
+/**
+ * Performs bootstrap from a non-hudi source.
+ *
+ * <p>The constructor validates that the bootstrap base path property is present and builds the
+ * write config; {@link #execute()} then initializes the target table, runs the bootstrap through a
+ * {@link SparkRDDWriteClient} and, when enabled in the {@link Config}, syncs the table to Hive.
+ */
+public class BootstrapExecutorUtils implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(BootstrapExecutorUtils.class);
+
+  /**
+   * Config.
+   */
+  private final Config cfg;
+
+  /**
+   * Spark context.
+   */
+  private final transient JavaSparkContext jssc;
+
+  /**
+   * Bag of properties with source, hoodie client, key generator etc.
+   */
+  private final TypedProperties props;
+
+  /**
+   * Hadoop Configuration.
+   */
+  private final Configuration configuration;
+
+  /**
+   * Bootstrap Configuration.
+   */
+  private final HoodieWriteConfig bootstrapConfig;
+
+  /**
+   * FileSystem instance.
+   */
+  private final transient FileSystem fs;
+
+  /**
+   * Value of the {@code HoodieTableConfig.BOOTSTRAP_BASE_PATH} property supplied at construction.
+   */
+  private final String bootstrapBasePath;
+
+  /**
+   * Commit metadata key under which {@link Config#checkpoint} is stored by {@link #execute()}.
+   */
+  public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY;
+
+  /**
+   * Bootstrap Executor.
+   *
+   * @param cfg        Bootstrap config
+   * @param jssc       Java Spark Context
+   * @param fs         File System
+   * @param conf       Hadoop configuration, handed to the Hive sync tool
+   * @param properties Bootstrap Writer Properties; must contain the bootstrap base path key.
+   *                   The instance is kept by reference and a default payload class is added
+   *                   to it when absent.
+   * @throws IOException if the configured schema provider class cannot be loaded
+   */
+  public BootstrapExecutorUtils(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
+                                TypedProperties properties) throws IOException {
+    this.cfg = cfg;
+    this.jssc = jssc;
+    this.fs = fs;
+    this.configuration = conf;
+    this.props = properties;
+
+    ValidationUtils.checkArgument(properties.containsKey(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key()),
+        HoodieTableConfig.BOOTSTRAP_BASE_PATH.key() + " must be specified.");
+    this.bootstrapBasePath = properties.getString(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key());
+
+    // Add more defaults if full bootstrap requested
+    this.props.putIfAbsent(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(),
+        DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue());
+    // Schema provider that supplies the command for reading the input and writing out the target table.
+    // It is null when no schema provider class is configured.
+    SchemaProvider schemaProvider = createSchemaProvider(cfg.schemaProviderClass, props, jssc);
+    HoodieWriteConfig.Builder builder =
+        HoodieWriteConfig.newBuilder().withPath(cfg.basePath)
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
+            .forTable(cfg.tableName)
+            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+            .withAutoCommit(true)
+            .withProps(props);
+
+    if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
+      builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
+    }
+    this.bootstrapConfig = builder.build();
+    LOG.info("Created bootstrap executor with configs : " + bootstrapConfig.getProps());
+  }
+
+  /**
+   * Instantiates the schema provider reflectively.
+   *
+   * @param schemaProviderClass fully qualified class name; may be null or empty
+   * @param cfg                 properties passed to the provider's constructor
+   * @param jssc                Java Spark Context passed to the provider's constructor
+   * @return the provider, or {@code null} when {@code schemaProviderClass} is null or empty
+   * @throws IOException if the class cannot be loaded or instantiated
+   */
+  public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg,
+                                                    JavaSparkContext jssc) throws IOException {
+    try {
+      return StringUtils.isNullOrEmpty(schemaProviderClass) ? null
+          : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
+    } catch (Throwable e) {
+      throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
+    }
+  }
+
+  /**
+   * Executes Bootstrap.
+   *
+   * @throws IOException if initializing the target table fails with an I/O error
+   */
+  public void execute() throws IOException {
+    initializeTable();
+
+    try (SparkRDDWriteClient bootstrapClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jssc), bootstrapConfig)) {
+      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+      checkpointCommitMetadata.put(CHECKPOINT_KEY, Config.checkpoint);
+      bootstrapClient.bootstrap(Option.of(checkpointCommitMetadata));
+      syncHive();
+    }
+  }
+
+  /**
+   * Sync to Hive.
+   */
+  private void syncHive() {
+    // enableHiveSync is a boxed Boolean that stays null unless its setter was called; comparing
+    // against Boolean.TRUE treats "unset" as "disabled" instead of failing on unboxing.
+    if (Boolean.TRUE.equals(cfg.enableHiveSync)) {
+      TypedProperties metaProps = new TypedProperties();
+      metaProps.putAll(props);
+      metaProps.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), cfg.basePath);
+      metaProps.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);
+      if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key(), HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
+        metaProps.put(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
+            props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
+      }
+
+      new HiveSyncTool(metaProps, configuration, fs).syncHoodieTable();
+    }
+  }
+
+  /**
+   * Prepares the target base path and writes the initial table metadata.
+   *
+   * <p>An existing base path is deleted recursively when overwrite is enabled; otherwise a
+   * {@link HoodieException} is thrown.
+   *
+   * @throws IOException if a file system call or the table initialization fails
+   */
+  private void initializeTable() throws IOException {
+    Path basePath = new Path(cfg.basePath);
+    if (fs.exists(basePath)) {
+      // Null-safe for the same reason as enableHiveSync: an unset flag means "do not overwrite".
+      if (Boolean.TRUE.equals(cfg.bootstrapOverwrite)) {
+        LOG.warn("Target base path already exists, overwrite it");
+        fs.delete(basePath, true);
+      } else {
+        throw new HoodieException("target base path already exists at " + cfg.basePath
+            + ". Cannot bootstrap data on top of an existing table");
+      }
+    }
+    HoodieTableMetaClient.withPropertyBuilder()
+        .fromProperties(props)
+        .setTableType(cfg.tableType)
+        .setTableName(cfg.tableName)
+        .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+        .setPayloadClassName(cfg.payloadClass)
+        .setBaseFileFormat(cfg.baseFileFormat)
+        .setBootstrapIndexClass(cfg.bootstrapIndexClass)
+        .setBootstrapBasePath(bootstrapBasePath)
+        .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.basePath);
+  }
+
+  /**
+   * Settings for one bootstrap run, populated through the setters.
+   */
+  public static class Config {
+    private String tableName;
+    private String tableType;
+
+    private String basePath;
+
+    private String baseFileFormat;
+    private String bootstrapIndexClass;
+    private String schemaProviderClass;
+    private String payloadClass;
+    // Boxed flags: null until the setter is called, which the executor treats as false.
+    private Boolean enableHiveSync;
+
+    private Boolean bootstrapOverwrite;
+
+    // Static, so the value is shared by every Config instance; null by default.
+    public static String checkpoint = null;
+
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+
+    public void setTableType(String tableType) {
+      this.tableType = tableType;
+    }
+
+    public void setBasePath(String basePath) {
+      this.basePath = basePath;
+    }
+
+    public void setBaseFileFormat(String baseFileFormat) {
+      this.baseFileFormat = baseFileFormat;
+    }
+
+    public void setBootstrapIndexClass(String bootstrapIndexClass) {
+      this.bootstrapIndexClass = bootstrapIndexClass;
+    }
+
+    public void setSchemaProviderClass(String schemaProviderClass) {
+      this.schemaProviderClass = schemaProviderClass;
+    }
+
+    public void setPayloadClass(String payloadClass) {
+      this.payloadClass = payloadClass;
+    }
+
+    public void setEnableHiveSync(Boolean enableHiveSync) {
+      this.enableHiveSync = enableHiveSync;
+    }
+
+    public void setBootstrapOverwrite(Boolean bootstrapOverwrite) {
+      this.bootstrapOverwrite = bootstrapOverwrite;
+    }
+  }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java
new file mode 100644
index 0000000000..de6770bf30
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+
+/**
+ * Supplies the Avro schemas used for reading source data and for writing into a
+ * Hoodie table. Subclasses implement {@link #getSourceSchema()}; the target schema
+ * defaults to the source schema.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
+public abstract class SchemaProvider implements Serializable {
+
+ // Properties handed to the provider at construction time.
+ protected TypedProperties config;
+
+ // Spark context; null when the single-argument constructor is used.
+ protected JavaSparkContext jssc;
+
+ // Creates a provider without a Spark context (jssc is set to null).
+ public SchemaProvider(TypedProperties props) {
+ this(props, null);
+ }
+
+ // Creates a provider that keeps both the properties and the Spark context.
+ protected SchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+ this.config = props;
+ this.jssc = jssc;
+ }
+
+ // Returns the schema of the source data.
+ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+ public abstract Schema getSourceSchema();
+
+ // Returns the schema to write the table with; subclasses may override.
+ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+ public Schema getTargetSchema() {
+ // by default, use source schema as target for hoodie table as well
+ return getSourceSchema();
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 33ca211b03..e3f05389a9 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -55,6 +55,9 @@ object HoodieProcedures {
mapBuilder.put(StatsWriteAmplificationProcedure.NAME,
StatsWriteAmplificationProcedure.builder)
mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
mapBuilder.put(HdfsParquetImportProcedure.NAME,
HdfsParquetImportProcedure.builder)
+ mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
+ mapBuilder.put(ShowBootstrapMappingProcedure.NAME,
ShowBootstrapMappingProcedure.builder)
+ mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME,
ShowBootstrapPartitionsProcedure.builder)
mapBuilder.build
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
new file mode 100644
index 0000000000..8e6fd36a8f
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.cli.BootstrapExecutorUtils
+import org.apache.hudi.cli.HDFSParquetImporterUtils.{buildProperties,
readConfig}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.config.HoodieBootstrapConfig
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util
+import java.util.Locale
+import java.util.function.Supplier
+
+/**
+ * Procedure that bootstraps a Hudi table at `basePath` from the data found at `bootstrapPath`.
+ *
+ * The single output row carries a `status` column: 0 when the bootstrap completed and -1 when it
+ * failed with an exception (the exception is logged as a warning).
+ */
+class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "bootstrapPath", DataTypes.StringType, None),
+    ProcedureParameter.required(3, "basePath", DataTypes.StringType, None),
+    ProcedureParameter.required(4, "rowKeyField", DataTypes.StringType, None),
+    ProcedureParameter.optional(5, "baseFileFormat", DataTypes.StringType, "PARQUET"),
+    ProcedureParameter.optional(6, "partitionPathField", DataTypes.StringType, ""),
+    ProcedureParameter.optional(7, "bootstrapIndexClass", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
+    ProcedureParameter.optional(8, "selectorClass", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
+    ProcedureParameter.optional(9, "keyGeneratorClass", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
+    ProcedureParameter.optional(10, "fullBootstrapInputProvider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
+    ProcedureParameter.optional(11, "schemaProviderClass", DataTypes.StringType, ""),
+    ProcedureParameter.optional(12, "payloadClass", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
+    ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
+    ProcedureParameter.optional(14, "enableHiveSync", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(15, "propsFilePath", DataTypes.StringType, ""),
+    ProcedureParameter.optional(16, "bootstrapOverwrite", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("status", DataTypes.IntegerType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Runs the bootstrap with the supplied arguments.
+   *
+   * @param args procedure arguments matching `PARAMETERS`
+   * @return `Seq(Row(0))` on success, `Seq(Row(-1))` when the bootstrap throws an `Exception`
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tableType = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val bootstrapPath = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val basePath = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+    val rowKeyField = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val baseFileFormat = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val partitionPathField = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+    val bootstrapIndexClass = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String]
+    val selectorClass = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[String]
+    val keyGeneratorClass = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[String]
+    val fullBootstrapInputProvider = getArgValueOrDefault(args, PARAMETERS(10)).get.asInstanceOf[String]
+    val schemaProviderClass = getArgValueOrDefault(args, PARAMETERS(11)).get.asInstanceOf[String]
+    val payloadClass = getArgValueOrDefault(args, PARAMETERS(12)).get.asInstanceOf[String]
+    val parallelism = getArgValueOrDefault(args, PARAMETERS(13)).get.asInstanceOf[Int]
+    val enableHiveSync = getArgValueOrDefault(args, PARAMETERS(14)).get.asInstanceOf[Boolean]
+    val propsFilePath = getArgValueOrDefault(args, PARAMETERS(15)).get.asInstanceOf[String]
+    val bootstrapOverwrite = getArgValueOrDefault(args, PARAMETERS(16)).get.asInstanceOf[Boolean]
+
+    val configs: util.List[String] = new util.ArrayList[String]
+
+    // Start from the properties file when one is given, otherwise from an empty override list.
+    val properties: TypedProperties = if (propsFilePath == null || propsFilePath.isEmpty) buildProperties(configs)
+    else readConfig(jsc.hadoopConfiguration, new Path(propsFilePath), configs).getProps(true)
+
+    properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key, bootstrapPath)
+
+    // A value matching a KeyGeneratorType name is passed as a type, anything else as a class name.
+    if (!StringUtils.isNullOrEmpty(keyGeneratorClass) && KeyGeneratorType.getNames.contains(keyGeneratorClass.toUpperCase(Locale.ROOT))) {
+      properties.setProperty(HoodieBootstrapConfig.KEYGEN_TYPE.key, keyGeneratorClass.toUpperCase(Locale.ROOT))
+    } else {
+      properties.setProperty(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, keyGeneratorClass)
+    }
+
+    properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME.key, fullBootstrapInputProvider)
+    properties.setProperty(HoodieBootstrapConfig.PARALLELISM_VALUE.key, parallelism.toString)
+    properties.setProperty(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key, selectorClass)
+    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, rowKeyField)
+    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionPathField)
+
+    val fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration)
+
+    val cfg = new BootstrapExecutorUtils.Config()
+    cfg.setTableName(tableName.get.asInstanceOf[String])
+    cfg.setTableType(tableType)
+    cfg.setBasePath(basePath)
+    cfg.setBaseFileFormat(baseFileFormat)
+    cfg.setBootstrapIndexClass(bootstrapIndexClass)
+    cfg.setSchemaProviderClass(schemaProviderClass)
+    cfg.setPayloadClass(payloadClass)
+    cfg.setEnableHiveSync(enableHiveSync)
+    cfg.setBootstrapOverwrite(bootstrapOverwrite)
+
+    // The try/catch is the method's result expression, so a failure yields -1 rather than being
+    // overwritten by an unconditional success row placed after it.
+    try {
+      new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, properties).execute()
+      Seq(Row(0))
+    } catch {
+      case e: Exception =>
+        logWarning(s"Run bootstrap failed due to", e)
+        Seq(Row(-1))
+    }
+  }
+
+  override def build = new RunBootstrapProcedure()
+}
+
+// Companion holding the procedure name and a supplier of fresh builder instances.
+object RunBootstrapProcedure {
+ // Name used to invoke the procedure, e.g. `call run_bootstrap(...)`.
+ val NAME = "run_bootstrap"
+
+ // Each get() returns a new RunBootstrapProcedure.
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new RunBootstrapProcedure
+ }
+}
+
+
+
+
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
new file mode 100644
index 0000000000..dab3891686
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.google.common.collect.Lists
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
+import org.apache.hudi.common.model.{BootstrapFileMapping, HoodieFileGroupId}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure that lists the bootstrap index mapping of a table: for each Hudi file id, the source
+ * base path, source partition and source file it was bootstrapped from.
+ *
+ * Results can be narrowed to one partition (and to specific file ids within it), sorted by any
+ * output column and limited in size.
+ */
+class ShowBootstrapMappingProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "partitionPath", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "fileIds", DataTypes.StringType, ""),
+    ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.optional(4, "sortBy", DataTypes.StringType, "partition"),
+    ProcedureParameter.optional(5, "desc", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("fileid", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_basepath", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_file", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Collects the bootstrap mappings selected by the arguments.
+   *
+   * @param args procedure arguments matching `PARAMETERS`
+   * @return at most `limit` rows ordered by the `sortBy` column
+   * @throws IllegalStateException if `fileIds` is given without `partitionPath`
+   * @throws HoodieException if the table has no bootstrap index, or `partitionPath` is not an
+   *                         indexed partition
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val partitionPath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val fileIds = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Int]
+    val sortBy = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val desc = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Boolean]
+
+    val basePath: String = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+
+    if (partitionPath.isEmpty && fileIds.nonEmpty) throw new IllegalStateException("PartitionPath is mandatory when passing fileIds.")
+
+    val indexReader = createBootstrapIndexReader(metaClient)
+    val indexedPartitions = indexReader.getIndexedPartitionPaths
+
+    // The exception has to be thrown, not merely constructed, for the validation to take effect.
+    if (partitionPath.nonEmpty && !indexedPartitions.contains(partitionPath)) throw new HoodieException(partitionPath + " is not an valid indexed partition")
+
+    val mappingList: util.ArrayList[BootstrapFileMapping] = new util.ArrayList[BootstrapFileMapping]
+    if (fileIds.nonEmpty) {
+      // fileIds is a comma-separated list, all resolved within the given partition.
+      val fileGroupIds = fileIds.split(",").toList.map((fileId: String) => new HoodieFileGroupId(partitionPath, fileId)).asJava
+      mappingList.addAll(indexReader.getSourceFileMappingForFileIds(fileGroupIds).values)
+    } else if (partitionPath.nonEmpty) mappingList.addAll(indexReader.getSourceFileMappingForPartition(partitionPath))
+    else {
+      // No filter given: gather the mappings of every indexed partition.
+      for (part <- indexedPartitions) {
+        mappingList.addAll(indexReader.getSourceFileMappingForPartition(part))
+      }
+    }
+
+    val rows: java.util.List[Row] = mappingList
+      .map(mapping => Row(mapping.getPartitionPath, mapping.getFileId, mapping.getBootstrapBasePath,
+        mapping.getBootstrapPartitionPath, mapping.getBootstrapFileStatus.getPath.getUri)).toList
+
+    val df = spark.createDataFrame(rows, OUTPUT_TYPE)
+
+    if (desc) {
+      df.orderBy(df(sortBy).desc).limit(limit).collect()
+    } else {
+      df.orderBy(df(sortBy).asc).limit(limit).collect()
+    }
+  }
+
+  /** Returns a reader over the table's bootstrap index, failing when the table has none. */
+  private def createBootstrapIndexReader(metaClient: HoodieTableMetaClient) = {
+    val index = BootstrapIndex.getBootstrapIndex(metaClient)
+    if (!index.useIndex) throw new HoodieException("This is not a bootstrapped Hudi table. Don't have any index info")
+    index.createReader
+  }
+
+  override def build: Procedure = new ShowBootstrapMappingProcedure()
+}
+
+// Companion holding the procedure name and a supplier of fresh builder instances.
+object ShowBootstrapMappingProcedure {
+ // Name used to invoke the procedure, e.g. `call show_bootstrap_mapping(...)`.
+ val NAME = "show_bootstrap_mapping"
+
+ // Each get() returns a new ShowBootstrapMappingProcedure.
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new ShowBootstrapMappingProcedure
+ }
+}
+
+
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
new file mode 100644
index 0000000000..b3bebd7f22
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util.function.Supplier
+
+// Procedure that lists the partition paths present in a table's bootstrap index,
+// one output row per partition.
+class ShowBootstrapPartitionsProcedure extends BaseProcedure with
ProcedureBuilder {
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("indexed_partitions", DataTypes.StringType, nullable = true,
Metadata.empty))
+ )
+
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ def outputType: StructType = OUTPUT_TYPE
+
+ // Resolves the table's base path, opens its bootstrap index and returns each
+ // indexed partition path wrapped in a Row.
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+
+ val basePath: String = getBasePath(tableName)
+ val metaClient =
HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+
+ val indexReader = createBootstrapIndexReader(metaClient)
+ val indexedPartitions = indexReader.getIndexedPartitionPaths
+
+ indexedPartitions.stream().toArray.map(r => Row(r)).toList
+ }
+
+ // Returns a reader over the bootstrap index; throws HoodieException when the
+ // index reports that it is not in use for this table.
+ private def createBootstrapIndexReader(metaClient: HoodieTableMetaClient) = {
+ val index = BootstrapIndex.getBootstrapIndex(metaClient)
+ if (!index.useIndex) throw new HoodieException("This is not a bootstrapped
Hudi table. Don't have any index info")
+ index.createReader
+ }
+
+ override def build = new ShowBootstrapPartitionsProcedure()
+}
+
+// Companion holding the procedure name and a supplier of fresh builder instances.
+object ShowBootstrapPartitionsProcedure {
+ // Name used to invoke the procedure, e.g. `call show_bootstrap_partitions(...)`.
+ val NAME = "show_bootstrap_partitions"
+
+ // Each get() returns a new ShowBootstrapPartitionsProcedure.
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new ShowBootstrapPartitionsProcedure
+ }
+}
+
+
+
+
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
new file mode 100644
index 0000000000..931d313013
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.functional.TestBootstrap
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.apache.spark.sql.{Dataset, Row}
+
+import java.time.Instant
+import java.util
+
+// End-to-end test of the bootstrap procedures: writes three parquet partitions,
+// bootstraps a table from them, then checks the index partitions and mappings.
+class TestBootstrapProcedure extends HoodieSparkSqlTestBase {
+
+ test("Test Call run_bootstrap Procedure") {
+ withTempDir { tmp =>
+ val NUM_OF_RECORDS = 100
+ val PARTITION_FIELD = "datestr"
+ val RECORD_KEY_FIELD = "_row_key"
+
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}"
+
+ // Source data and the bootstrapped table live side by side under the temp dir.
+ val srcName: String = "source"
+ val sourcePath = basePath + Path.SEPARATOR + srcName
+ val tablePath = basePath + Path.SEPARATOR + tableName
+ val jsc = new JavaSparkContext(spark.sparkContext)
+
+ // generate test data
+ val partitions = util.Arrays.asList("2018", "2019", "2020")
+ val timestamp: Long = Instant.now.toEpochMilli
+ // One parquet directory per partition value, laid out as datestr=<value>.
+ for (i <- 0 until partitions.size) {
+ val df: Dataset[Row] =
TestBootstrap.generateTestRawTripDataset(timestamp, i * NUM_OF_RECORDS, i *
NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, spark.sqlContext)
+ df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" +
partitions.get(i))
+ }
+
+ // run bootstrap
+ // A status of 0 is the success value returned by run_bootstrap.
+ checkAnswer(
+ s"""call run_bootstrap(
+ |table => '$tableName',
+ |basePath => '$tablePath',
+ |tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
+ |bootstrapPath => '$sourcePath',
+ |rowKeyField => '$RECORD_KEY_FIELD',
+ |partitionPathField => '$PARTITION_FIELD',
+ |bootstrapOverwrite => true)""".stripMargin) {
+ Seq(0)
+ }
+
+ // create table
+ // Registers the bootstrapped location so the show_* procedures can look it up by name.
+ spark.sql(
+ s"""
+ |create table $tableName using hudi
+ |location '$tablePath'
+ |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
+ |""".stripMargin)
+
+ // show bootstrap's index partitions
+ var result = spark.sql(s"""call show_bootstrap_partitions(table =>
'$tableName')""".stripMargin).collect()
+ assertResult(3) {
+ result.length
+ }
+
+ // show bootstrap's index mapping
+ result = spark.sql(
+ s"""call show_bootstrap_mapping(table =>
'$tableName')""".stripMargin).collect()
+ assertResult(3) {
+ result.length
+ }
+ }
+ }
+}