This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 686da41  [HUDI-3689] Fix UT failures in TestHoodieDeltaStreamer (#5120)
686da41 is described below

commit 686da41696e17daa182f84796cd2721b945d71b1
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Thu Mar 24 09:10:33 2022 -0700

    [HUDI-3689] Fix UT failures in TestHoodieDeltaStreamer (#5120)
---
 .../testutils/TestHiveSyncGlobalCommitTool.java    | 133 ---------------------
 .../org/apache/hudi/utilities/TestUtilHelpers.java |  97 ---------------
 .../functional/TestHoodieDeltaStreamer.java        |  30 ++---
 .../TestHoodieMultiTableDeltaStreamer.java         |   8 +-
 pom.xml                                            |   4 +-
 5 files changed, 21 insertions(+), 251 deletions(-)

diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
deleted file mode 100644
index 980374e..0000000
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.hive.testutils;
-
-import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
-import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
-
-import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-
-import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestHiveSyncGlobalCommitTool {
-
-  TestCluster localCluster;
-  TestCluster remoteCluster;
-
-  private static String DB_NAME = "foo";
-  private static String TBL_NAME = "bar";
-
-  private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
-      String commitTime, String dbName, String tblName) throws Exception {
-    HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
-    config.properties.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation());
-    config.properties.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation());
-    config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl());
-    config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl());
-    config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName));
-    config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName));
-    config.globallyReplicatedTimeStamp = commitTime;
-    config.hiveUser = System.getProperty("user.name");
-    config.hivePass = "";
-    config.databaseName = dbName;
-    config.tableName = tblName;
-    config.basePath = localCluster.tablePath(dbName, tblName);
-    config.assumeDatePartitioning = true;
-    config.usePreApacheInputFormat = false;
-    config.partitionFields = Collections.singletonList("datestr");
-    return config;
-  }
-
-  private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception {
-    assertEquals(localCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
-        remoteCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
-        "compare replicated timestamps");
-  }
-
-  @BeforeEach
-  public void setUp() throws Exception {
-    localCluster = new TestCluster();
-    localCluster.setup();
-    remoteCluster = new TestCluster();
-    remoteCluster.setup();
-    localCluster.forceCreateDb(DB_NAME);
-    remoteCluster.forceCreateDb(DB_NAME);
-    localCluster.dfsCluster.getFileSystem().delete(new Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
-    remoteCluster.dfsCluster.getFileSystem().delete(new Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
-  }
-
-  @AfterEach
-  public void clear() throws Exception {
-    localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
-    remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
-    localCluster.shutDown();
-    remoteCluster.shutDown();
-  }
-
-  @Test
-  public void testBasicGlobalCommit() throws Exception {
-    String commitTime = "100";
-    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
-    // simulate drs
-    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
-    HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
-    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
-    assertTrue(tool.commit());
-    compareEqualLastReplicatedTimeStamp(config);
-  }
-
-  @Test
-  public void testBasicRollback() throws Exception {
-    String commitTime = "100";
-    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
-    // simulate drs
-    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
-    HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
-    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
-    assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
-    assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
-    // stop the remote cluster hive server to simulate cluster going down
-    remoteCluster.stopHiveServer2();
-    assertFalse(tool.commit());
-    assertEquals(commitTime, localCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
-        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
-    assertTrue(tool.rollback()); // do a rollback
-    assertNotEquals(commitTime, localCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
-        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
-    assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
-    remoteCluster.startHiveServer2();
-  }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
deleted file mode 100644
index 45ffa1f..0000000
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.utilities.transform.ChainedTransformer;
-import org.apache.hudi.utilities.transform.Transformer;
-
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestUtilHelpers {
-
-  public static class TransformerFoo implements Transformer {
-
-    @Override
-    public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
-      return null;
-    }
-  }
-
-  public static class TransformerBar implements Transformer {
-
-    @Override
-    public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
-      return null;
-    }
-  }
-
-  @Nested
-  public class TestCreateTransformer {
-
-    @Test
-    public void testCreateTransformerNotPresent() throws IOException {
-      assertFalse(UtilHelpers.createTransformer(null).isPresent());
-    }
-
-    @Test
-    public void testCreateTransformerLoadOneClass() throws IOException {
-      Transformer transformer = UtilHelpers.createTransformer(Collections.singletonList(TransformerFoo.class.getName())).get();
-      assertTrue(transformer instanceof ChainedTransformer);
-      List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
-      assertEquals(1, transformerNames.size());
-      assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
-    }
-
-    @Test
-    public void testCreateTransformerLoadMultipleClasses() throws IOException {
-      List<String> classNames = Arrays.asList(TransformerFoo.class.getName(), TransformerBar.class.getName());
-      Transformer transformer = UtilHelpers.createTransformer(classNames).get();
-      assertTrue(transformer instanceof ChainedTransformer);
-      List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
-      assertEquals(2, transformerNames.size());
-      assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
-      assertEquals(TransformerBar.class.getName(), transformerNames.get(1));
-    }
-
-    @Test
-    public void testCreateTransformerThrowsException() throws IOException {
-      Exception e = assertThrows(IOException.class, () -> {
-        UtilHelpers.createTransformer(Arrays.asList("foo", "bar"));
-      });
-      assertEquals("Could not load transformer class(es) [foo, bar]", e.getMessage());
-    }
-  }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 202d0b7..2a57716 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1378,7 +1378,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertEquals(1000, c);
   }
 
-  private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException {
+  private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
     if (createTopic) {
       try {
         testUtils.createTopic(topicName, 2);
@@ -1491,7 +1491,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.embed.timeline.server", "false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "");
     props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
     props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
@@ -1515,15 +1515,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true,
         HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
     prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false);
+        PARQUET_SOURCE_ROOT, false, "");
     // delta streamer w/ parquet source
     String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
-            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
+            Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
             false, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(parquetRecords, tableBasePath, sqlContext);
     deltaStreamer.shutdownGracefully();
 
     // prep json kafka source
@@ -1533,18 +1533,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     // delta streamer w/ json kafka source
     deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
-            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
             true, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     // if auto reset value is set to LATEST, this all kafka records so far may not be synced.
     int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
-    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
 
     // verify 2nd batch to test LATEST auto reset value.
     prepareJsonKafkaDFSFiles(20, false, topicName);
     totalExpectedRecords += 20;
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
     testNum++;
   }
 
@@ -1556,17 +1556,17 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
-            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
             true, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
 
     int totalRecords = JSON_KAFKA_NUM_RECORDS;
     int records = 10;
     totalRecords += records;
     prepareJsonKafkaDFSFiles(records, false, topicName);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
   }
 
   @Test
@@ -1578,20 +1578,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
-            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
            true, 100000, false, null, null, "timestamp",
            String.valueOf(System.currentTimeMillis())), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
 
     prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
     deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
-            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
            true, 100000, false, null, null, "timestamp",
            String.valueOf(System.currentTimeMillis())), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
   }
 
   @Test
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index 416f2c5..f80d38a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -161,8 +161,8 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
     String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
     streamer.sync();
 
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, sqlContext);
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, sqlContext);
 
     //insert updates for already existing records in kafka topics
     testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
@@ -177,8 +177,8 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
     assertTrue(streamer.getFailedTables().isEmpty());
 
     //assert the record count matches now
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, sqlContext);
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, sqlContext);
     testNum++;
   }
diff --git a/pom.xml b/pom.xml
index c61d5ef..ebd2a76 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,8 +73,8 @@
   <properties>
     <maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
-    <maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
-    <maven-failsafe-plugin.version>3.0.0-M4</maven-failsafe-plugin.version>
+    <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
+    <maven-failsafe-plugin.version>3.0.0-M5</maven-failsafe-plugin.version>
     <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
     <maven-javadoc-plugin.version>3.1.1</maven-javadoc-plugin.version>
     <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>