lokeshj1703 commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2085324043


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -284,48 +293,56 @@ public void testMergeOnReadRestoreCompactionCommit() throws IOException {
     assertEquals(1, secondPartitionCommit2LogFiles.size());
     HoodieTable table = this.getHoodieTable(metaClient, cfg);
 
-    //3. rollback the update to partition1 and partition2
-    HoodieInstant rollBackInstant = INSTANT_GENERATOR.createNewInstant(isUsingMarkers ? HoodieInstant.State.INFLIGHT : HoodieInstant.State.COMPLETED,
-        HoodieTimeline.DELTA_COMMIT_ACTION, "002");
-    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
-        new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false,
-            cfg.shouldRollbackUsingMarkers(), true);
-    mergeOnReadRollbackPlanActionExecutor.execute().get();
-    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
-        context,
-        cfg,
-        table,
-        "003",
-        rollBackInstant,
-        true,
-        false);
-    //3. assert the rollback stat
-    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
-    assertEquals(2, rollbackMetadata.size());
-    assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "002").doesMarkerDirExist());
-
-    // rollback 001 as well. this time since its part of the restore, entire file slice should be deleted and not just log files (for partition1 and partition2)
-    HoodieInstant rollBackInstant1 = INSTANT_GENERATOR.createNewInstant(isUsingMarkers ? HoodieInstant.State.INFLIGHT : HoodieInstant.State.COMPLETED,
-        HoodieTimeline.DELTA_COMMIT_ACTION, "001");
-    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor1 =
-        new BaseRollbackPlanActionExecutor(context, cfg, table, "004", rollBackInstant1, false,
-            cfg.shouldRollbackUsingMarkers(), true);
-    mergeOnReadRollbackPlanActionExecutor1.execute().get();
-    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor1 = new MergeOnReadRollbackActionExecutor(
-        context,
-        cfg,
-        table,
-        "004",
-        rollBackInstant1,
-        true,
-        false);
-    mergeOnReadRollbackActionExecutor1.execute().getPartitionMetadata();
-
-    //assert there are no valid file groups in both partition1 and partition2
-    assertEquals(0, table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).count());
-    assertEquals(0, table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).count());
-    // and only 3rd partition should have valid file groups.
-    assertTrue(table.getFileSystemView().getAllFileGroups(DEFAULT_THIRD_PARTITION_PATH).count() > 0);
+    // Start a client so that timeline server starts
+    client = getHoodieWriteClient(cfg);
+    // Sleep for 1 second to ensure the timeline server port is listening
+    Thread.sleep(1000);

Review Comment:
   BaseRollbackPlanActionExecutor makes a call to the timeline server. Added a comment.
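   
   As an aside, a fixed `Thread.sleep(1000)` can still be flaky on a slow host. Below is a minimal sketch of an explicit readiness poll instead; the `PortWaiter` helper, its name, and the host/port plumbing are assumptions for illustration, not part of this PR:

```java
// Hypothetical helper, not part of this PR: poll until the embedded
// timeline server port accepts connections, instead of a fixed sleep.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortWaiter {
  static void awaitPort(String host, int port, long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      try (Socket socket = new Socket()) {
        // connect() succeeding means something is listening on the port
        socket.connect(new InetSocketAddress(host, port), 200);
        return;
      } catch (IOException e) {
        Thread.sleep(100); // not listening yet; retry until the deadline
      }
    }
    throw new IllegalStateException("Port " + port + " not listening after " + timeoutMs + " ms");
  }
}
```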



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -110,52 +111,57 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws
     assertEquals(1, secondPartitionCommit2LogFiles.size());
     HoodieTable table = this.getHoodieTable(metaClient, cfg);
 
-    //2. rollback
-    HoodieInstant rollBackInstant = INSTANT_GENERATOR.createNewInstant(isUsingMarkers ? HoodieInstant.State.INFLIGHT : HoodieInstant.State.COMPLETED,
-        HoodieTimeline.DELTA_COMMIT_ACTION, "002");
-    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
-        new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false,
-            cfg.shouldRollbackUsingMarkers(), false);
-    mergeOnReadRollbackPlanActionExecutor.execute().get();
-    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
-        context,
-        cfg,
-        table,
-        "003",
-        rollBackInstant,
-        true,
-        false);
-    //3. assert the rollback stat
-    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
-    assertEquals(2, rollbackMetadata.size());
-
-    for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
-      HoodieRollbackPartitionMetadata meta = entry.getValue();
-      assertEquals(0, meta.getFailedDeleteFiles().size());
-      assertEquals(1, meta.getSuccessDeleteFiles().size());
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      // create client so that timeline server starts
+      // Wait for embedded timeline server port to be ready
+      Thread.sleep(1000);

Review Comment:
   BaseRollbackPlanActionExecutor makes a call to the timeline server. Added a comment.
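   
   Same readiness concern as the earlier comment. If a library like Awaitility happened to be on the test classpath (an assumption, not verified against this module's dependencies), the sleep could become an explicit wait; a sketch, with the host/port wiring left hypothetical:

```java
// Sketch only: the readiness check expressed with Awaitility
// (assumes Awaitility is available; host/port wiring is hypothetical).
import static org.awaitility.Awaitility.await;

import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

final class TimelineServerReadiness {
  static boolean isListening(String host, int port) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), 200);
      return true;
    } catch (Exception e) {
      return false; // not accepting connections yet
    }
  }

  static void awaitTimelineServer(String host, int port) {
    // Retry every 100 ms for up to 10 s; throws ConditionTimeoutException on timeout.
    await().pollInterval(Duration.ofMillis(100))
        .atMost(Duration.ofSeconds(10))
        .until(() -> isListening(host, port));
  }
}
```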



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java:
##########
@@ -112,41 +111,41 @@ public void testPartitionChanges(HoodieTableType tableType, IndexType indexType)
       String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
       List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, p1, 0, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch0);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(insertsAtEpoch0, 2), commitTimeAtEpoch0).collect());
+      client.commit(commitTimeAtEpoch0, client.upsert(jsc().parallelize(insertsAtEpoch0, 2), commitTimeAtEpoch0));
 
       // 2nd batch: normal updates same partition
       String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
       List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0, 5, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch5);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect());
+      client.commit(commitTimeAtEpoch5, client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5));
       readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 5);
 
       // 3rd batch: update all from p1 to p2
       String commitTimeAtEpoch6 = getCommitTimeAtUTC(6);
       List<HoodieRecord> updatesAtEpoch6 = getUpdates(updatesAtEpoch5, p2, 6, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch6);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch6, 2), commitTimeAtEpoch6).collect());
+      client.commit(commitTimeAtEpoch6, client.upsert(jsc().parallelize(updatesAtEpoch6, 2), commitTimeAtEpoch6));
       readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 6);
 
       // 4th batch: update all from p2 to p3
       String commitTimeAtEpoch7 = getCommitTimeAtUTC(7);
       List<HoodieRecord> updatesAtEpoch7 = getUpdates(updatesAtEpoch6, p3, 7, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch7);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch7, 2), commitTimeAtEpoch7).collect());
+      client.commit(commitTimeAtEpoch7, client.upsert(jsc().parallelize(updatesAtEpoch7, 2), commitTimeAtEpoch7));
       readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p3, 7);
 
       // 5th batch: late update all to p4; discarded
       String commitTimeAtEpoch8 = getCommitTimeAtUTC(8);
       List<HoodieRecord> updatesAtEpoch2 = getUpdates(insertsAtEpoch0, p4, 2, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch8);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch2, 2), commitTimeAtEpoch8).collect());

Review Comment:
   Addressed



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java:
##########
@@ -242,19 +243,16 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
 
   private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
                                        String prevCommitTime, String newCommitTime, int numRecords) throws Exception {
-    // Finish first base commmit
-    JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert,
+    insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert,
         false, false, numRecords, INSTANT_GENERATOR);
-    assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
   }
 
   private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommit,
                                        String commitTimeBetweenPrevAndNew, String newCommitTime, int numRecords)
       throws Exception {
-    JavaRDD<WriteStatus> result = updateBatch(cfg, client, newCommitTime, prevCommit,
+    updateBatch(cfg, client, newCommitTime, prevCommit,
        Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false,
        numRecords, 200, 2, INSTANT_GENERATOR);
-    client.commit(newCommitTime, result);

Review Comment:
   The updateBatch API used here eventually calls the overloaded method with skipCommit set to false, so the write is already committed and the removed explicit client.commit call was redundant.
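   
   To illustrate the delegation described above, here is its general shape as a simplified sketch; the parameter lists are invented, and the real updateBatch helpers in the Hudi test harness take many more arguments:

```java
// Illustrative only: the overload-delegation pattern the comment refers to,
// with invented parameters; only the skipCommit flow matters here.
class BatchHelperSketch {
  void updateBatch(String commitTime, int numRecords) {
    // The short overload always forwards with skipCommit = false,
    // so the batch is committed before this method returns.
    updateBatch(commitTime, numRecords, false);
  }

  void updateBatch(String commitTime, int numRecords, boolean skipCommit) {
    // ... perform the upsert ...
    if (!skipCommit) {
      // ... commit the write; a second commit by the caller would be redundant
    }
  }
}
```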


