This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 545a26222da [HUDI-6147] Deltastreamer finish failed compaction before ingestion (#8589)
545a26222da is described below
commit 545a26222da67fa271266f883912f710c63d3178
Author: Jon Vexler <[email protected]>
AuthorDate: Mon May 8 13:08:10 2023 -0400
[HUDI-6147] Deltastreamer finish failed compaction before ingestion (#8589)
Add support to retry compaction in deltastreamer even before ingesting the
batch using --retry-last-pending-inline-compaction config.
Co-authored-by: Jonathan Vexler <=>
---
.../table/action/compact/TestInlineCompaction.java | 2 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 22 ++++++++++-
.../deltastreamer/HoodieDeltaStreamer.java | 3 ++
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 12 +++++-
.../deltastreamer/TestHoodieDeltaStreamer.java | 44 ++++++++++++++++++++++
5 files changed, 78 insertions(+), 5 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 70115c3bfa3..9e7d1b2f666 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -135,7 +135,7 @@ public class TestInlineCompaction extends CompactionTestBase {
       requestInstant = HoodieActiveTimeline.createNewInstantTime();
       try {
         scheduleCompaction(requestInstant, writeClient, cfg);
-        Assertions.fail();
+        Object fail = Assertions.fail();
       } catch (AssertionError error) {
         //should be here
       }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index daae0a385c6..64686e3f38f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -76,6 +76,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
@@ -420,8 +421,17 @@ public class DeltaSync implements Serializable, Closeable {
}
}
-    // complete the pending clustering before writing to sink
-    if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
+    // complete the pending compaction before writing to sink
+    if (cfg.retryLastPendingInlineCompactionJob && getHoodieClientConfig(this.schemaProvider).inlineCompactionEnabled()) {
+      Option<String> pendingCompactionInstant = getLastPendingCompactionInstant(allCommitsTimelineOpt);
+      if (pendingCompactionInstant.isPresent()) {
+        HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = writeClient.compact(pendingCompactionInstant.get());
+        writeClient.commitCompaction(pendingCompactionInstant.get(), writeMetadata.getCommitMetadata().get(), Option.empty());
+        refreshTimeline();
+        reInitWriteClient(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema(), null);
+      }
+    } else if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
+      // complete the pending clustering before writing to sink
       Option<String> pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt);
       if (pendingClusteringInstant.isPresent()) {
         writeClient.cluster(pendingClusteringInstant.get());
@@ -444,6 +454,14 @@ public class DeltaSync implements Serializable, Closeable {
return Option.empty();
}
+  private Option<String> getLastPendingCompactionInstant(Option<HoodieTimeline> commitTimelineOpt) {
+    if (commitTimelineOpt.isPresent()) {
+      Option<HoodieInstant> pendingCompactionInstant = commitTimelineOpt.get().filterPendingCompactionTimeline().lastInstant();
+      return pendingCompactionInstant.isPresent() ? Option.of(pendingCompactionInstant.get().getTimestamp()) : Option.empty();
+    }
+    return Option.empty();
+  }
+
/**
* Read from Upstream Source and apply transformation if needed.
*
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index d99298b92a6..73e419ae371 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -405,6 +405,9 @@ public class HoodieDeltaStreamer implements Serializable {
     @Parameter(names = {"--retry-last-pending-inline-clustering", "-rc"}, description = "Retry last pending inline clustering plan before writing to sink.")
     public Boolean retryLastPendingInlineClusteringJob = false;
+    @Parameter(names = {"--retry-last-pending-inline-compaction"}, description = "Retry last pending inline compaction plan before writing to sink.")
+    public Boolean retryLastPendingInlineCompactionJob = false;
+
     @Parameter(names = {"--cluster-scheduling-weight"}, description = "Scheduling weight for clustering as defined in "
         + "https://spark.apache.org/docs/latest/job-scheduling.html")
     public Integer clusterSchedulingWeight = 1;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 81d015f72f3..981a48a1564 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -319,11 +319,19 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
         partitionPath, "");
   }
+  protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                         String propsFileName, String parquetSourceRoot, boolean addCommonProps,
+                                         String partitionPath, String emptyBatchParam) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
+        partitionPath, emptyBatchParam, null);
+
+  }
+
   protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                          String propsFileName, String parquetSourceRoot, boolean addCommonProps,
-                                         String partitionPath, String emptyBatchParam) throws IOException {
+                                         String partitionPath, String emptyBatchParam, TypedProperties extraProps) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
-    TypedProperties parquetProps = new TypedProperties();
+    TypedProperties parquetProps = new TypedProperties(extraProps);
     if (addCommonProps) {
       populateCommonProps(parquetProps, basePath);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index cebafdab59a..d749b15d35d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1028,6 +1028,50 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
   }
+  @Test
+  public void testDeltaSyncWithPendingCompaction() throws Exception {
+    PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 100;
+    HoodieTestDataGenerator dataGenerator = prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+    TypedProperties extraProps = new TypedProperties();
+    extraProps.setProperty("hoodie.compact.inline", "true");
+    extraProps.setProperty("hoodie.compact.inline.max.delta.commits", "2");
+    extraProps.setProperty("hoodie.datasource.write.table.type", "MERGE_ON_READ");
+    extraProps.setProperty("hoodie.datasource.compaction.async.enable", "false");
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps);
+    String tableBasePath = basePath + "test_parquet_table" + testNum;
+    HoodieDeltaStreamer.Config deltaCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
+        null, PROPS_FILENAME_TEST_PARQUET, false,
+        false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);
+    deltaCfg.retryLastPendingInlineCompactionJob = false;
+
+    // sync twice and trigger compaction
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+    prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null, dataGenerator, "001");
+    deltaStreamer.sync();
+    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+    TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
+
+    // delete compaction commit
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    HoodieInstant commitInstant = timeline.lastInstant().get();
+    String commitFileName = tableBasePath + "/.hoodie/" + commitInstant.getFileName();
+    fs.delete(new Path(commitFileName), false);
+
+    // sync again
+    prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null, dataGenerator, "002");
+    deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
+    meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+    timeline = meta.getActiveTimeline().getRollbackTimeline();
+    assertEquals(1, timeline.getInstants().size());
+  }
+
   @ParameterizedTest
   @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"})
   public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, HoodieRecordType recordType) throws Exception {