This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9da2dd4 [HUDI-3719] High performance costs of AvroSerializer in
DataSource wr… (#5137)
9da2dd4 is described below
commit 9da2dd416ec76ec68393e38f8e1e94c7de141eb0
Author: xiarixiaoyao <[email protected]>
AuthorDate: Mon Mar 28 02:01:43 2022 +0800
[HUDI-3719] High performance costs of AvroSerializer in DataSource wr…
(#5137)
* [HUDI-3719] High performance costs of AvroSerializer in DataSource writing
* add a benchmark framework, adapted from Spark
add avroSerDerBenchmark
---
.../org/apache/hudi/AvroConversionUtils.scala | 9 +-
.../spark/hudi/benchmark/HoodieBenchmark.scala | 239 +++++++++++++++++++++
.../spark/hudi/benchmark/HoodieBenchmarkBase.scala | 87 ++++++++
.../spark/hudi/benchmark/HoodieBenchmarks.scala | 143 ++++++++++++
.../execution/benchmark/AvroSerDerBenchmark.scala | 99 +++++++++
5 files changed, 574 insertions(+), 3 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 0844c73..df878d7 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -62,10 +62,12 @@ object AvroConversionUtils {
* @param rootCatalystType Catalyst [[StructType]] to be transformed into
* @return converter accepting Avro payload and transforming it into a
Catalyst one (in the form of [[InternalRow]])
*/
- def createAvroToInternalRowConverter(rootAvroType: Schema, rootCatalystType:
StructType): GenericRecord => Option[InternalRow] =
- record => sparkAdapter.createAvroDeserializer(rootAvroType,
rootCatalystType)
+ def createAvroToInternalRowConverter(rootAvroType: Schema, rootCatalystType:
StructType): GenericRecord => Option[InternalRow] = {
+ val deserializer = sparkAdapter.createAvroDeserializer(rootAvroType,
rootCatalystType)
+ record => deserializer
.deserialize(record)
.map(_.asInstanceOf[InternalRow])
+ }
/**
* Creates converter to transform Catalyst payload into Avro one
@@ -76,7 +78,8 @@ object AvroConversionUtils {
* @return converter accepting Catalyst payload (in the form of
[[InternalRow]]) and transforming it into an Avro one
*/
def createInternalRowToAvroConverter(rootCatalystType: StructType,
rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = {
- row => sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType,
nullable)
+ val serializer = sparkAdapter.createAvroSerializer(rootCatalystType,
rootAvroType, nullable)
+ row => serializer
.serialize(row)
.asInstanceOf[GenericRecord]
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmark.scala
new file mode 100644
index 0000000..6d4317a
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmark.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.hudi.benchmark
+
+
+import java.io.{OutputStream, PrintStream}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.util.Try
+
+import org.apache.commons.io.output.TeeOutputStream
+import org.apache.commons.lang3.SystemUtils
+
+import org.apache.spark.util.Utils
+
+/**
+ * Adapted from Apache Spark.
+ * Utility class to benchmark components. An example of how to use this is:
+ * val benchmark = new Benchmark("My Benchmark", valuesPerIteration)
+ * benchmark.addCase("V1")(<function>)
+ * benchmark.addCase("V2")(<function>)
+ * benchmark.run
+ * This will output the average time to run each function and the rate of each
function.
+ *
+ * The benchmark function takes one argument that is the iteration that's
being run.
+ *
+ * @param name name of this benchmark.
+ * @param valuesPerIteration number of values used in the test case, used to
compute rows/s.
+ * @param minNumIters the min number of iterations that will be run per case,
not counting warm-up.
+ * @param warmupTime amount of time to spend running dummy case iterations for
JIT warm-up.
+ * @param minTime further iterations will be run for each case until this time
is used up.
+ * @param outputPerIteration if true, the timing for each run will be printed
to stdout.
+ * @param output optional output stream to write benchmark results to
+ */
+class HoodieBenchmark(
+ name: String,
+ valuesPerIteration: Long,
+ minNumIters: Int = 2,
+ warmupTime: FiniteDuration = 2.seconds,
+ minTime: FiniteDuration = 2.seconds,
+ outputPerIteration: Boolean = false,
+ output: Option[OutputStream] = None) {
+ import HoodieBenchmark._
+ val benchmarks = mutable.ArrayBuffer.empty[HoodieBenchmark.Case]
+
+ val out = if (output.isDefined) {
+ new PrintStream(new TeeOutputStream(System.out, output.get))
+ } else {
+ System.out
+ }
+
+ /**
+ * Adds a case to run when run() is called. The given function will be run
for several
+ * iterations to collect timing statistics.
+ *
+ * @param name of the benchmark case
+ * @param numIters if non-zero, forces exactly this many iterations to be run
+ */
+ def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+ addTimerCase(name, numIters) { timer =>
+ timer.startTiming()
+ f(timer.iteration)
+ timer.stopTiming()
+ }
+ }
+
+ /**
+ * Adds a case with manual timing control. When the function is run, timing
does not start
+ * until timer.startTiming() is called within the given function. The
corresponding
+ * timer.stopTiming() method must be called before the function returns.
+ *
+ * @param name of the benchmark case
+ * @param numIters if non-zero, forces exactly this many iterations to be run
+ */
+ def addTimerCase(name: String, numIters: Int = 0)(f: HoodieBenchmark.Timer
=> Unit): Unit = {
+ benchmarks += HoodieBenchmark.Case(name, f, numIters)
+ }
+
+ /**
+ * Runs the benchmark and outputs the results to stdout. This should be
copied and added as
+ * a comment with the benchmark. Although the results vary from machine to
machine, it should
+ * provide some baseline.
+ */
+ def run(): Unit = {
+ require(benchmarks.nonEmpty)
+ // scalastyle:off
+ println("Running benchmark: " + name)
+
+ val results = benchmarks.map { c =>
+ println(" Running case: " + c.name)
+ measure(valuesPerIteration, c.numIters)(c.fn)
+ }
+ println
+
+ val firstBest = results.head.bestMs
+ // The results are going to be processor specific so it is useful to
include that.
+ out.println(HoodieBenchmark.getJVMOSInfo())
+ out.println(HoodieBenchmark.getProcessorName())
+ val nameLen = Math.max(40, Math.max(name.length,
benchmarks.map(_.name.length).max))
+ out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+ name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)",
"Per Row(ns)", "Relative")
+ out.println("-" * (nameLen + 80))
+ results.zip(benchmarks).foreach { case (result, benchmark) =>
+ out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+ benchmark.name,
+ "%5.0f" format result.bestMs,
+ "%4.0f" format result.avgMs,
+ "%5.0f" format result.stdevMs,
+ "%10.1f" format result.bestRate,
+ "%6.1f" format (1000 / result.bestRate),
+ "%3.1fX" format (firstBest / result.bestMs))
+ }
+ out.println
+ // scalastyle:on
+ }
+
+ /**
+ * Runs a single function `f` for iters, returning the average time the
function took and
+ * the rate of the function.
+ */
+ def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = {
+ System.gc() // ensures garbage from previous cases don't impact this one
+ val warmupDeadline = warmupTime.fromNow
+ while (!warmupDeadline.isOverdue) {
+ f(new HoodieBenchmark.Timer(-1))
+ }
+ val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
+ val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
+ val runTimes = ArrayBuffer[Long]()
+ var totalTime = 0L
+ var i = 0
+ while (i < minIters || totalTime < minDuration) {
+ val timer = new HoodieBenchmark.Timer(i)
+ f(timer)
+ val runTime = timer.totalTime()
+ runTimes += runTime
+ totalTime += runTime
+
+ if (outputPerIteration) {
+ // scalastyle:off
+ println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)}
microseconds")
+ // scalastyle:on
+ }
+ i += 1
+ }
+ // scalastyle:off
+ println(s" Stopped after $i iterations,
${NANOSECONDS.toMillis(runTimes.sum)} ms")
+ // scalastyle:on
+ assert(runTimes.nonEmpty)
+ val best = runTimes.min
+ val avg = runTimes.sum / runTimes.size
+ val stdev = if (runTimes.size > 1) {
+ math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum /
(runTimes.size - 1))
+ } else 0
+ Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev /
1000000.0)
+ }
+}
+
+object HoodieBenchmark {
+
+ /**
+ * Object available to benchmark code to control timing e.g. to exclude
set-up time.
+ *
+ * @param iteration specifies this is the nth iteration of running the
benchmark case
+ */
+ class Timer(val iteration: Int) {
+ private var accumulatedTime: Long = 0L
+ private var timeStart: Long = 0L
+
+ def startTiming(): Unit = {
+ assert(timeStart == 0L, "Already started timing.")
+ timeStart = System.nanoTime
+ }
+
+ def stopTiming(): Unit = {
+ assert(timeStart != 0L, "Have not started timing.")
+ accumulatedTime += System.nanoTime - timeStart
+ timeStart = 0L
+ }
+
+ def totalTime(): Long = {
+ assert(timeStart == 0L, "Have not stopped timing.")
+ accumulatedTime
+ }
+ }
+
+ case class Case(name: String, fn: Timer => Unit, numIters: Int)
+ case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs:
Double)
+
+ /**
+ * This should return a user helpful processor information. Getting at this
depends on the OS.
+ * This should return something like "Intel(R) Core(TM) i7-4870HQ CPU @
2.50GHz"
+ */
+ def getProcessorName(): String = {
+ val cpu = if (SystemUtils.IS_OS_MAC_OSX) {
+ Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n",
"machdep.cpu.brand_string"))
+ .stripLineEnd
+ } else if (SystemUtils.IS_OS_LINUX) {
+ Try {
+ val grepPath = Utils.executeAndGetOutput(Seq("which",
"grep")).stripLineEnd
+ Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name",
"/proc/cpuinfo"))
+ .stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "")
+ }.getOrElse("Unknown processor")
+ } else {
+ System.getenv("PROCESSOR_IDENTIFIER")
+ }
+ cpu
+ }
+
+ /**
+ * This should return a user helpful JVM & OS information.
+ * This should return something like
+ * "OpenJDK 64-Bit Server VM 1.8.0_65-b17 on Linux 4.1.13-100.fc21.x86_64"
+ */
+ def getJVMOSInfo(): String = {
+ val vmName = System.getProperty("java.vm.name")
+ val runtimeVersion = System.getProperty("java.runtime.version")
+ val osName = System.getProperty("os.name")
+ val osVersion = System.getProperty("os.version")
+ s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}"
+ }
+}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarkBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarkBase.scala
new file mode 100644
index 0000000..ff4f0bc
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarkBase.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.hudi.benchmark
+
+import java.io.{File, FileOutputStream, OutputStream}
+
+/**
+ * Adapted from Apache Spark.
+ * A base class for generating benchmark results to a file.
+ * For JDK9+, JDK major version number is added to the file names to
distinguish the results.
+ */
+abstract class HoodieBenchmarkBase {
+ var output: Option[OutputStream] = None
+
+ /**
+ * Main process of the whole benchmark.
+ * Implementations of this method are supposed to use the wrapper method
`runBenchmark`
+ * for each benchmark scenario.
+ */
+ def runBenchmarkSuite(mainArgs: Array[String]): Unit
+
+ final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
+ val separator = "=" * 96
+ val testHeader = (separator + '\n' + benchmarkName + '\n' + separator +
'\n' + '\n').getBytes
+ output.foreach(_.write(testHeader))
+ func
+ output.foreach(_.write('\n'))
+ }
+
+ def main(args: Array[String]): Unit = {
+ // turning this on so the behavior between running benchmark via
`spark-submit` or SBT will
+ // be consistent, also allow users to turn on/off certain behavior such as
+ // `spark.sql.codegen.factoryMode`
+ val regenerateBenchmarkFiles: Boolean =
System.getenv("SPARK_GENERATE_BENCHMARK_FILES") == "1"
+ if (regenerateBenchmarkFiles) {
+ val version = System.getProperty("java.version").split("\\D+")(0).toInt
+ val jdkString = if (version > 8) s"-jdk$version" else ""
+ val resultFileName =
+ s"${this.getClass.getSimpleName.replace("$",
"")}jdkStringsuffix-results.txt"
+ val prefix = HoodieBenchmarks.currentProjectRoot.map(_ +
"/").getOrElse("")
+ val dir = new File(s"${prefix}benchmarks/")
+ if (!dir.exists()) {
+ // scalastyle:off println
+ println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
+ // scalastyle:on println
+ dir.mkdirs()
+ }
+ val file = new File(dir, resultFileName)
+ if (!file.exists()) {
+ file.createNewFile()
+ }
+ output = Some(new FileOutputStream(file))
+ }
+
+ runBenchmarkSuite(args)
+
+ output.foreach { o =>
+ if (o != null) {
+ o.close()
+ }
+ }
+
+ afterAll()
+ }
+
+ def suffix: String = ""
+
+ /**
+ * Any shutdown code to ensure a clean shutdown
+ */
+ def afterAll(): Unit = {}
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarks.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarks.scala
new file mode 100644
index 0000000..8729910
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarks.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.hudi.benchmark
+
+import java.io.File
+import java.lang.reflect.Modifier
+import java.nio.file.{FileSystems, Paths}
+import java.util.Locale
+import scala.collection.JavaConverters._
+import scala.util.Try
+import org.apache.hbase.thirdparty.com.google.common.reflect.ClassPath
+
+/**
+ * Adapted from Apache Spark.
+ * Run all benchmarks. To run this benchmark, you should build Spark with
either Maven or SBT.
+ * After that, you can run as below:
+ *
+ * {{{
+ * 1. with spark-submit
+ * bin/spark-submit --class <this class>
+ * --jars <all spark test jars>,<spark external package jars>
+ * <spark core test jar> <glob pattern for class> <extra arguments>
+ * 2. generate result:
+ * SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class>
+ * --jars <all spark test jars>,<spark external package jars>
+ * <spark core test jar> <glob pattern for class> <extra arguments>
+ * Results will be written to all corresponding files under "benchmarks/".
+ * Notice that it detects the sub-project's directories from jar's paths
so the provided jars
+ * should be properly placed under target (Maven build) or target/scala-*
(SBT) when you
+ * generate the files.
+ * }}}
+ *
+ * You can use a command as below to find all the test jars.
+ * Make sure not to select duplicated jars created by different versions of
builds or tools.
+ * {{{
+ * find . -name '*-SNAPSHOT-tests.jar' | paste -sd ',' -
+ * }}}
+ *
+ * The example below runs all benchmarks and generates the results:
+ * {{{
+ * SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
+ * org.apache.spark.benchmark.Benchmarks --jars \
+ * "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' |
paste -sd ',' -`" \
+ * "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
+ * "*"
+ * }}}
+ *
+ * The example below runs all benchmarks under
"org.apache.spark.sql.execution.datasources"
+ * {{{
+ * bin/spark-submit --class \
+ * org.apache.spark.benchmark.Benchmarks --jars \
+ * "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' |
paste -sd ',' -`" \
+ * "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
+ * "org.apache.spark.sql.execution.datasources.*"
+ * }}}
+ */
+
+object HoodieBenchmarks {
+ var currentProjectRoot: Option[String] = None
+
+ def main(args: Array[String]): Unit = {
+ val isFailFast = sys.env.get(
+
"SPARK_BENCHMARK_FAILFAST").map(_.toLowerCase(Locale.ROOT).trim.toBoolean).getOrElse(true)
+ val numOfSplits = sys.env.get(
+
"SPARK_BENCHMARK_NUM_SPLITS").map(_.toLowerCase(Locale.ROOT).trim.toInt).getOrElse(1)
+ val currentSplit = sys.env.get(
+ "SPARK_BENCHMARK_CUR_SPLIT").map(_.toLowerCase(Locale.ROOT).trim.toInt -
1).getOrElse(0)
+ var numBenchmark = 0
+
+ var isBenchmarkFound = false
+ val benchmarkClasses = ClassPath.from(
+ Thread.currentThread.getContextClassLoader
+ ).getTopLevelClassesRecursive("org.apache.spark").asScala.toArray
+ val matcher = FileSystems.getDefault.getPathMatcher(s"glob:${args.head}")
+
+ benchmarkClasses.foreach { info =>
+ lazy val clazz = info.load
+ lazy val runBenchmark = clazz.getMethod("main", classOf[Array[String]])
+ // isAssignableFrom does not seem to work with the reflected class from
Guava's
+ // getTopLevelClassesRecursive.
+ require(args.length > 0, "Benchmark class to run should be specified.")
+ if (
+ info.getName.endsWith("Benchmark") &&
+ // TODO(SPARK-34927): Support TPCDSQueryBenchmark in Benchmarks
+ !info.getName.endsWith("TPCDSQueryBenchmark") &&
+ matcher.matches(Paths.get(info.getName)) &&
+ Try(runBenchmark).isSuccess && // Does this have a main method?
+ !Modifier.isAbstract(clazz.getModifiers) // Is this a regular class?
+ ) {
+ numBenchmark += 1
+ if (numBenchmark % numOfSplits == currentSplit) {
+ isBenchmarkFound = true
+
+ val targetDirOrProjDir =
+ new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI)
+ .getParentFile.getParentFile
+
+ // The root path to be referred in each benchmark.
+ currentProjectRoot = Some {
+ if (targetDirOrProjDir.getName == "target") {
+ // SBT build
+ targetDirOrProjDir.getParentFile.getCanonicalPath
+ } else {
+ // Maven build
+ targetDirOrProjDir.getCanonicalPath
+ }
+ }
+
+ // scalastyle:off println
+ println(s"Running ${clazz.getName}:")
+ // scalastyle:on println
+ // Force GC to minimize the side effect.
+ System.gc()
+ try {
+ runBenchmark.invoke(null, args.tail.toArray)
+ } catch {
+ case e: Throwable if !isFailFast =>
+ // scalastyle:off println
+ println(s"${clazz.getName} failed with the exception below:")
+ // scalastyle:on println
+ e.printStackTrace()
+ }
+ }
+ }
+ }
+
+ if (!isBenchmarkFound) throw new RuntimeException("No benchmark found to
run.")
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
new file mode 100644
index 0000000..5e092bd
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
+import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+ * Benchmark to measure Avro SerDer performance.
+ */
+object AvroSerDerBenchmark extends HoodieBenchmarkBase {
+ protected val spark: SparkSession = getSparkSession
+
+ def getSparkSession: SparkSession = SparkSession
+ .builder()
+ .master("local[1]")
+ .config("spark.driver.memory", "8G")
+ .appName(this.getClass.getCanonicalName)
+ .getOrCreate()
+
+ def getDataFrame(numbers: Long): DataFrame = {
+ spark.range(0, numbers).toDF("id")
+ .withColumn("c1", lit("AvroSerDerBenchmark"))
+ .withColumn("c2", lit(12.99d))
+ .withColumn("c3", lit(1))
+ }
+
+ /**
+ * Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Windows 10 10.0
+ * Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
+ * perf avro serializer for hoodie: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+ *
------------------------------------------------------------------------------------------------------------------------
+ * serialize internalRow to avro Record 6391 6683
413 7.8 127.8 1.0X
+ */
+ private def avroSerializerBenchmark: Unit = {
+ val benchmark = new HoodieBenchmark(s"perf avro serializer for hoodie",
50000000)
+ benchmark.addCase("serialize internalRow to avro Record") { _ =>
+ val df = getDataFrame(50000000)
+ val avroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
+ spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
+ HoodieSparkUtils.createRdd(df,"record", "my",
Some(avroSchema)).foreach(f => f)
+ }
+ benchmark.run()
+ }
+
+ /**
+ * Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Windows 10 10.0
+ * Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
+ * perf avro deserializer for hoodie: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+ *
------------------------------------------------------------------------------------------------------------------------
+ * deserialize avro Record to internalRow 1340 1360
27 7.5 134.0 1.0X
+ */
+ private def avroDeserializerBenchmark: Unit = {
+ val benchmark = new HoodieBenchmark(s"perf avro deserializer for hoodie",
10000000)
+ val df = getDataFrame(10000000)
+ val sparkSchema = df.schema
+ val avroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
+ val testRdd = HoodieSparkUtils.createRdd(df,"record", "my",
Some(avroSchema))
+ testRdd.cache()
+ testRdd.foreach(f => f)
+ spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
+ benchmark.addCase("deserialize avro Record to internalRow") { _ =>
+ testRdd.mapPartitions { iter =>
+ val schema =
AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, "record", "my")
+ val avroToRowConverter =
AvroConversionUtils.createAvroToInternalRowConverter(schema, sparkSchema)
+ iter.map(record =>
avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get)
+ }.foreach(f => f)
+ }
+ benchmark.run()
+ }
+
+ override def afterAll(): Unit = {
+ spark.stop()
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ avroSerializerBenchmark
+ avroDeserializerBenchmark
+ }
+}