This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 06584c6f76815fdc429d51bab66e44ae16e5fe67
Author: Geser Dugarov <[email protected]>
AuthorDate: Fri Mar 8 07:52:52 2024 +0700

    [MINOR] Separate HoodieSparkWriterTestBase to reduce duplication (#10832)
---
 .../apache/hudi/HoodieSparkWriterTestBase.scala    | 136 ++++++++++++++++++
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 152 +++------------------
 .../apache/hudi/TestHoodieSparkSqlWriterUtc.scala  |   2 +-
 .../hudi/TestTableSchemaResolverWithSparkSQL.scala | 102 +-------------
 4 files changed, 162 insertions(+), 230 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkWriterTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkWriterTestBase.scala
new file mode 100644
index 00000000000..c0c1c2c12bd
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkWriterTestBase.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.commons.io.FileUtils
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.{Dataset, Row, SQLContext, SparkSession}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+
+import scala.collection.JavaConverters
+
+class HoodieSparkWriterTestBase {
+  var spark: SparkSession = _
+  var sqlContext: SQLContext = _
+  var sc: SparkContext = _
+  var tempPath: java.nio.file.Path = _
+  var tempBootStrapPath: java.nio.file.Path = _
+  var hoodieFooTableName = "hoodie_foo_tbl"
+  var tempBasePath: String = _
+  var commonTableModifier: Map[String, String] = Map()
+
+  case class StringLongTest(uuid: String, ts: Long)
+
+  /**
+   * Setup method running before each test.
+   */
+  @BeforeEach
+  def setUp(): Unit = {
+    initSparkContext()
+    tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
+    tempBasePath = tempPath.toAbsolutePath.toString
+    commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+  }
+
+  /**
+   * Tear down method running after each test.
+   */
+  @AfterEach
+  def tearDown(): Unit = {
+    cleanupSparkContexts()
+    FileUtils.deleteDirectory(tempPath.toFile)
+    FileUtils.deleteDirectory(tempBootStrapPath.toFile)
+  }
+
+  /**
+   * Utility method for initializing the spark context.
+   */
+  def initSparkContext(): Unit = {
+    val sparkConf = HoodieClientTestUtils.getSparkConfForTest(getClass.getSimpleName)
+
+    spark = SparkSession.builder()
+      .withExtensions(new HoodieSparkSessionExtension)
+      .config(sparkConf)
+      .getOrCreate()
+
+    sc = spark.sparkContext
+    sc.setLogLevel("ERROR")
+    sqlContext = spark.sqlContext
+  }
+
+  /**
+   * Utility method for cleaning up spark resources.
+   */
+  def cleanupSparkContexts(): Unit = {
+    if (sqlContext != null) {
+      sqlContext.clearCache();
+      sqlContext = null;
+    }
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    if (spark != null) {
+      spark.close()
+    }
+  }
+
+  /**
+   * Utility method for creating common params for writer.
+   *
+   * @param path Path for hoodie table
+   * @param hoodieFooTableName Name of hoodie table
+   * @param tableType Type of table
+   * @return Map of common params
+   */
+  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String): Map[String, String] = {
+    Map("path" -> path.toAbsolutePath.toString,
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+  }
+
+  /**
+   * Utility method for dropping all hoodie meta related columns.
+   */
+  def dropMetaFields(df: Dataset[Row]): Dataset[Row] = {
+    df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+  }
+
+  /**
+   * Utility method for converting list of Row to list of Seq.
+   *
+   * @param inputList list of Row
+   * @return list of Seq
+   */
+  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
+    JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+
+}
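A suite built on this new base class inherits the Spark session lifecycle, the temp directories, and the shared helpers, and only has to supply test bodies. As a rough sketch of the intended usage (a hypothetical test written for illustration, not part of this commit; the class name and sample data are invented):

    import org.apache.spark.sql.SaveMode
    import org.junit.jupiter.api.Assertions.assertEquals
    import org.junit.jupiter.api.Test

    class TestMyWriterFeature extends HoodieSparkWriterTestBase {

      @Test
      def testWriteReadRoundTrip(): Unit = {
        // spark, tempBasePath and commonTableModifier are prepared by the
        // base class's @BeforeEach; tearDown() deletes the temp dirs afterwards.
        val df = spark.createDataFrame(Seq(
          ("id1", "2024/03/08", 1L),
          ("id2", "2024/03/08", 2L)
        )).toDF("_row_key", "partition", "ts")

        df.write.format("hudi")
          .options(commonTableModifier)
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
          .mode(SaveMode.Overwrite)
          .save(tempBasePath)

        // dropMetaFields is inherited too; it strips the five _hoodie_* columns
        // so the read-back rows can be compared against the input.
        val readBack = dropMetaFields(spark.read.format("hudi").load(tempBasePath))
        assertEquals(2L, readBack.count())
      }
    }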
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index d7a1f9331ae..0767d055915 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -19,10 +19,8 @@
 package org.apache.hudi
 import org.apache.avro.Schema
 import org.apache.commons.io.FileUtils
-import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.model._
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, HoodieWriteConfig}
@@ -30,19 +28,15 @@ import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.functional.TestBootstrap
 import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
-import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
-import org.apache.spark.SparkContext
+import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils}
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql._
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.functions.{expr, lit}
-import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertNull, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider._
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
@@ -52,7 +46,6 @@ import java.io.IOException
 import java.time.Instant
 import java.util.{Collections, Date, UUID}
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters
 
 /**
  * Test suite for SparkSqlWriter class.
@@ -60,113 +53,10 @@
  * Otherwise UTC tests will generate infinite loops, if there is any initiated test with time zone that is greater then UTC+0.
 * The reason is in a saved value in the heap of static {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.lastInstantTime}.
 */
-class TestHoodieSparkSqlWriter {
-  var spark: SparkSession = _
-  var sqlContext: SQLContext = _
-  var sc: SparkContext = _
-  var tempPath: java.nio.file.Path = _
-  var tempBootStrapPath: java.nio.file.Path = _
-  var hoodieFooTableName = "hoodie_foo_tbl"
-  var tempBasePath: String = _
-  var commonTableModifier: Map[String, String] = Map()
-
-  case class StringLongTest(uuid: String, ts: Long)
+class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase {
 
   /**
-   * Setup method running before each test.
-   */
-  @BeforeEach
-  def setUp(): Unit = {
-    initSparkContext()
-    tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
-    tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
-    tempBasePath = tempPath.toAbsolutePath.toString
-    commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
-  }
-
-  /**
-   * Tear down method running after each test.
-   */
-  @AfterEach
-  def tearDown(): Unit = {
-    cleanupSparkContexts()
-    FileUtils.deleteDirectory(tempPath.toFile)
-    FileUtils.deleteDirectory(tempBootStrapPath.toFile)
-  }
-
-  /**
-   * Utility method for initializing the spark context.
-   *
-   * TODO rebase this onto existing base class to avoid duplication
-   */
-  def initSparkContext(): Unit = {
-    val sparkConf = getSparkConfForTest(getClass.getSimpleName)
-
-    spark = SparkSession.builder()
-      .withExtensions(new HoodieSparkSessionExtension)
-      .config(sparkConf)
-      .getOrCreate()
-
-    sc = spark.sparkContext
-    sc.setLogLevel("ERROR")
-    sqlContext = spark.sqlContext
-  }
-
-  /**
-   * Utility method for cleaning up spark resources.
-   */
-  def cleanupSparkContexts(): Unit = {
-    if (sqlContext != null) {
-      sqlContext.clearCache();
-      sqlContext = null;
-    }
-    if (sc != null) {
-      sc.stop()
-      sc = null
-    }
-    if (spark != null) {
-      spark.close()
-    }
-  }
-
-  /**
-   * Utility method for dropping all hoodie meta related columns.
-   */
-  def dropMetaFields(df: Dataset[Row]): Dataset[Row] = {
-    df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
-  }
-
-  /**
-   * Utility method for creating common params for writer.
-   *
-   * @param path Path for hoodie table
-   * @param hoodieFooTableName Name of hoodie table
-   * @param tableType Type of table
-   * @return Map of common params
-   */
-  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String): Map[String, String] = {
-    Map("path" -> path.toAbsolutePath.toString,
-      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.insert.shuffle.parallelism" -> "1",
-      "hoodie.upsert.shuffle.parallelism" -> "1",
-      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
-      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
-  }
-
-  /**
-   * Utility method for converting list of Row to list of Seq.
-   *
-   * @param inputList list of Row
-   * @return list of Seq
-   */
-  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
-    JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
-
   /**
-   * Utility method for performing bulk insert tests.
+   * Local utility method for performing bulk insert tests.
    *
    * @param sortMode Bulk insert sort mode
    * @param populateMetaFields Flag for populating meta fields
@@ -226,12 +116,13 @@
     val originals = HoodieWriterUtils.parametersWithWriteDefaults(Map.empty)
     val rhsKey = "hoodie.right.hand.side.key"
     val rhsVal = "hoodie.right.hand.side.val"
-    val modifier = Map(OPERATION.key -> INSERT_OPERATION_OPT_VAL, TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL, rhsKey -> rhsVal)
+    val modifier = Map(DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, rhsKey -> rhsVal)
     val modified = HoodieWriterUtils.parametersWithWriteDefaults(modifier)
     val matcher = (k: String, v: String) => modified(k) should be(v)
     originals foreach {
-      case ("hoodie.datasource.write.operation", _) => matcher("hoodie.datasource.write.operation", INSERT_OPERATION_OPT_VAL)
-      case ("hoodie.datasource.write.table.type", _) => matcher("hoodie.datasource.write.table.type", MOR_TABLE_TYPE_OPT_VAL)
+      case ("hoodie.datasource.write.operation", _) => matcher("hoodie.datasource.write.operation", DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      case ("hoodie.datasource.write.table.type", _) => matcher("hoodie.datasource.write.table.type", DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
       case (`rhsKey`, _) => matcher(rhsKey, rhsVal)
       case (k, v) => matcher(k, v)
     }
@@ -245,7 +136,7 @@
     spark.stop()
     val session = SparkSession.builder()
       // Here we intentionally remove the "spark.serializer" config to test failure
-      .config(getSparkConfForTest("hoodie_test").remove("spark.serializer"))
+      .config(HoodieClientTestUtils.getSparkConfForTest("hoodie_test").remove("spark.serializer"))
       .getOrCreate()
     try {
       val sqlContext = session.sqlContext
@@ -290,7 +181,7 @@
     assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
 
     //on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception
-    val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete")
+    val deleteTableModifier = barTableModifier ++ Map(DataSourceWriteOptions.OPERATION.key -> "delete")
     val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2))
     assert(tableAlreadyExistException.getMessage.contains("Config conflict"))
     assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
@@ -454,7 +345,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
     val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
       .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
-      .updated(INSERT_DROP_DUPS.key, "true")
+      .updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "true")
 
     // generate the inserts
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -687,10 +578,11 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
         .setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key,
           HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
         .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
-        .setPayloadClassName(PAYLOAD_CLASS_NAME.key)
-        .setPreCombineField(fooTableParams.getOrElse(PRECOMBINE_FIELD.key, PRECOMBINE_FIELD.defaultValue()))
+        .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key)
+        .setPreCombineField(fooTableParams.getOrElse(DataSourceWriteOptions.PRECOMBINE_FIELD.key, DataSourceWriteOptions.PRECOMBINE_FIELD.defaultValue()))
         .setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
-        .setKeyGeneratorClassProp(fooTableParams.getOrElse(KEYGENERATOR_CLASS_NAME.key, KEYGENERATOR_CLASS_NAME.defaultValue()))
+        .setKeyGeneratorClassProp(fooTableParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key,
+          DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue()))
       if(addBootstrapPath) {
         tableMetaClientBuilder
           .setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
@@ -1364,19 +1256,19 @@ object TestHoodieSparkSqlWriter {
 
     // NOTE: Hudi doesn't support Orc in Spark < 3.0
     // Please check HUDI-4496 for more details
-    val targetScenarios = if (gteqSpark3_0) {
+    val targetScenarios = if (HoodieSparkUtils.gteqSpark3_0) {
       parquetScenarios ++ orcScenarios
     } else {
       parquetScenarios
     }
 
-    java.util.Arrays.stream(targetScenarios.map(as => arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
+    java.util.Arrays.stream(targetScenarios.map(as => Arguments.arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
   }
 
   def deletePartitionsWildcardTestParams(): java.util.stream.Stream[Arguments] = {
     java.util.stream.Stream.of(
-      arguments("*5/03/1*", Seq("2016/03/15")),
-      arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
+      Arguments.arguments("*5/03/1*", Seq("2016/03/15")),
+      Arguments.arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
   }
 }
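The surviving tests in TestHoodieSparkSqlWriter now reach the shared helpers through inheritance rather than local copies. A typical combination inside a test body looks roughly like this (a hedged fragment, assuming DataSourceTestUtils.generateRandomRows and AvroConversionUtils.convertAvroSchemaToStructType behave as elsewhere in this test module):

    // Build a DataFrame from generated rows; sc, spark and
    // convertRowListToSeq all come from HoodieSparkWriterTestBase.
    val schema = DataSourceTestUtils.getStructTypeExampleSchema
    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
    val inserts = DataSourceTestUtils.generateRandomRows(10)
    val df = spark.createDataFrame(sc.parallelize(convertRowListToSeq(inserts)), structType)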
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
index df8614f5e2a..ca4d23f719d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
@@ -36,7 +36,7 @@ import java.util.TimeZone
 * value of static {@link HoodieInstantTimeGenerator.lastInstantTime} in the heap,
 * which will be greater than instant time for {@link HoodieTimelineTimeZone.UTC}.
 */
-class TestHoodieSparkSqlWriterUtc extends TestHoodieSparkSqlWriter {
+class TestHoodieSparkSqlWriterUtc extends HoodieSparkWriterTestBase {
   /*
    * Test case for instant is generated with commit timezone when TIMELINE_TIMEZONE set to UTC
    * related to HUDI-5978
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
index d9d5b59c8d7..70886d96444 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
@@ -18,120 +18,24 @@
 package org.apache.hudi
 
 import org.apache.avro.Schema
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.model.HoodieMetadataRecord
-import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
-import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.SaveMode
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
+import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
-import scala.collection.JavaConverters
-
 /**
 * Test suite for TableSchemaResolver with SparkSqlWriter.
 */
 @Tag("functional")
-class TestTableSchemaResolverWithSparkSQL {
-  var spark: SparkSession = _
-  var sqlContext: SQLContext = _
-  var sc: SparkContext = _
-  var tempPath: java.nio.file.Path = _
-  var tempBootStrapPath: java.nio.file.Path = _
-  var hoodieFooTableName = "hoodie_foo_tbl"
-  var tempBasePath: String = _
-  var commonTableModifier: Map[String, String] = Map()
-
-  case class StringLongTest(uuid: String, ts: Long)
-
-  /**
-   * Setup method running before each test.
-   */
-  @BeforeEach
-  def setUp(): Unit = {
-    initSparkContext()
-    tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
-    tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
-    tempBasePath = tempPath.toAbsolutePath.toString
-    commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
-  }
-
-  /**
-   * Tear down method running after each test.
-   */
-  @AfterEach
-  def tearDown(): Unit = {
-    cleanupSparkContexts()
-    FileUtils.deleteDirectory(tempPath.toFile)
-    FileUtils.deleteDirectory(tempBootStrapPath.toFile)
-  }
-
-  /**
-   * Utility method for initializing the spark context.
-   */
-  def initSparkContext(): Unit = {
-    spark = SparkSession.builder()
-      .config(getSparkConfForTest(hoodieFooTableName))
-      .getOrCreate()
-    sc = spark.sparkContext
-    sc.setLogLevel("ERROR")
-    sqlContext = spark.sqlContext
-  }
-
-  /**
-   * Utility method for cleaning up spark resources.
-   */
-  def cleanupSparkContexts(): Unit = {
-    if (sqlContext != null) {
-      sqlContext.clearCache();
-      sqlContext = null;
-    }
-    if (sc != null) {
-      sc.stop()
-      sc = null
-    }
-    if (spark != null) {
-      spark.close()
-    }
-  }
-
-  /**
-   * Utility method for creating common params for writer.
-   *
-   * @param path Path for hoodie table
-   * @param hoodieFooTableName Name of hoodie table
-   * @param tableType Type of table
-   * @return Map of common params
-   */
-  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String): Map[String, String] = {
-    Map("path" -> path.toAbsolutePath.toString,
-      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.insert.shuffle.parallelism" -> "1",
-      "hoodie.upsert.shuffle.parallelism" -> "1",
-      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
-      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
-  }
-
-  /**
-   * Utility method for converting list of Row to list of Seq.
-   *
-   * @param inputList list of Row
-   * @return list of Seq
-   */
-  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
-    JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+class TestTableSchemaResolverWithSparkSQL extends HoodieSparkWriterTestBase {
 
   @Test
   def testTableSchemaResolverInMetadataTable(): Unit = {
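For reference, the schema resolution exercised by this last suite typically follows this shape (a hedged sketch against the 0.x HoodieTableMetaClient builder API; illustrative code rather than the test's actual body):

    // Open the table written under tempBasePath and resolve its Avro schema.
    val metaClient = HoodieTableMetaClient.builder()
      .setConf(spark.sparkContext.hadoopConfiguration)
      .setBasePath(tempBasePath)
      .build()
    val schemaResolver = new TableSchemaResolver(metaClient)
    val tableSchema: Schema = schemaResolver.getTableAvroSchema(false) // false: exclude meta fields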