This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 545a26222da [HUDI-6147] Deltastreamer finish failed compaction before 
ingestion (#8589)
545a26222da is described below

commit 545a26222da67fa271266f883912f710c63d3178
Author: Jon Vexler <[email protected]>
AuthorDate: Mon May 8 13:08:10 2023 -0400

    [HUDI-6147] Deltastreamer finish failed compaction before ingestion (#8589)
    
    Add support to retry compaction in Deltastreamer even before ingesting the 
batch, using the --retry-last-pending-inline-compaction config.
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../table/action/compact/TestInlineCompaction.java |  2 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 22 ++++++++++-
 .../deltastreamer/HoodieDeltaStreamer.java         |  3 ++
 .../deltastreamer/HoodieDeltaStreamerTestBase.java | 12 +++++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 44 ++++++++++++++++++++++
 5 files changed, 78 insertions(+), 5 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 70115c3bfa3..9e7d1b2f666 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -135,7 +135,7 @@ public class TestInlineCompaction extends 
CompactionTestBase {
       requestInstant = HoodieActiveTimeline.createNewInstantTime();
       try {
         scheduleCompaction(requestInstant, writeClient, cfg);
-        Assertions.fail();
+        Object fail = Assertions.fail();
       } catch (AssertionError error) {
         //should be here
       }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index daae0a385c6..64686e3f38f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -76,6 +76,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.sync.common.util.SyncUtilHelpers;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.util.SparkKeyGenUtils;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
@@ -420,8 +421,17 @@ public class DeltaSync implements Serializable, Closeable {
         }
       }
 
-      // complete the pending clustering before writing to sink
-      if (cfg.retryLastPendingInlineClusteringJob && 
getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
+      // complete the pending compaction before writing to sink
+      if (cfg.retryLastPendingInlineCompactionJob && 
getHoodieClientConfig(this.schemaProvider).inlineCompactionEnabled()) {
+        Option<String> pendingCompactionInstant = 
getLastPendingCompactionInstant(allCommitsTimelineOpt);
+        if (pendingCompactionInstant.isPresent()) {
+          HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = 
writeClient.compact(pendingCompactionInstant.get());
+          writeClient.commitCompaction(pendingCompactionInstant.get(), 
writeMetadata.getCommitMetadata().get(), Option.empty());
+          refreshTimeline();
+          reInitWriteClient(schemaProvider.getSourceSchema(), 
schemaProvider.getTargetSchema(), null);
+        }
+      } else if (cfg.retryLastPendingInlineClusteringJob && 
getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
+        // complete the pending clustering before writing to sink
         Option<String> pendingClusteringInstant = 
getLastPendingClusteringInstant(allCommitsTimelineOpt);
         if (pendingClusteringInstant.isPresent()) {
           writeClient.cluster(pendingClusteringInstant.get());
@@ -444,6 +454,14 @@ public class DeltaSync implements Serializable, Closeable {
     return Option.empty();
   }
 
+  private Option<String> 
getLastPendingCompactionInstant(Option<HoodieTimeline> commitTimelineOpt) {
+    if (commitTimelineOpt.isPresent()) {
+      Option<HoodieInstant> pendingCompactionInstant = 
commitTimelineOpt.get().filterPendingCompactionTimeline().lastInstant();
+      return pendingCompactionInstant.isPresent() ? 
Option.of(pendingCompactionInstant.get().getTimestamp()) : Option.empty();
+    }
+    return Option.empty();
+  }
+
   /**
    * Read from Upstream Source and apply transformation if needed.
    *
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index d99298b92a6..73e419ae371 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -405,6 +405,9 @@ public class HoodieDeltaStreamer implements Serializable {
     @Parameter(names = {"--retry-last-pending-inline-clustering", "-rc"}, 
description = "Retry last pending inline clustering plan before writing to 
sink.")
     public Boolean retryLastPendingInlineClusteringJob = false;
 
+    @Parameter(names = {"--retry-last-pending-inline-compaction"}, description 
= "Retry last pending inline compaction plan before writing to sink.")
+    public Boolean retryLastPendingInlineCompactionJob = false;
+
     @Parameter(names = {"--cluster-scheduling-weight"}, description = 
"Scheduling weight for clustering as defined in "
         + "https://spark.apache.org/docs/latest/job-scheduling.html";)
     public Integer clusterSchedulingWeight = 1;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 81d015f72f3..981a48a1564 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -319,11 +319,19 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
         partitionPath, "");
   }
 
+  protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                         String propsFileName, String 
parquetSourceRoot, boolean addCommonProps,
+                                         String partitionPath, String 
emptyBatchParam) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, 
sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, 
addCommonProps,
+        partitionPath, emptyBatchParam, null);
+
+  }
+
   protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                        String propsFileName, String 
parquetSourceRoot, boolean addCommonProps,
-                                       String partitionPath, String 
emptyBatchParam) throws IOException {
+                                       String partitionPath, String 
emptyBatchParam, TypedProperties extraProps) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
-    TypedProperties parquetProps = new TypedProperties();
+    TypedProperties parquetProps = new TypedProperties(extraProps);
 
     if (addCommonProps) {
       populateCommonProps(parquetProps, basePath);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index cebafdab59a..d749b15d35d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1028,6 +1028,50 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
   }
 
+  @Test
+  public void testDeltaSyncWithPendingCompaction() throws Exception {
+    PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 100;
+    HoodieTestDataGenerator dataGenerator = 
prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, false, null, null);
+    TypedProperties extraProps = new TypedProperties();
+    extraProps.setProperty("hoodie.compact.inline", "true");
+    extraProps.setProperty("hoodie.compact.inline.max.delta.commits", "2");
+    extraProps.setProperty("hoodie.datasource.write.table.type", 
"MERGE_ON_READ");
+    extraProps.setProperty("hoodie.datasource.compaction.async.enable", 
"false");
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", 
PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps);
+    String tableBasePath = basePath + "test_parquet_table" + testNum;
+    HoodieDeltaStreamer.Config deltaCfg =  
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
ParquetDFSSource.class.getName(),
+        null, PROPS_FILENAME_TEST_PARQUET, false,
+        false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);
+    deltaCfg.retryLastPendingInlineCompactionJob = false;
+
+    // sync twice and trigger compaction
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, 
sqlContext);
+    prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null, dataGenerator, "001");
+    deltaStreamer.sync();
+    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+    TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
+
+    // delete compaction commit
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    HoodieInstant commitInstant = timeline.lastInstant().get();
+    String commitFileName = tableBasePath + "/.hoodie/" + 
commitInstant.getFileName();
+    fs.delete(new Path(commitFileName), false);
+
+    // sync again
+    prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false, 
null, null, dataGenerator, "002");
+    deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
+    meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+    timeline = meta.getActiveTimeline().getRollbackTimeline();
+    assertEquals(1, timeline.getInstants().size());
+  }
+
   @ParameterizedTest
   @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false, 
SPARK"})
   public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, 
HoodieRecordType recordType) throws Exception {

Reply via email to