This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 02ae11f4b46255af43a4b60b9e45fe3059c84408 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Sat Mar 9 09:33:10 2024 -0800 [HUDI-5101] Adding spark-structured streaming test support via spark-submit job (#7074) Co-authored-by: Y Ethan Guo <[email protected]> --- .../streaming/StructuredStreamingSinkUtil.java | 168 +++++++++++++++++++++ .../StructuredStreamingSinkTestWriter.scala | 104 +++++++++++++ 2 files changed, 272 insertions(+) diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.java new file mode 100644 index 00000000000..f6fec62cb3b --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.streaming; + +import org.apache.hudi.exception.HoodieException; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * Spark-submit to test spark streaming + * + * Sample command. + * ./bin/spark-submit --master local[2] --driver-memory 1g --executor-memory 1g \ + * --class org.apache.hudi.streaming.StructuredStreamingSinkUtil PATH TO hudi-integ-test-bundle-0.13.0-SNAPSHOT.jar \ + * --spark-master local[2] \ + * --source-path /tmp/parquet_ny/ \ + * --target-path /tmp/hudi_streaming_kafka10/MERGE_ON_READ3/ \ + * --checkpoint-path /tmp/hudi_streaming_kafka10/checkpoint_mor3/ \ + * --table-type COPY_ON_WRITE \ + * --partition-field date_col \ + * --record-key-field tpep_pickup_datetime \ + * --pre-combine-field tpep_dropoff_datetime \ + * --table-name test_tbl + * + * Ensure "source-path" has parquet data. 
+ */ +public class StructuredStreamingSinkUtil implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(StructuredStreamingSinkUtil.class); + + private transient JavaSparkContext jsc; + private SparkSession sparkSession; + private Config cfg; + + public StructuredStreamingSinkUtil(JavaSparkContext jsc, Config cfg) { + this.jsc = jsc; + this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate(); + this.cfg = cfg; + } + + public static class Config implements Serializable { + @Parameter(names = {"--source-path", "-sp"}, description = "Source path to consume data from", required = true) + public String sourcePath = null; + + @Parameter(names = {"--target-path", "-tp"}, description = "Target path of the table of interest.", required = true) + public String targetPath = null; + + @Parameter(names = {"--table-type", "-ty"}, description = "Target path of the table of interest.", required = true) + public String tableType = "COPY_ON_WRITE"; + + @Parameter(names = {"--checkpoint-path", "-cp"}, description = "Checkppint path of the table of interest", required = true) + public String checkpointPath = null; + + @Parameter(names = {"--partition-field", "-pp"}, description = "Partitioning field", required = true) + public String partitionField = null; + + @Parameter(names = {"--record-key-field", "-rk"}, description = "record key field", required = true) + public String recordKeyField = null; + + @Parameter(names = {"--pre-combine-field", "-pc"}, description = "Precombine field", required = true) + public String preCombineField = null; + + @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true) + public String tableName = null; + + @Parameter(names = {"--disable-metadata", "-dmdt"}, description = "Disable metadata while querying", required = false) + public Boolean disableMetadata = false; + + @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false) + public 
String sparkMaster = null; + + @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false) + public String sparkMemory = "1g"; + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + } + + public static void main(String[] args) { + final Config cfg = new Config(); + JCommander cmd = new JCommander(cfg, null, args); + + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + + SparkConf sparkConf = buildSparkConf("Spark-structured-streaming-test", cfg.sparkMaster); + sparkConf.set("spark.executor.memory", cfg.sparkMemory); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + StructuredStreamingSinkUtil streamingSinkUtil = new StructuredStreamingSinkUtil(jsc, cfg); + streamingSinkUtil.run(); + } catch (Throwable throwable) { + LOG.error("Fail to execute tpcds read benchmarks for " + cfg, throwable); + } finally { + jsc.stop(); + } + } + + public void run() { + try { + LOG.info(cfg.toString()); + StructuredStreamingSinkTestWriter.triggerStreaming(sparkSession, cfg.tableType, cfg.sourcePath, cfg.targetPath, cfg.checkpointPath, + cfg.tableName, cfg.partitionField, cfg.recordKeyField, cfg.preCombineField); + StructuredStreamingSinkTestWriter.waitUntilCondition(1000 * 60 * 10, 1000 * 30); + } catch (Exception e) { + throw new HoodieException("Unable to test spark structured writes to hudi " + cfg.targetPath, e); + } finally { + LOG.warn("Completing Spark Structured Streaming test"); + } + } + + public static SparkConf buildSparkConf(String appName, String defaultMaster) { + return buildSparkConf(appName, defaultMaster, new HashMap<>()); + } + + private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) { + final SparkConf sparkConf = new SparkConf().setAppName(appName); + String master = sparkConf.get("spark.master", defaultMaster); + sparkConf.setMaster(master); + if (master.startsWith("yarn")) { + 
sparkConf.set("spark.eventLog.overwrite", "true"); + sparkConf.set("spark.eventLog.enabled", "true"); + } + sparkConf.set("spark.ui.port", "8090"); + sparkConf.setIfMissing("spark.driver.maxResultSize", "2g"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar"); + sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"); + sparkConf.set("spark.hadoop.mapred.output.compress", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); + sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); + + additionalConfigs.forEach(sparkConf::set); + return sparkConf; + } +} diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter.scala new file mode 100644 index 00000000000..8eb3b469e93 --- /dev/null +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
object StructuredStreamingSinkTestWriter {

  private val log = LogManager.getLogger(getClass)

  // Bug fix: this flag is written by the streaming-query listener thread
  // (onQueryTerminated -> validate) and polled from the caller's thread in
  // waitUntilCondition, so it must be @volatile for cross-thread visibility.
  @volatile var validationComplete: Boolean = false

  /** Waits for validation with defaults: 5 minutes max, polling every 500 ms. */
  def waitUntilCondition(): Unit = {
    waitUntilCondition(1000 * 60 * 5, 500)
  }

  /**
   * Blocks until validation completes or maxWaitTimeMs elapses, polling every
   * intervalTimeMs milliseconds.
   */
  def waitUntilCondition(maxWaitTimeMs: Long, intervalTimeMs: Long): Unit = {
    var waitSoFar: Long = 0
    while (waitSoFar < maxWaitTimeMs && !validationComplete) {
      log.info("Waiting for " + intervalTimeMs + ". Total wait time " + waitSoFar)
      Thread.sleep(intervalTimeMs)
      waitSoFar += intervalTimeMs
    }
  }

  /**
   * Starts a structured streaming query that reads parquet files from
   * inputPath and writes a Hudi table at hudiPath. A registered listener
   * stops all active queries once a batch observes zero input rows, then
   * validates that the count of distinct (partitionPathField, recordKeyField)
   * pairs matches between the input and the Hudi table, setting
   * validationComplete on success.
   */
  def triggerStreaming(spark: SparkSession, tableType: String, inputPath: String, hudiPath: String, hudiCheckpointPath: String,
                       tableName: String, partitionPathField: String, recordKeyField: String,
                       preCombineField: String): Unit = {

    def validate(): Unit = {
      log.info("Validation starting")
      val inputDf = spark.read.format("parquet").load(inputPath)
      val hudiDf = spark.read.format("hudi").load(hudiPath)
      // createOrReplaceTempView replaces the deprecated registerTempTable
      // (same behavior; registerTempTable is deprecated since Spark 2.0).
      inputDf.createOrReplaceTempView("inputTbl")
      hudiDf.createOrReplaceTempView("hudiTbl")
      assert(spark.sql("select count(distinct " + partitionPathField + ", " + recordKeyField + ") from inputTbl").count ==
        spark.sql("select count(distinct " + partitionPathField + ", " + recordKeyField + ") from hudiTbl").count)
      validationComplete = true
      log.info("Validation complete")
    }

    def shutdownListener(spark: SparkSession) = new StreamingQueryListener() {
      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        log.info("Query started: " + queryStarted.id)
      }

      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        log.info("Query terminated! " + queryTerminated.id + ". Validating input and hudi")
        validate()
        log.info("Data Validation complete")
      }

      override def onQueryProgress(queryProgressEvent: QueryProgressEvent): Unit = {
        // Zero input rows means the bounded parquet source is exhausted:
        // stop every active query so the test can terminate and validate.
        if (queryProgressEvent.progress.numInputRows == 0) {
          log.info("Stopping spark structured streaming as we have reached the end")
          spark.streams.active.foreach(_.stop())
        }
      }
    }

    spark.streams.addListener(shutdownListener(spark))
    log.info("Starting to consume from source and writing to hudi ")

    val inputDfSchema = spark.read.format("parquet").load(inputPath).schema
    val parquetdf = spark.readStream.option("spark.sql.streaming.schemaInference", "true").option("maxFilesPerTrigger", "1")
      .schema(inputDfSchema).parquet(inputPath)

    val writer = parquetdf.writeStream.format("org.apache.hudi").
      option(TABLE_TYPE.key, tableType).
      option(PRECOMBINE_FIELD.key, preCombineField).
      option(RECORDKEY_FIELD.key, recordKeyField).
      option(PARTITIONPATH_FIELD.key, partitionPathField).
      option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
      option(STREAMING_IGNORE_FAILED_BATCH.key, false).
      option(STREAMING_RETRY_CNT.key, 0).
      option("hoodie.table.name", tableName).
      option("hoodie.compact.inline.max.delta.commits", "2").
      option("checkpointLocation", hudiCheckpointPath).
      outputMode(OutputMode.Append())

    writer.trigger(Trigger.ProcessingTime(30000)).start(hudiPath)
  }
}
