This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ad2a37d6796 [HUDI-5817] Fix async indexer metadata writer to avoid eager rollback and failed write cleaning (#8001)
ad2a37d6796 is described below

commit ad2a37d6796316129b3c9cc3f92c3b1445c86fda
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Feb 22 15:54:41 2023 -0800

    [HUDI-5817] Fix async indexer metadata writer to avoid eager rollback and failed write cleaning (#8001)
    
    - This PR fixes SparkHoodieBackedTableMetadataWriter so that the rollback of failed writes is not triggered by the async indexer.
---
 .../SparkHoodieBackedTableMetadataWriter.java      |  3 +-
 .../apache/hudi/utilities/TestHoodieIndexer.java   | 66 +++++++++++++++++++++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  1 -
 3 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 37222c8266a..e321c641e04 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -150,7 +150,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     engineContext.setJobStatus(this.getClass().getName(), "Committing " + instantTime + " to metadata table " + metadataWriteConfig.getTableName());
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
       // rollback partially failed writes if any.
-      if (writeClient.rollbackFailedWrites()) {
+      if (dataWriteConfig.getFailedWritesCleanPolicy().isEager()
+          && writeClient.rollbackFailedWrites()) {
         metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
       }
       if (canTriggerTableService) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 67504cb957d..bb4f6abe893 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -158,11 +158,11 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
   }
 
   @Test
-  public void testIndexerWithWriter() throws IOException {
+  public void testIndexerWithWriterFinishingFirst() throws IOException {
     // Test the case where the indexer is running, i.e., the delta commit in the metadata table
     // is inflight, while the regular writer is updating metadata table.
     // The delta commit from the indexer should not be rolled back.
-    String tableName = "indexer_with_writer";
+    String tableName = "indexer_with_writer_finishing_first";
     // Enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder =
         getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
@@ -229,6 +229,68 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
         .stream().findFirst().get().getCommitTime());
   }
 
+  @Test
+  public void testIndexerWithWriterFinishingLast() throws IOException {
+    // Test the case where a regular write updating the metadata table is in progress,
+    // i.e., a delta commit in the metadata table is inflight, and the async indexer
+    // finishes the original delta commit.  In this case, the async indexer should not
+    // trigger the rollback on other inflight writes in the metadata table.
+    String tableName = "indexer_with_writer_finishing_first";
+    // Enable files and bloom_filters on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder =
+        getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
+    HoodieMetadataConfig metadataConfig = metadataConfigBuilder.build();
+    upsertToTable(metadataConfig, tableName);
+    upsertToTable(metadataConfig, tableName);
+
+    // Transition the last commit to inflight
+    HoodieInstant commit = metaClient.getActiveTimeline().lastInstant().get();
+    String commitTime = commit.getTimestamp();
+    metaClient.getActiveTimeline().revertToInflight(commit);
+
+    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
+        context(), metadataConfig, metaClient.getBasePathV2().toString(),
+        getWriteConfigBuilder(basePath(), tableName).build().getSpillableMapBasePath());
+    HoodieTableMetaClient metadataMetaClient = metadata.getMetadataMetaClient();
+    HoodieInstant mdtCommit = metadataMetaClient.getActiveTimeline()
+        .filter(i -> i.getTimestamp().equals(commitTime))
+        .getInstants().get(0);
+    metadataMetaClient.getActiveTimeline().revertToInflight(mdtCommit);
+
+    // Run async indexer, creating a new indexing instant in the data table and a new delta commit
+    // in the metadata table, with the suffix "004"
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
+    config.basePath = basePath();
+    config.tableName = tableName;
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE_AND_EXECUTE;
+    config.propsFilePath = propsPath;
+    config.configs.add(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=" + colStatsFileGroupCount);
+    config.configs.add(HoodieMetadataConfig.METADATA_INDEX_CHECK_TIMEOUT_SECONDS + "=1");
+
+    // start the indexer and validate files index is completely built out
+    HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
+    // The catchup won't finish due to inflight delta commit, and this is expected
+    assertEquals(-1, indexer.start(0));
+
+    // Now, make sure that the inflight delta commit happened before the async indexer
+    // is intact
+    metaClient = reload(metaClient);
+    metadataMetaClient = reload(metadataMetaClient);
+
+    assertTrue(metaClient.getActiveTimeline().containsInstant(commitTime));
+    assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(commitTime));
+    assertTrue(metaClient.getActiveTimeline()
+        .filter(i -> i.getTimestamp().equals(commitTime))
+        .getInstants().get(0).isInflight());
+    assertTrue(metadataMetaClient.getActiveTimeline()
+        .filter(i -> i.getTimestamp().equals(commitTime))
+        .getInstants().get(0).isInflight());
+    assertTrue(metaClient.getActiveTimeline().getRollbackTimeline().empty());
+    assertTrue(metadataMetaClient.getActiveTimeline().getRollbackTimeline().empty());
+  }
+
   private static Stream<Arguments> colStatsFileGroupCountParams() {
     return Stream.of(
         Arguments.of(1),
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 03f8392511b..eb6ab80b5f9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1155,7 +1155,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     return config;
   }
 
-  @Disabled("HUDI-5815 for investigation")
   @ParameterizedTest
   @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
   public void testHoodieIndexer(HoodieRecordType recordType) throws Exception {

Reply via email to