danny0405 commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2102069475


##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java:
##########
@@ -2806,7 +2858,7 @@ private void validateMetadata(HoodieJavaWriteClient testClient, Option<String> i
       }
     });
 
-    try (HoodieBackedTableMetadataWriter<List<HoodieRecord>> metadataWriter = metadataWriter(client)) {
+    try (HoodieBackedTableMetadataWriter<List<HoodieRecord>,List<WriteStatus>> metadataWriter = metadataWriter(client)) {

Review Comment:
   `,List` -> `, List`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java:
##########
@@ -48,15 +44,7 @@ public void compact(String instantTime) {
     LOG.info("Compactor executing compaction {}", instantTime);
     SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) compactionClient;
     HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(instantTime);
-    List<HoodieWriteStat> writeStats = compactionMetadata.getCommitMetadata().get().getWriteStats();
-    long numWriteErrors = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
-    if (numWriteErrors != 0) {
-      // We treat even a single error in compaction as fatal
-      LOG.error("Compaction for instant ({}) failed with write errors. Errors :{}", instantTime, numWriteErrors);
-      throw new HoodieException(
-          "Compaction for instant (" + instantTime + ") failed with write errors. Errors :" + numWriteErrors);

Review Comment:
   Looks like a functional regression; does OneHouse need this in production?
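   
   If the fail-fast behavior is still needed, a minimal sketch of keeping the guard (mirroring the removed lines above) could be:
   
   ```java
   // Sketch only: fail the compaction run if any write status reported errors.
   // Assumes the commit metadata is present at this point, as in the removed code.
   List<HoodieWriteStat> writeStats = compactionMetadata.getCommitMetadata().get().getWriteStats();
   long numWriteErrors = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
   if (numWriteErrors != 0) {
     // Even a single write error in compaction is treated as fatal.
     throw new HoodieException("Compaction for instant (" + instantTime + ") failed with write errors. Errors: " + numWriteErrors);
   }
   ```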



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java:
##########
@@ -118,8 +121,10 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
         records.add(dup);
       }
       JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-      List<WriteStatus> statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
-      assertNoWriteErrors(statuses);
+      JavaRDD<WriteStatus> statuses = client.bulkInsert(writeRecords, newCommitTime);
+      statuses = jsc.parallelize(statuses.collect(), 1);
+      client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.
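   
   For example, a minimal sketch of the suggested rewrite (variable names are illustrative):
   
   ```java
   // Collect the write statuses exactly once into a local List.
   List<WriteStatus> statusList = client.bulkInsert(writeRecords, newCommitTime).collect();
   assertNoWriteErrors(statusList);
   
   // Re-parallelize the already-collected list only at commit time,
   // so the RDD is never collected twice.
   client.commit(newCommitTime, jsc.parallelize(statusList, 1), Option.empty(), COMMIT_ACTION,
       Collections.emptyMap(), Option.empty());
   ```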



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1411,16 +1445,17 @@ private void testDeletes(SparkRDDWriteClient client, List<HoodieRecord> previous
 
     List<HoodieKey> hoodieKeysToDelete = randomSelectAsHoodieKeys(previousRecords, sizeToDelete);
     JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1);
-    List<WriteStatus> statuses = client.delete(deleteKeys, instantTime).collect();
-
-    assertNoWriteErrors(statuses);
+    JavaRDD<WriteStatus> rawStatuses = client.delete(deleteKeys, instantTime);
+    JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(),1);
+    client.commit(instantTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());

Review Comment:
   ditto



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java:
##########
@@ -172,70 +174,81 @@ public void testListBasedRollbackStrategy() throws Exception {
     HoodieTestDataGenerator.writePartitionMetadataDeprecated(
         storage, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH},
         basePath);
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
-
-    String newCommitTime = "001";
-    WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
-    List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 3);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
-    Assertions.assertNoWriteErrors(statuses.collect());
-
-    newCommitTime = "002";
-    WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
-    records = dataGen.generateUpdates(newCommitTime, records);
-    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime);
-    Assertions.assertNoWriteErrors(statuses.collect());
-
-    context = new HoodieSparkEngineContext(jsc);
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = this.getHoodieTable(metaClient, cfg);
-    HoodieInstant needRollBackInstant = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "002");
-    String rollbackInstant = "003";
-
-    ListingBasedRollbackStrategy rollbackStrategy = new ListingBasedRollbackStrategy(table, context, table.getConfig(), rollbackInstant, false);
-    List<HoodieRollbackRequest> rollBackRequests = rollbackStrategy.getRollbackRequests(needRollBackInstant);
-
-    HoodieRollbackRequest rollbackRequest = rollBackRequests.stream().filter(entry -> entry.getPartitionPath().equals(DEFAULT_FIRST_PARTITION_PATH)).findFirst().get();
-
-    FileSystem fs = Mockito.mock(FileSystem.class);
-    MockitoAnnotations.initMocks(this);
-
-    // mock to throw exception when fs.exists() is invoked
-    Mockito.when(fs.exists(any()))
-        .thenThrow(new IOException("Failing exists call for " + rollbackRequest.getFilesToBeDeleted().get(0)));
-
-    rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg, rollbackInstant, false);
-    List<HoodieRollbackRequest> rollBackRequestsUpdated = rollbackStrategy.getRollbackRequests(needRollBackInstant);
-
-    assertEquals(rollBackRequests, rollBackRequestsUpdated);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      String newCommitTime = "001";
+      WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 3);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+      statuses = jsc.parallelize(statuses.collect(), 1);
+      client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+      assertNoWriteErrors(statuses.collect());
+
+      newCommitTime = "002";
+      WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime);
+      statuses = jsc.parallelize(statuses.collect(), 1);
+      client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java:
##########
@@ -153,26 +153,19 @@ public static Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCle
     JavaRDD<HoodieRecord> writeRecords = context.getJavaSparkContext().parallelize(records, PARALLELISM);
 
     JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime);
-    // Verify there are no errors
+    statuses = context.getJavaSparkContext().parallelize(statuses.collect(), 1);
+    client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
     assertNoWriteErrors(statuses.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java:
##########
@@ -343,11 +343,10 @@ private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitTim
       List<HoodieRecord> s3MetadataRecords = Arrays.asList(
           generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
       );
-      JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
-
-      List<WriteStatus> statuses = result.collect();
-      assertNoWriteErrors(statuses);
-
+      JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+      writeStatuses = jsc.parallelize(writeStatuses.collect(), 1);
+      writeClient.commit(commitTime, writeStatuses);
+      assertNoWriteErrors(writeStatuses.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java:
##########
@@ -443,10 +445,9 @@ private Pair<String, List<HoodieRecord>> writeGcsMetadataRecords(String commitTi
           getGcsMetadataRecord(commitTime, "data-file-4.json", "bucket-1", "1")
       );
       JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(gcsMetadataRecords, 1), commitTime);
-
-      List<WriteStatus> statuses = result.collect();
-      assertNoWriteErrors(statuses);
-
+      result = jsc.parallelize(result.collect(), 1);
+      writeClient.commit(commitTime, result, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+      assertNoWriteErrors(result.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1310,17 +1338,21 @@ private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client,
     WriteClientTestUtils.startCommitWithTime(client, commitTime1);
     List<HoodieRecord> inserts1 = dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
-    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
-    assertNoWriteErrors(statuses);
-    Set<String> batchBuckets = statuses.stream().map(WriteStatus::getFileId).collect(Collectors.toSet());
-    verifyRecordsWritten(commitTime1, true, inserts1, statuses, client.getConfig(),
+    JavaRDD<WriteStatus> rawStatuses = client.upsert(insertRecordsRDD1, commitTime1);

Review Comment:
   ditto



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java:
##########
@@ -86,15 +90,21 @@ public void testSavepoint(boolean enableMetadataTable,
       WriteClientTestUtils.startCommitWithTime(client, commitTime1);
       List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
       JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
-      List<WriteStatus> statuses1 = client.upsert(writeRecords1, commitTime1).collect();
-      assertNoWriteErrors(statuses1);
+      JavaRDD<WriteStatus> statuses1 = client.upsert(writeRecords1, commitTime1);
+      statuses1 = jsc.parallelize(statuses1.collect(), 1);

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java:
##########
@@ -110,8 +111,10 @@ public void testSavepointAndRollback(Boolean testFailedRestore, Boolean failedRe
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
       JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
 
-      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
-      assertNoWriteErrors(statuses);
+      List<WriteStatus> rawStatuses = client.upsert(writeRecords, newCommitTime).collect();
+      JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses, 1);
+      client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -984,9 +1003,12 @@ public void testDeletesWithDeleteApi() throws Exception {
     List<HoodieRecord> dummyInserts3 = dataGen.generateInserts(commitTime6, 20);
     List<HoodieKey> hoodieKeysToDelete3 = randomSelectAsHoodieKeys(dummyInserts3, 20);
     JavaRDD<HoodieKey> deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1);
-    statuses = client.delete(deleteKeys3, commitTime6).collect();
-    assertNoWriteErrors(statuses);
-    assertEquals(0, statuses.size(), "Just 0 write status for delete.");
+    JavaRDD<WriteStatus> preStatuses = client.delete(deleteKeys3, commitTime6);
+    statuses = jsc.parallelize(preStatuses.collect(), 1);

Review Comment:
   Maybe we could just set up the statuses as a List with `rawStatuses.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -954,14 +972,15 @@ public void testDeletesWithDeleteApi() throws Exception {
     Set<String> keys1 = recordsToRecordKeySet(inserts1);
     List<String> keysSoFar = new ArrayList<>(keys1);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
-    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
-
-    assertNoWriteErrors(statuses);
+    JavaRDD<WriteStatus> rawStatuses = client.upsert(insertRecordsRDD1, commitTime1);
+    JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(), 1);

Review Comment:
   Maybe we could just set up the statuses as a List with `rawStatuses.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1269,11 +1296,12 @@ private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int
     List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>(inserts2);
     JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 2);
     HoodieWriteResult writeResult = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
-    statuses = writeResult.getWriteStatuses().collect();
-    assertNoWriteErrors(statuses);
+    JavaRDD<WriteStatus> statusJavaRDD = jsc.parallelize(writeResult.getWriteStatuses().collect(), 2);
+    client.commit(commitTime2, statusJavaRDD, Option.empty(), REPLACE_COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+    assertNoWriteErrors(statusJavaRDD.collect());
 
     assertEquals(batch1Buckets, new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath)));
-    verifyRecordsWritten(commitTime2, populateMetaFields, inserts2, statuses, config,
+    verifyRecordsWritten(commitTime2, populateMetaFields, inserts2, statusJavaRDD.collect(), config,

Review Comment:
   ditto



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -791,13 +805,15 @@ public void testSmallInsertHandlingForUpserts() throws Exception {
     insertsAndUpdates2.addAll(dataGen.generateUpdates(commitTime2, inserts1));
 
     JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 1);
-    statuses = client.upsert(insertAndUpdatesRDD2, commitTime2).collect();
-    assertNoWriteErrors(statuses);
+    rawStatuses = client.upsert(insertAndUpdatesRDD2, commitTime2);
+    statuses = jsc.parallelize(rawStatuses.collect(), 1);

Review Comment:
   Maybe we could just set up the statuses as a List with `rawStatuses.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.



##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java:
##########
@@ -2912,7 +2964,7 @@ private List<StoragePath> getAllFiles(HoodieTableMetadata metadata) throws Excep
     return allfiles;
   }
 
-  private HoodieBackedTableMetadataWriter<List<HoodieRecord>> metadataWriter(HoodieJavaWriteClient client) {
+  private HoodieBackedTableMetadataWriter<List<HoodieRecord>,List<WriteStatus>> metadataWriter(HoodieJavaWriteClient client) {

Review Comment:
   `,List` -> `, List`



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -771,14 +784,15 @@ public void testSmallInsertHandlingForUpserts() throws Exception {
     Set<String> keys1 = recordsToRecordKeySet(inserts1);
 
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
-    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
-
-    assertNoWriteErrors(statuses);
+    JavaRDD<WriteStatus> rawStatuses = client.upsert(insertRecordsRDD1, commitTime1);
+    JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(), 1);

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1398,8 +1430,10 @@ private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, Sp
     insertsAndUpdates.addAll(dataGen.generateUpdates(instantTime, inserts));
 
     JavaRDD<HoodieRecord> insertAndUpdatesRDD = jsc.parallelize(insertsAndUpdates, 1);
-    List<WriteStatus> statuses = client.upsert(insertAndUpdatesRDD, instantTime).collect();
-    assertNoWriteErrors(statuses);
+    JavaRDD<WriteStatus> rawStatuses = client.upsert(insertAndUpdatesRDD, instantTime);
+    JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(), 1);

Review Comment:
   ditto



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java:
##########
@@ -116,9 +118,10 @@ private void testReadFilterExist(HoodieWriteConfig config,
 
       JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), PARALLELISM);
       // We create three base file, each having one record. (3 different partitions)
-      List<WriteStatus> statuses = writeFn.apply(writeClient, smallRecordsRDD, newCommitTime).collect();
-      // Verify there are no errors
-      assertNoWriteErrors(statuses);
+      JavaRDD<WriteStatus> statuses = writeFn.apply(writeClient, smallRecordsRDD, newCommitTime);
+      statuses = jsc.parallelize(statuses.collect(), 1);
+      writeClient.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java:
##########
@@ -151,13 +152,13 @@ private void testInsertAndCleanByCommits(
       for (int i = 0; i < 8; i++) {
         String newCommitTime = client.startCommit();
         List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, BATCH_SIZE);
-
-        List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newCommitTime).collect();
-        // Verify there are no errors
-        assertNoWriteErrors(statuses);
+        JavaRDD<WriteStatus> rawStatuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newCommitTime);
+        JavaRDD<WriteStatus> statuses = jsc().parallelize(rawStatuses.collect(), 1);
+        client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+        assertNoWriteErrors(statuses.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java:
##########
@@ -178,9 +180,10 @@ private void testInsertAndCleanByVersions(
         WriteClientTestUtils.startCommitWithTime(client, newInstantTime);
         List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, BATCH_SIZE);
 
-        List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newInstantTime).collect();
-        // Verify there are no errors
-        assertNoWriteErrors(statuses);
+        JavaRDD<WriteStatus> rawStatuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newInstantTime);
+        JavaRDD<WriteStatus> statuses = jsc().parallelize(rawStatuses.collect(), 1);
+        client.commit(newInstantTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+        assertNoWriteErrors(statuses.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -817,10 +833,12 @@ public void testSmallInsertHandlingForUpserts() throws Exception {
     insertsAndUpdates3.addAll(updates3);
 
     JavaRDD<HoodieRecord> insertAndUpdatesRDD3 = jsc.parallelize(insertsAndUpdates3, 1);
-    statuses = client.upsert(insertAndUpdatesRDD3, commitTime3).collect();
-    assertNoWriteErrors(statuses);
-
-    assertEquals(2, statuses.size(), "2 files needs to be committed.");
+    rawStatuses = client.upsert(insertAndUpdatesRDD3, commitTime3);
+    statuses = jsc.parallelize(rawStatuses.collect(), 1);

Review Comment:
   Maybe we could just set up the statuses as a List with `rawStatuses.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java:
##########
@@ -47,41 +48,39 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
-  protected void twoUpsertCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices,
-                                                      List<FileSlice> secondPartitionCommit2FileSlices,
-                                                      HoodieWriteConfig cfg,
-                                                      boolean commitSecondUpsert) throws IOException {
+  protected void twoUpsertCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices, List<FileSlice> secondPartitionCommit2FileSlices,
+                                                      HoodieWriteConfig cfg, boolean commitSecondUpsert, SparkRDDWriteClient client) throws IOException {
     //just generate two partitions
     dataGen = new HoodieTestDataGenerator(
         new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
     //1. prepare data
     HoodieTestDataGenerator.writePartitionMetadataDeprecated(
         storage, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH},
         basePath);
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
    /**
     * Write 1 (only inserts)
     */
-    String newCommitTime = "001";
+    String newCommitTime = InProcessTimeGenerator.createNewInstantTime();
     WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
     List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
-    Assertions.assertNoWriteErrors(statuses.collect());
+    JavaRDD<WriteStatus> rawStatuses = client.upsert(writeRecords, newCommitTime);

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java:
##########
@@ -46,9 +49,10 @@ void testDeduplicationAgainstRecordsAlreadyInTable() {
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 2);
-      List<WriteStatus> statuses = writeClient.bulkInsert(recordsRDD, newCommitTime).collect();
-      // Verify there are no errors
-      assertNoWriteErrors(statuses);
+      JavaRDD<WriteStatus> rawStatuses = writeClient.bulkInsert(recordsRDD, newCommitTime);
+      JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(), 1);
+      writeClient.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+      assertNoWriteErrors(statuses.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java:
##########
@@ -439,54 +471,66 @@ public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCom
    * @param expRecordsInThisCommit       Expected number of records in this commit
    * @param expTotalRecords              Expected number of records when scanned
    * @param expTotalCommits              Expected number of commits (including this commit)
-   * @param doCommit
    * @throws Exception in case of error
    */
   public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
                                          Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
                                          Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
                                          Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-                                         boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
-                                         boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator) throws Exception {
+                                         boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
+                                         boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator,
+                                         boolean skipCommit) throws Exception {
 
     List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
     return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
         numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-        expTotalCommits, doCommit, filterForCommitTimeWithAssert, instantGenerator);
+        expTotalCommits, filterForCommitTimeWithAssert, instantGenerator, skipCommit);
   }
 
   public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
                                          Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
                                          Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction,
                                          Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-                                         boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
+                                         boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
                                          boolean filterForCommitTimeWithAssert,
                                          String partition,
                                          InstantGenerator instantGenerator) throws Exception {
 
     List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit, partition);
     return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
         numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-        expTotalCommits, doCommit, filterForCommitTimeWithAssert, instantGenerator);
+        expTotalCommits, filterForCommitTimeWithAssert, instantGenerator);
   }
 
   private JavaRDD<WriteStatus> writeBatchHelper(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
                                                 Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
                                                 int numRecordsInThisCommit, List<HoodieRecord> records,
                                                 Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
                                                 boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords,
-                                                int expTotalCommits, boolean doCommit, boolean filterForCommitTimeWithAssert,
+                                                int expTotalCommits, boolean filterForCommitTimeWithAssert,
                                                 InstantGenerator instantGenerator) throws IOException {
+    return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime, numRecordsInThisCommit, records, writeFn,
+        assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, filterForCommitTimeWithAssert,
+        instantGenerator, false);
+  }
+
+  private JavaRDD<WriteStatus> writeBatchHelper(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
+                                                Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+                                                int numRecordsInThisCommit, List<HoodieRecord> records,
+                                                Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+                                                boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords,
+                                                int expTotalCommits, boolean filterForCommitTimeWithAssert,
+                                                InstantGenerator instantGenerator, boolean skipCommit) throws IOException {
     // Write 1 (only inserts)
     WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
 
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
 
-    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
-    List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
+    JavaRDD<WriteStatus> rawResult = writeFn.apply(client, writeRecords, newCommitTime);
+    JavaRDD<WriteStatus> result = jsc.parallelize(rawResult.collect(), 1);
+    assertNoWriteErrors(result.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java:
##########
@@ -482,16 +486,13 @@ public static Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(Jav
   protected List<WriteStatus> writeAndVerifyBatch(BaseHoodieWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields, boolean autoCommitOff) {
     WriteClientTestUtils.startCommitWithTime(client, commitTime);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
-    JavaRDD<WriteStatus> statusRDD = ((SparkRDDWriteClient) client).upsert(insertRecordsRDD1, commitTime);
-    if (autoCommitOff) {
-      client.commit(commitTime, statusRDD);
-    }
-    List<WriteStatus> statuses = statusRDD.collect();
-    assertNoWriteErrors(statuses);
-    verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.getConfig(),
+    JavaRDD<WriteStatus> rawStatusRDD = ((SparkRDDWriteClient) client).upsert(insertRecordsRDD1, commitTime);
+    JavaRDD<WriteStatus> statusRDD = jsc.parallelize(rawStatusRDD.collect(), 1);
+    client.commit(commitTime, statusRDD);
+    assertNoWriteErrors(statusRDD.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java:
##########
@@ -151,7 +155,10 @@ public void testRecordGenerationAPIsForCOW() throws IOException {
       records2.addAll(updates2);
       records2.addAll(deletes2);
 
-      List<WriteStatus> writeStatuses2 = client.upsert(jsc.parallelize(records2, 1), commitTime).collect();
+      JavaRDD<WriteStatus> rawWriteStatuses2 = client.upsert(jsc.parallelize(records2, 1), commitTime);
+      JavaRDD<WriteStatus> writeStatusesRDD2 = jsc.parallelize(rawWriteStatuses2.collect(), 1);

Review Comment:
   `rawWriteStatuses2.collect()` already returns a `List`; can we reuse that list instead of collecting again?
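   
   For instance, a sketch of the simplification (keeping the PR's variable names; the exact commit overload may differ):
   
   ```java
   // collect() already returns a List<WriteStatus>, so collect only once...
   List<WriteStatus> writeStatuses2 = client.upsert(jsc.parallelize(records2, 1), commitTime).collect();
   assertNoWriteErrors(writeStatuses2);
   // ...and re-parallelize that same list only when committing.
   client.commit(commitTime, jsc.parallelize(writeStatuses2, 1));
   ```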



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java:
##########
@@ -203,16 +210,19 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
 
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
     HoodieWriteConfig writeConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(2)
-            .withInlineCompaction(true)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(3)
+            .withInlineCompaction(false)
             .compactionSmallFileSize(0).build()).build();
 
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
       // Insert
       String commitTime = client.startCommit();
       List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 100);
-      List<WriteStatus> writeStatuses1 = client.insert(jsc.parallelize(records1, 1), commitTime).collect();
-      assertNoWriteErrors(writeStatuses1);
+      JavaRDD<WriteStatus> rawWriteStatusesRDD1 = client.insert(jsc.parallelize(records1, 1), commitTime);
+      List<WriteStatus> writeStatuses1 = rawWriteStatusesRDD1.collect();
+      JavaRDD<WriteStatus> writeStatusesRDD1 = jsc.parallelize(writeStatuses1, 1);
+      client.commit(commitTime, writeStatusesRDD1, Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+      assertNoWriteErrors(writeStatusesRDD1.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java:
##########
@@ -497,9 +497,10 @@ private void upsertToTable(HoodieMetadataConfig metadataConfig, String tableName
       String instant = writeClient.createNewInstantTime();
       WriteClientTestUtils.startCommitWithTime(writeClient, instant);
       List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
-      JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);
-      List<WriteStatus> statuses = result.collect();
-      assertNoWriteErrors(statuses);
+      JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(jsc().parallelize(records, 1), instant);
+      writeStatuses = jsc().parallelize(writeStatuses.collect(), 1);
+      writeClient.commit(instant, writeStatuses);
+      assertNoWriteErrors(writeStatuses.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java:
##########
@@ -224,10 +226,9 @@ protected Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitT
           generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
       );
       JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
-
-      List<WriteStatus> statuses = result.collect();
-      assertNoWriteErrors(statuses);
-
+      result = jsc.parallelize(result.collect(), 1);
+      writeClient.commit(commitTime, result, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+      assertNoWriteErrors(result.collect());

Review Comment:
   Maybe we could just set up the statuses as a List with `RDD.collect()`, then `jsc.parallelize` them when committing, so that there is no need to collect the RDD multiple times.
   
   Let's check the whole file to fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

