danny0405 commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2102069475
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java:
##########
@@ -2806,7 +2858,7 @@ private void validateMetadata(HoodieJavaWriteClient
testClient, Option<String> i
}
});
- try (HoodieBackedTableMetadataWriter<List<HoodieRecord>> metadataWriter =
metadataWriter(client)) {
+ try (HoodieBackedTableMetadataWriter<List<HoodieRecord>,List<WriteStatus>>
metadataWriter = metadataWriter(client)) {
Review Comment:
`,List` -> `, List`
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java:
##########
@@ -48,15 +44,7 @@ public void compact(String instantTime) {
LOG.info("Compactor executing compaction {}", instantTime);
SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>)
compactionClient;
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
writeClient.compact(instantTime);
- List<HoodieWriteStat> writeStats =
compactionMetadata.getCommitMetadata().get().getWriteStats();
- long numWriteErrors =
writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
- if (numWriteErrors != 0) {
- // We treat even a single error in compaction as fatal
- LOG.error("Compaction for instant ({}) failed with write errors. Errors
:{}", instantTime, numWriteErrors);
- throw new HoodieException(
- "Compaction for instant (" + instantTime + ") failed with write
errors. Errors :" + numWriteErrors);
Review Comment:
Looks like a functional regression — does OneHouse need this in production?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java:
##########
@@ -118,8 +121,10 @@ public void
testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
records.add(dup);
}
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- List<WriteStatus> statuses = client.bulkInsert(writeRecords,
newCommitTime).collect();
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> statuses = client.bulkInsert(writeRecords,
newCommitTime);
+ statuses = jsc.parallelize(statuses.collect(), 1);
+ client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1411,16 +1445,17 @@ private void testDeletes(SparkRDDWriteClient client,
List<HoodieRecord> previous
List<HoodieKey> hoodieKeysToDelete =
randomSelectAsHoodieKeys(previousRecords, sizeToDelete);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1);
- List<WriteStatus> statuses = client.delete(deleteKeys,
instantTime).collect();
-
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> rawStatuses = client.delete(deleteKeys, instantTime);
+ JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(),1);
+ client.commit(instantTime, statuses, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
Review Comment:
ditto
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java:
##########
@@ -172,70 +174,81 @@ public void testListBasedRollbackStrategy() throws
Exception {
HoodieTestDataGenerator.writePartitionMetadataDeprecated(
storage, new String[] {DEFAULT_FIRST_PARTITION_PATH,
DEFAULT_SECOND_PARTITION_PATH},
basePath);
- SparkRDDWriteClient client = getHoodieWriteClient(cfg);
-
- String newCommitTime = "001";
- WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
- List<HoodieRecord> records =
dataGen.generateInsertsContainsAllPartitions(newCommitTime, 3);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
- Assertions.assertNoWriteErrors(statuses.collect());
-
- newCommitTime = "002";
- WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
- records = dataGen.generateUpdates(newCommitTime, records);
- statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime);
- Assertions.assertNoWriteErrors(statuses.collect());
-
- context = new HoodieSparkEngineContext(jsc);
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTable table = this.getHoodieTable(metaClient, cfg);
- HoodieInstant needRollBackInstant =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "002");
- String rollbackInstant = "003";
-
- ListingBasedRollbackStrategy rollbackStrategy = new
ListingBasedRollbackStrategy(table, context, table.getConfig(),
rollbackInstant, false);
- List<HoodieRollbackRequest> rollBackRequests =
rollbackStrategy.getRollbackRequests(needRollBackInstant);
-
- HoodieRollbackRequest rollbackRequest =
rollBackRequests.stream().filter(entry ->
entry.getPartitionPath().equals(DEFAULT_FIRST_PARTITION_PATH)).findFirst().get();
-
- FileSystem fs = Mockito.mock(FileSystem.class);
- MockitoAnnotations.initMocks(this);
-
- // mock to throw exception when fs.exists() is invoked
- Mockito.when(fs.exists(any()))
- .thenThrow(new IOException("Failing exists call for " +
rollbackRequest.getFilesToBeDeleted().get(0)));
-
- rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg,
rollbackInstant, false);
- List<HoodieRollbackRequest> rollBackRequestsUpdated =
rollbackStrategy.getRollbackRequests(needRollBackInstant);
-
- assertEquals(rollBackRequests, rollBackRequestsUpdated);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+ String newCommitTime = "001";
+ WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
+ List<HoodieRecord> records =
dataGen.generateInsertsContainsAllPartitions(newCommitTime, 3);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime);
+ statuses = jsc.parallelize(statuses.collect(), 1);
+ client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(statuses.collect());
+
+ newCommitTime = "002";
+ WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
+ records = dataGen.generateUpdates(newCommitTime, records);
+ statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime);
+ statuses = jsc.parallelize(statuses.collect(), 1);
+ client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java:
##########
@@ -153,26 +153,19 @@ public static Pair<String, JavaRDD<WriteStatus>>
insertFirstBigBatchForClientCle
JavaRDD<HoodieRecord> writeRecords =
context.getJavaSparkContext().parallelize(records, PARALLELISM);
JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords,
newCommitTime);
- // Verify there are no errors
+ statuses = context.getJavaSparkContext().parallelize(statuses.collect(),
1);
+ client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
assertNoWriteErrors(statuses.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java:
##########
@@ -343,11 +343,10 @@ private Pair<String, List<HoodieRecord>>
writeS3MetadataRecords(String commitTim
List<HoodieRecord> s3MetadataRecords = Arrays.asList(
generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json",
1L)
);
- JavaRDD<WriteStatus> result =
writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
-
- List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
-
+ JavaRDD<WriteStatus> writeStatuses =
writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+ writeStatuses = jsc.parallelize(writeStatuses.collect(), 1);
+ writeClient.commit(commitTime, writeStatuses);
+ assertNoWriteErrors(writeStatuses.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java:
##########
@@ -443,10 +445,9 @@ private Pair<String, List<HoodieRecord>>
writeGcsMetadataRecords(String commitTi
getGcsMetadataRecord(commitTime, "data-file-4.json", "bucket-1", "1")
);
JavaRDD<WriteStatus> result =
writeClient.upsert(jsc().parallelize(gcsMetadataRecords, 1), commitTime);
-
- List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
-
+ result = jsc.parallelize(result.collect(), 1);
+ writeClient.commit(commitTime, result, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(result.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1310,17 +1338,21 @@ private Set<String>
insertPartitionRecordsWithCommit(SparkRDDWriteClient client,
WriteClientTestUtils.startCommitWithTime(client, commitTime1);
List<HoodieRecord> inserts1 =
dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
- List<WriteStatus> statuses = client.upsert(insertRecordsRDD1,
commitTime1).collect();
- assertNoWriteErrors(statuses);
- Set<String> batchBuckets =
statuses.stream().map(WriteStatus::getFileId).collect(Collectors.toSet());
- verifyRecordsWritten(commitTime1, true, inserts1, statuses,
client.getConfig(),
+ JavaRDD<WriteStatus> rawStatuses = client.upsert(insertRecordsRDD1,
commitTime1);
Review Comment:
ditto
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java:
##########
@@ -86,15 +90,21 @@ public void testSavepoint(boolean enableMetadataTable,
WriteClientTestUtils.startCommitWithTime(client, commitTime1);
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
- List<WriteStatus> statuses1 = client.upsert(writeRecords1,
commitTime1).collect();
- assertNoWriteErrors(statuses1);
+ JavaRDD<WriteStatus> statuses1 = client.upsert(writeRecords1,
commitTime1);
+ statuses1 = jsc.parallelize(statuses1.collect(), 1);
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java:
##########
@@ -110,8 +111,10 @@ public void testSavepointAndRollback(Boolean
testFailedRestore, Boolean failedRe
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- List<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime).collect();
- assertNoWriteErrors(statuses);
+ List<WriteStatus> rawStatuses = client.upsert(writeRecords,
newCommitTime).collect();
+ JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses, 1);
+ client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -984,9 +1003,12 @@ public void testDeletesWithDeleteApi() throws Exception {
List<HoodieRecord> dummyInserts3 = dataGen.generateInserts(commitTime6,
20);
List<HoodieKey> hoodieKeysToDelete3 =
randomSelectAsHoodieKeys(dummyInserts3, 20);
JavaRDD<HoodieKey> deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1);
- statuses = client.delete(deleteKeys3, commitTime6).collect();
- assertNoWriteErrors(statuses);
- assertEquals(0, statuses.size(), "Just 0 write status for delete.");
+ JavaRDD<WriteStatus> preStatuses = client.delete(deleteKeys3, commitTime6);
+ statuses = jsc.parallelize(preStatuses.collect(), 1);
Review Comment:
Maybe we just set up the statuses as a List with `rawStatuses.collect()`,
then `jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -954,14 +972,15 @@ public void testDeletesWithDeleteApi() throws Exception {
Set<String> keys1 = recordsToRecordKeySet(inserts1);
List<String> keysSoFar = new ArrayList<>(keys1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
- List<WriteStatus> statuses = client.upsert(insertRecordsRDD1,
commitTime1).collect();
-
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> rawStatuses = client.upsert(insertRecordsRDD1,
commitTime1);
+ JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(), 1);
Review Comment:
Maybe we just set up the statuses as a List with `rawStatuses.collect()`,
then `jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1269,11 +1296,12 @@ private void verifyInsertOverwritePartitionHandling(int
batch1RecordsCount, int
List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>(inserts2);
JavaRDD<HoodieRecord> insertAndUpdatesRDD2 =
jsc.parallelize(insertsAndUpdates2, 2);
HoodieWriteResult writeResult =
client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
- statuses = writeResult.getWriteStatuses().collect();
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> statusJavaRDD =
jsc.parallelize(writeResult.getWriteStatuses().collect(), 2);
+ client.commit(commitTime2, statusJavaRDD, Option.empty(),
REPLACE_COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(statusJavaRDD.collect());
assertEquals(batch1Buckets, new
HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath)));
- verifyRecordsWritten(commitTime2, populateMetaFields, inserts2, statuses,
config,
+ verifyRecordsWritten(commitTime2, populateMetaFields, inserts2,
statusJavaRDD.collect(), config,
Review Comment:
ditto
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -791,13 +805,15 @@ public void testSmallInsertHandlingForUpserts() throws
Exception {
insertsAndUpdates2.addAll(dataGen.generateUpdates(commitTime2, inserts1));
JavaRDD<HoodieRecord> insertAndUpdatesRDD2 =
jsc.parallelize(insertsAndUpdates2, 1);
- statuses = client.upsert(insertAndUpdatesRDD2, commitTime2).collect();
- assertNoWriteErrors(statuses);
+ rawStatuses = client.upsert(insertAndUpdatesRDD2, commitTime2);
+ statuses = jsc.parallelize(rawStatuses.collect(), 1);
Review Comment:
Maybe we just set up the statuses as a List with `rawStatuses.collect()`,
then `jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java:
##########
@@ -2912,7 +2964,7 @@ private List<StoragePath> getAllFiles(HoodieTableMetadata
metadata) throws Excep
return allfiles;
}
- private HoodieBackedTableMetadataWriter<List<HoodieRecord>>
metadataWriter(HoodieJavaWriteClient client) {
+ private
HoodieBackedTableMetadataWriter<List<HoodieRecord>,List<WriteStatus>>
metadataWriter(HoodieJavaWriteClient client) {
Review Comment:
`,List` -> `, List`
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -771,14 +784,15 @@ public void testSmallInsertHandlingForUpserts() throws
Exception {
Set<String> keys1 = recordsToRecordKeySet(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
- List<WriteStatus> statuses = client.upsert(insertRecordsRDD1,
commitTime1).collect();
-
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> rawStatuses = client.upsert(insertRecordsRDD1,
commitTime1);
+ JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(), 1);
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1398,8 +1430,10 @@ private Pair<Set<String>, List<HoodieRecord>>
testUpdates(String instantTime, Sp
insertsAndUpdates.addAll(dataGen.generateUpdates(instantTime, inserts));
JavaRDD<HoodieRecord> insertAndUpdatesRDD =
jsc.parallelize(insertsAndUpdates, 1);
- List<WriteStatus> statuses = client.upsert(insertAndUpdatesRDD,
instantTime).collect();
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> rawStatuses = client.upsert(insertAndUpdatesRDD,
instantTime);
+ JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(), 1);
Review Comment:
ditto
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java:
##########
@@ -116,9 +118,10 @@ private void testReadFilterExist(HoodieWriteConfig config,
JavaRDD<HoodieRecord> smallRecordsRDD =
jsc.parallelize(records.subList(0, 75), PARALLELISM);
// We create three base file, each having one record. (3 different
partitions)
- List<WriteStatus> statuses = writeFn.apply(writeClient, smallRecordsRDD,
newCommitTime).collect();
- // Verify there are no errors
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> statuses = writeFn.apply(writeClient,
smallRecordsRDD, newCommitTime);
+ statuses = jsc.parallelize(statuses.collect(), 1);
+ writeClient.commit(newCommitTime, statuses, Option.empty(),
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java:
##########
@@ -151,13 +152,13 @@ private void testInsertAndCleanByCommits(
for (int i = 0; i < 8; i++) {
String newCommitTime = client.startCommit();
List<HoodieRecord> records =
recordUpsertGenWrappedFunction.apply(newCommitTime, BATCH_SIZE);
-
- List<WriteStatus> statuses = upsertFn.apply(client,
jsc().parallelize(records, PARALLELISM), newCommitTime).collect();
- // Verify there are no errors
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> rawStatuses = upsertFn.apply(client,
jsc().parallelize(records, PARALLELISM), newCommitTime);
+ JavaRDD<WriteStatus> statuses =
jsc().parallelize(rawStatuses.collect(), 1);
+ client.commit(newCommitTime, statuses, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(statuses.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java:
##########
@@ -178,9 +180,10 @@ private void testInsertAndCleanByVersions(
WriteClientTestUtils.startCommitWithTime(client, newInstantTime);
List<HoodieRecord> records =
recordUpsertGenWrappedFunction.apply(newInstantTime, BATCH_SIZE);
- List<WriteStatus> statuses = upsertFn.apply(client,
jsc().parallelize(records, PARALLELISM), newInstantTime).collect();
- // Verify there are no errors
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> rawStatuses = upsertFn.apply(client,
jsc().parallelize(records, PARALLELISM), newInstantTime);
+ JavaRDD<WriteStatus> statuses =
jsc().parallelize(rawStatuses.collect(), 1);
+ client.commit(newInstantTime, statuses, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(statuses.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -817,10 +833,12 @@ public void testSmallInsertHandlingForUpserts() throws
Exception {
insertsAndUpdates3.addAll(updates3);
JavaRDD<HoodieRecord> insertAndUpdatesRDD3 =
jsc.parallelize(insertsAndUpdates3, 1);
- statuses = client.upsert(insertAndUpdatesRDD3, commitTime3).collect();
- assertNoWriteErrors(statuses);
-
- assertEquals(2, statuses.size(), "2 files needs to be committed.");
+ rawStatuses = client.upsert(insertAndUpdatesRDD3, commitTime3);
+ statuses = jsc.parallelize(rawStatuses.collect(), 1);
Review Comment:
Maybe we just set up the statuses as a List with `rawStatuses.collect()`,
then `jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java:
##########
@@ -47,41 +48,39 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
- protected void twoUpsertCommitDataWithTwoPartitions(List<FileSlice>
firstPartitionCommit2FileSlices,
- List<FileSlice>
secondPartitionCommit2FileSlices,
- HoodieWriteConfig cfg,
- boolean
commitSecondUpsert) throws IOException {
+ protected void twoUpsertCommitDataWithTwoPartitions(List<FileSlice>
firstPartitionCommit2FileSlices, List<FileSlice>
secondPartitionCommit2FileSlices,
+ HoodieWriteConfig cfg,
boolean commitSecondUpsert, SparkRDDWriteClient client) throws IOException {
//just generate two partitions
dataGen = new HoodieTestDataGenerator(
new String[] {DEFAULT_FIRST_PARTITION_PATH,
DEFAULT_SECOND_PARTITION_PATH});
//1. prepare data
HoodieTestDataGenerator.writePartitionMetadataDeprecated(
storage, new String[] {DEFAULT_FIRST_PARTITION_PATH,
DEFAULT_SECOND_PARTITION_PATH},
basePath);
- SparkRDDWriteClient client = getHoodieWriteClient(cfg);
/**
* Write 1 (only inserts)
*/
- String newCommitTime = "001";
+ String newCommitTime = InProcessTimeGenerator.createNewInstantTime();
WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
List<HoodieRecord> records =
dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
- Assertions.assertNoWriteErrors(statuses.collect());
+ JavaRDD<WriteStatus> rawStatuses = client.upsert(writeRecords,
newCommitTime);
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java:
##########
@@ -46,9 +49,10 @@ void testDeduplicationAgainstRecordsAlreadyInTable() {
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 2);
- List<WriteStatus> statuses = writeClient.bulkInsert(recordsRDD,
newCommitTime).collect();
- // Verify there are no errors
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> rawStatuses = writeClient.bulkInsert(recordsRDD,
newCommitTime);
+ JavaRDD<WriteStatus> statuses = jsc.parallelize(rawStatuses.collect(),
1);
+ writeClient.commit(newCommitTime, statuses, Option.empty(),
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(statuses.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java:
##########
@@ -439,54 +471,66 @@ public JavaRDD<WriteStatus>
writeBatch(SparkRDDWriteClient client, String newCom
* @param expRecordsInThisCommit Expected number of records in this
commit
* @param expTotalRecords Expected number of records when
scanned
* @param expTotalCommits Expected number of commits (including
this commit)
- * @param doCommit
* @throws Exception in case of error
*/
public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String
newCommitTime, String prevCommitTime,
Option<List<String>>
commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
Function2<List<HoodieRecord>, String,
Integer> recordGenFunction,
Function3<JavaRDD<WriteStatus>,
SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
- boolean assertForCommit, int
expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean
doCommit,
- boolean
filterForCommitTimeWithAssert, InstantGenerator instantGenerator) throws
Exception {
+ boolean assertForCommit, int
expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
+ boolean
filterForCommitTimeWithAssert, InstantGenerator instantGenerator,
+ boolean skipCommit) throws Exception {
List<HoodieRecord> records = recordGenFunction.apply(newCommitTime,
numRecordsInThisCommit);
return writeBatchHelper(client, newCommitTime, prevCommitTime,
commitTimesBetweenPrevAndNew, initCommitTime,
numRecordsInThisCommit, records, writeFn, assertForCommit,
expRecordsInThisCommit, expTotalRecords,
- expTotalCommits, doCommit, filterForCommitTimeWithAssert,
instantGenerator);
+ expTotalCommits, filterForCommitTimeWithAssert, instantGenerator,
skipCommit);
}
public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String
newCommitTime, String prevCommitTime,
Option<List<String>>
commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
Function3<List<HoodieRecord>, String,
Integer, String> recordGenFunction,
Function3<JavaRDD<WriteStatus>,
SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
- boolean assertForCommit, int
expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean
doCommit,
+ boolean assertForCommit, int
expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
boolean filterForCommitTimeWithAssert,
String partition,
InstantGenerator instantGenerator)
throws Exception {
List<HoodieRecord> records = recordGenFunction.apply(newCommitTime,
numRecordsInThisCommit, partition);
return writeBatchHelper(client, newCommitTime, prevCommitTime,
commitTimesBetweenPrevAndNew, initCommitTime,
numRecordsInThisCommit, records, writeFn, assertForCommit,
expRecordsInThisCommit, expTotalRecords,
- expTotalCommits, doCommit, filterForCommitTimeWithAssert,
instantGenerator);
+ expTotalCommits, filterForCommitTimeWithAssert, instantGenerator);
}
private JavaRDD<WriteStatus> writeBatchHelper(SparkRDDWriteClient client,
String newCommitTime, String prevCommitTime,
Option<List<String>>
commitTimesBetweenPrevAndNew, String initCommitTime,
int numRecordsInThisCommit,
List<HoodieRecord> records,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>,
String> writeFn,
boolean assertForCommit, int
expRecordsInThisCommit, int expTotalRecords,
- int expTotalCommits, boolean
doCommit, boolean filterForCommitTimeWithAssert,
+ int expTotalCommits, boolean
filterForCommitTimeWithAssert,
InstantGenerator
instantGenerator) throws IOException {
+ return writeBatchHelper(client, newCommitTime, prevCommitTime,
commitTimesBetweenPrevAndNew, initCommitTime, numRecordsInThisCommit, records,
writeFn,
+ assertForCommit, expRecordsInThisCommit, expTotalRecords,
expTotalCommits, filterForCommitTimeWithAssert,
+ instantGenerator, false);
+ }
+
+ private JavaRDD<WriteStatus> writeBatchHelper(SparkRDDWriteClient client,
String newCommitTime, String prevCommitTime,
+ Option<List<String>>
commitTimesBetweenPrevAndNew, String initCommitTime,
+ int numRecordsInThisCommit,
List<HoodieRecord> records,
+
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>,
String> writeFn,
+ boolean assertForCommit, int
expRecordsInThisCommit, int expTotalRecords,
+ int expTotalCommits, boolean
filterForCommitTimeWithAssert,
+ InstantGenerator
instantGenerator, boolean skipCommit) throws IOException {
// Write 1 (only inserts)
WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords,
newCommitTime);
- List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> rawResult = writeFn.apply(client, writeRecords,
newCommitTime);
+ JavaRDD<WriteStatus> result = jsc.parallelize(rawResult.collect(), 1);
+ assertNoWriteErrors(result.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java:
##########
@@ -482,16 +486,13 @@ public static Pair<HashMap<String, WorkloadStat>,
WorkloadStat> buildProfile(Jav
protected List<WriteStatus> writeAndVerifyBatch(BaseHoodieWriteClient
client, List<HoodieRecord> inserts, String commitTime, boolean
populateMetaFields, boolean autoCommitOff) {
WriteClientTestUtils.startCommitWithTime(client, commitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
- JavaRDD<WriteStatus> statusRDD = ((SparkRDDWriteClient)
client).upsert(insertRecordsRDD1, commitTime);
- if (autoCommitOff) {
- client.commit(commitTime, statusRDD);
- }
- List<WriteStatus> statuses = statusRDD.collect();
- assertNoWriteErrors(statuses);
- verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses,
client.getConfig(),
+ JavaRDD<WriteStatus> rawStatusRDD = ((SparkRDDWriteClient)
client).upsert(insertRecordsRDD1, commitTime);
+ JavaRDD<WriteStatus> statusRDD = jsc.parallelize(rawStatusRDD.collect(),
1);
+ client.commit(commitTime, statusRDD);
+ assertNoWriteErrors(statusRDD.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java:
##########
@@ -151,7 +155,10 @@ public void testRecordGenerationAPIsForCOW() throws
IOException {
records2.addAll(updates2);
records2.addAll(deletes2);
- List<WriteStatus> writeStatuses2 =
client.upsert(jsc.parallelize(records2, 1), commitTime).collect();
+ JavaRDD<WriteStatus> rawWriteStatuses2 =
client.upsert(jsc.parallelize(records2, 1), commitTime);
+ JavaRDD<WriteStatus> writeStatusesRDD2 =
jsc.parallelize(rawWriteStatuses2.collect(), 1);
Review Comment:
`rawWriteStatuses2.collect()` is already a list?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java:
##########
@@ -203,16 +210,19 @@ public void testRecordGenerationAPIsForMOR() throws
IOException {
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
HoodieWriteConfig writeConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
-
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(2)
- .withInlineCompaction(true)
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(3)
+ .withInlineCompaction(false)
.compactionSmallFileSize(0).build()).build();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
writeConfig)) {
// Insert
String commitTime = client.startCommit();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 100);
- List<WriteStatus> writeStatuses1 =
client.insert(jsc.parallelize(records1, 1), commitTime).collect();
- assertNoWriteErrors(writeStatuses1);
+ JavaRDD<WriteStatus> rawWriteStatusesRDD1 =
client.insert(jsc.parallelize(records1, 1), commitTime);
+ List<WriteStatus> writeStatuses1 = rawWriteStatusesRDD1.collect();
+ JavaRDD<WriteStatus> writeStatusesRDD1 = jsc.parallelize(writeStatuses1,
1);
+ client.commit(commitTime, writeStatusesRDD1, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(writeStatusesRDD1.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java:
##########
@@ -497,9 +497,10 @@ private void upsertToTable(HoodieMetadataConfig
metadataConfig, String tableName
String instant = writeClient.createNewInstantTime();
WriteClientTestUtils.startCommitWithTime(writeClient, instant);
List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant,
100);
- JavaRDD<WriteStatus> result =
writeClient.upsert(jsc().parallelize(records, 1), instant);
- List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
+ JavaRDD<WriteStatus> writeStatuses =
writeClient.upsert(jsc().parallelize(records, 1), instant);
+ writeStatuses = jsc().parallelize(writeStatuses.collect(), 1);
+ writeClient.commit(instant, writeStatuses);
+ assertNoWriteErrors(writeStatuses.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java:
##########
@@ -224,10 +226,9 @@ protected Pair<String, List<HoodieRecord>>
writeS3MetadataRecords(String commitT
generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json",
1L)
);
JavaRDD<WriteStatus> result =
writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
-
- List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
-
+ result = jsc.parallelize(result.collect(), 1);
+ writeClient.commit(commitTime, result, Option.empty(), COMMIT_ACTION,
Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(result.collect());
Review Comment:
Maybe we just set up the statuses as a List with `RDD.collect()`, then
`jsc.parallelize` when committing, so that there is no need to collect it
multiple times.
Let's check the whole file to fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]