This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 14323cb [HUDI-344] Improve exporter tests (#1404)
14323cb is described below
commit 14323cb10012bdbf80cbb838928af9301cb42ba0
Author: Raymond Xu <[email protected]>
AuthorDate: Sun Mar 15 05:24:30 2020 -0700
[HUDI-344] Improve exporter tests (#1404)
---
.../hudi/utilities/HoodieSnapshotExporter.java | 9 +
.../apache/hudi/utilities/DataSourceTestUtils.java | 50 ----
.../hudi/utilities/TestHoodieSnapshotExporter.java | 318 +++++++++------------
3 files changed, 151 insertions(+), 226 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index f785d74..b58b5d3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -132,6 +132,7 @@ public class HoodieSnapshotExporter {
// No transformation is needed for output format "HUDI", just copy the original files.
copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
}
+ createSuccessTag(fs, cfg.targetOutputPath);
} else {
LOG.info("The job has 0 partition to copy.");
}
@@ -205,6 +206,14 @@ public class HoodieSnapshotExporter {
}
}
+ private void createSuccessTag(FileSystem fs, String targetOutputPath) throws IOException {
+ Path successTagPath = new Path(targetOutputPath + "/_SUCCESS");
+ if (!fs.exists(successTagPath)) {
+ LOG.info(String.format("Creating _SUCCESS under target output path: %s", targetOutputPath));
+ fs.createNewFile(successTagPath);
+ }
+ }
+
public static void main(String[] args) throws IOException {
// Take input configs
final Config cfg = new Config();
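
The _SUCCESS tag written by createSuccessTag() follows the usual Hadoop convention for marking a completed output directory. As a minimal sketch (not part of this commit; the class and helper name are illustrative), a downstream job could gate on the marker before consuming the export:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    public class ExportMarkerCheck {

      // Illustrative helper: returns true only once the exporter has written
      // the _SUCCESS tag, i.e. the snapshot under targetOutputPath is complete.
      static boolean isExportComplete(String targetOutputPath) throws IOException {
        Path successTagPath = new Path(targetOutputPath + "/_SUCCESS");
        FileSystem fs = successTagPath.getFileSystem(new Configuration());
        return fs.exists(successTagPath);
      }
    }
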
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
deleted file mode 100644
index 1a96b81..0000000
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.common.TestRawTripPayload;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Test utils for data source tests.
- */
-public class DataSourceTestUtils {
-
- public static Option<String> convertToString(HoodieRecord record) {
- try {
- String str = ((TestRawTripPayload) record.getData()).getJsonData();
- str = "{" + str.substring(str.indexOf("\"timestamp\":"));
- // Remove the last } bracket
- str = str.substring(0, str.length() - 1);
- return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
- } catch (IOException e) {
- return Option.empty();
- }
- }
-
- public static List<String> convertToStringList(List<HoodieRecord> records) {
- return records.stream().map(DataSourceTestUtils::convertToString).filter(Option::isPresent).map(Option::get)
- .collect(Collectors.toList());
- }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index 920f1ed..f624247 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -18,205 +18,171 @@
package org.apache.hudi.utilities;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.HoodieCommonTestHarness;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieTestUtils;
-import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
-import java.io.File;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-public class TestHoodieSnapshotExporter extends HoodieCommonTestHarness {
- private static String TEST_WRITE_TOKEN = "1-0-1";
-
- private SparkSession spark = null;
- private HoodieTestDataGenerator dataGen = null;
- private String outputPath = null;
- private String rootPath = null;
- private FileSystem fs = null;
- private Map commonOpts;
- private HoodieSnapshotExporter.Config cfg;
- private JavaSparkContext jsc = null;
-
- @Before
- public void initialize() throws IOException {
- spark = SparkSession.builder()
- .appName("Hoodie Datasource test")
- .master("local[2]")
- .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
- .getOrCreate();
- jsc = new JavaSparkContext(spark.sparkContext());
- dataGen = new HoodieTestDataGenerator();
- folder.create();
- basePath = folder.getRoot().getAbsolutePath();
- fs = FSUtils.getFs(basePath, spark.sparkContext().hadoopConfiguration());
- commonOpts = new HashMap();
-
- commonOpts.put("hoodie.insert.shuffle.parallelism", "4");
- commonOpts.put("hoodie.upsert.shuffle.parallelism", "4");
- commonOpts.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
- commonOpts.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition");
- commonOpts.put(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp");
- commonOpts.put(HoodieWriteConfig.TABLE_NAME, "hoodie_test");
-
-
- cfg = new HoodieSnapshotExporter.Config();
-
- cfg.sourceBasePath = basePath;
- cfg.targetOutputPath = outputPath = basePath + "/target";
- cfg.outputFormat = "json";
- cfg.outputPartitionField = "partition";
+@RunWith(Enclosed.class)
+public class TestHoodieSnapshotExporter {
+
+ static class ExporterTestHarness extends HoodieClientTestHarness {
+
+ static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
+ static final int NUM_RECORDS = 100;
+ static final String COMMIT_TIME = "20200101000000";
+ static final String PARTITION_PATH = "2020/01/01";
+ static final String TABLE_NAME = "testing";
+ String sourcePath;
+ String targetPath;
+
+ @Before
+ public void setUp() throws Exception {
+ initSparkContexts();
+ initDFS();
+ dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
+
+ // Initialize test data dirs
+ sourcePath = dfsBasePath + "/source/";
+ targetPath = dfsBasePath + "/target/";
+ dfs.mkdirs(new Path(sourcePath));
+ dfs.mkdirs(new Path(targetPath));
+ HoodieTableMetaClient
+ .initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
+ HoodieAvroPayload.class.getName());
+
+ // Prepare data as source Hudi dataset
+ HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
+ HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg);
+ hdfsWriteClient.startCommitWithTime(COMMIT_TIME);
+ List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
+ JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+ hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
+ hdfsWriteClient.close();
+
+ RemoteIterator<LocatedFileStatus> itr = dfs.listFiles(new Path(sourcePath), true);
+ while (itr.hasNext()) {
+ LOG.info(">>> Prepared test file: " + itr.next().getPath());
+ }
+ }
- }
+ @After
+ public void tearDown() throws Exception {
+ cleanupSparkContexts();
+ cleanupDFS();
+ cleanupTestDataGenerator();
+ }
- @After
- public void cleanup() {
- if (spark != null) {
- spark.stop();
+ private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withEmbeddedTimelineServerEnabled(false)
+ .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withBulkInsertParallelism(2)
+ .forTable(TABLE_NAME)
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
+ .build();
}
}
- @Test
- public void testSnapshotExporter() throws IOException {
- // Insert Operation
- List<String> records = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100));
- Dataset<Row> inputDF = spark.read().json(new JavaSparkContext(spark.sparkContext()).parallelize(records, 2));
- inputDF.write().format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
- .mode(SaveMode.Overwrite)
- .save(basePath);
- long sourceCount = inputDF.count();
-
- HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
- hoodieSnapshotExporter.export(spark, cfg);
-
- long targetCount = spark.read().json(outputPath).count();
-
- assertTrue(sourceCount == targetCount);
-
- // Test Invalid OutputFormat
- cfg.outputFormat = "foo";
- int isError = hoodieSnapshotExporter.export(spark, cfg);
- assertTrue(isError == -1);
- }
+ public static class TestHoodieSnapshotExporterForHudi extends ExporterTestHarness {
+
+ @Test
+ public void testExportAsHudi() throws IOException {
+ HoodieSnapshotExporter.Config cfg = new Config();
+ cfg.sourceBasePath = sourcePath;
+ cfg.targetOutputPath = targetPath;
+ cfg.outputFormat = "hudi";
+ new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+ // Check results
+ assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".clean")));
+ assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".clean.inflight")));
+ assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".clean.requested")));
+ assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".commit")));
+ assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".commit.requested")));
+ assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".inflight")));
+ assertTrue(dfs.exists(new Path(targetPath +
"/.hoodie/hoodie.properties")));
+ String partition = targetPath + "/" + PARTITION_PATH;
+ long numParquetFiles = Arrays.stream(dfs.listStatus(new Path(partition)))
+ .filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet"))
+ .count();
+ assertTrue("There should exist at least 1 parquet file.", numParquetFiles >= 1);
+ assertEquals(NUM_RECORDS, sqlContext.read().parquet(partition).count());
+ assertTrue(dfs.exists(new Path(partition + "/.hoodie_partition_metadata")));
+ assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+ }
- // for testEmptySnapshotCopy
- public void init() throws IOException {
- TemporaryFolder folder = new TemporaryFolder();
- folder.create();
- rootPath = "file://" + folder.getRoot().getAbsolutePath();
- basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
- outputPath = rootPath + "/output";
-
- final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
- fs = FSUtils.getFs(basePath, hadoopConf);
- HoodieTestUtils.init(hadoopConf, basePath);
+ @Test
+ public void testExportEmptyDataset() throws IOException {
+ // delete all source data
+ dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
+
+ // export
+ HoodieSnapshotExporter.Config cfg = new Config();
+ cfg.sourceBasePath = sourcePath;
+ cfg.targetOutputPath = targetPath;
+ cfg.outputFormat = "hudi";
+ new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+ // Check results
+ assertEquals("Target path should be empty.", 0, dfs.listStatus(new
Path(targetPath)).length);
+ assertFalse(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+ }
}
- @Test
- public void testEmptySnapshotCopy() throws IOException {
- init();
- // There is no real data (only .hoodie directory)
- assertEquals(fs.listStatus(new Path(basePath)).length, 1);
- assertFalse(fs.exists(new Path(outputPath)));
-
- // Do the snapshot
- HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
- copier.snapshot(jsc, basePath, outputPath, true);
+ @RunWith(Parameterized.class)
+ public static class TestHoodieSnapshotExporterForNonHudi extends ExporterTestHarness {
- // Nothing changed; we just bail out
- assertEquals(fs.listStatus(new Path(basePath)).length, 1);
- assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
- }
+ @Parameters
+ public static Iterable<String[]> formats() {
+ return Arrays.asList(new String[][] {{"json"}, {"parquet"}});
+ }
- // TODO - uncomment this after fixing test failures
- // @Test
- public void testSnapshotCopy() throws Exception {
- // Generate some commits and corresponding parquets
- String commitTime1 = "20160501010101";
- String commitTime2 = "20160502020601";
- String commitTime3 = "20160506030611";
- new File(basePath + "/.hoodie").mkdirs();
- new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
- // Only first two have commit files
- new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
- new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
- new File(basePath + "/.hoodie/" + commitTime3 +
".inflight").createNewFile();
-
- // Some parquet files
- new File(basePath + "/2016/05/01/").mkdirs();
- new File(basePath + "/2016/05/02/").mkdirs();
- new File(basePath + "/2016/05/06/").mkdirs();
- HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{"2016/05/01", "2016/05/02", "2016/05/06"},
- basePath);
- // Make commit1
- File file11 = new File(basePath + "/2016/05/01/" +
FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
- file11.createNewFile();
- File file12 = new File(basePath + "/2016/05/02/" +
FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
- file12.createNewFile();
- File file13 = new File(basePath + "/2016/05/06/" +
FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
- file13.createNewFile();
-
- // Make commit2
- File file21 = new File(basePath + "/2016/05/01/" +
FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
- file21.createNewFile();
- File file22 = new File(basePath + "/2016/05/02/" +
FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
- file22.createNewFile();
- File file23 = new File(basePath + "/2016/05/06/" +
FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
- file23.createNewFile();
-
- // Make commit3
- File file31 = new File(basePath + "/2016/05/01/" +
FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
- file31.createNewFile();
- File file32 = new File(basePath + "/2016/05/02/" +
FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
- file32.createNewFile();
- File file33 = new File(basePath + "/2016/05/06/" +
FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
- file33.createNewFile();
-
- // Do a snapshot copy
- HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
- copier.snapshot(jsc, basePath, outputPath, false);
-
- // Check results
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" +
file11.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" +
file12.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" +
file13.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" +
file21.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" +
file22.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" +
file23.getName())));
- assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" +
file31.getName())));
- assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" +
file32.getName())));
- assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" +
file33.getName())));
-
- assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 +
".commit")));
- assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 +
".commit")));
- assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 +
".commit")));
- assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 +
".inflight")));
- assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
-
- assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
+ @Parameter
+ public String format;
+
+ @Test
+ public void testExportAsNonHudi() throws IOException {
+ HoodieSnapshotExporter.Config cfg = new Config();
+ cfg.sourceBasePath = sourcePath;
+ cfg.targetOutputPath = targetPath;
+ cfg.outputFormat = format;
+ new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+ assertEquals(NUM_RECORDS, sqlContext.read().format(format).load(targetPath).count());
+ assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+ }
}
}
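
For reference, a minimal sketch of driving the exporter programmatically, mirroring the new tests above. The paths and Spark master here are illustrative assumptions, not part of this commit; the Config fields and the export(SparkSession, Config) call are as used in the tests.

    import org.apache.hudi.utilities.HoodieSnapshotExporter;
    import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;

    import org.apache.spark.sql.SparkSession;

    import java.io.IOException;

    public class SnapshotExportExample {

      public static void main(String[] args) throws IOException {
        Config cfg = new Config();
        cfg.sourceBasePath = "file:///tmp/hudi/source-table";        // an existing Hudi table (illustrative path)
        cfg.targetOutputPath = "file:///tmp/hudi/exported-snapshot"; // an empty target dir (illustrative path)
        cfg.outputFormat = "hudi";                                   // or "json" / "parquet", as the parameterized test covers

        SparkSession spark = SparkSession.builder()
            .appName("hudi-snapshot-export")
            .master("local[2]")
            .getOrCreate();
        // On success, the exporter now also writes <targetOutputPath>/_SUCCESS.
        new HoodieSnapshotExporter().export(spark, cfg);
        spark.stop();
      }
    }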