the-other-tim-brown commented on code in PR #13653:
URL: https://github.com/apache/hudi/pull/13653#discussion_r2248287540
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -326,21 +466,246 @@ void testCleaningCompletedRollback() throws Exception {
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
- assertRowNumberEqualsTo(20);
+ assertEquals(commitToRowCount, getRecordCountPerCommit());
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(tableVersion, HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
}
}
- private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+ @Test
+ void rollbackWithAsyncServices_compactionCompletesDuringCommit() {
+ HoodieWriteConfig hoodieWriteConfig = getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+ final int numRecords = 10;
+ writeInitialCommitsForAsyncServicesTests(numRecords);
+ String inflightCommit = client.startCommit();
+ List<HoodieRecord> records = dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+ JavaRDD<WriteStatus> writeStatus =
writeClient.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+ // Run compaction while delta-commit is in-flight
+ Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
+ HoodieWriteMetadata result = client.compact(compactionInstant.get());
+ client.commitCompaction(compactionInstant.get(), result, Option.empty());
+ // commit the inflight delta commit
+ client.commit(inflightCommit, writeStatus);
+
+ client.savepoint(inflightCommit, "user1", "Savepoint for commit that completed after compaction");
+
+ // write one more commit
+ String newCommitTime = client.startCommit();
+ records = dataGen.generateInserts(newCommitTime, numRecords);
+ writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+ client.commit(newCommitTime, writeStatus);
+
+ // restore to savepoint
+ client.restoreToSavepoint(inflightCommit);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(inflightCommit, numRecords), getRecordCountPerCommit());
+ // ensure the compaction instant is still present because it was completed before the target of the restore
+ assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+ .anyMatch(hoodieInstant -> hoodieInstant.requestedTime().equals(compactionInstant.get())));
+ }
+ }
+
+ @Test
+ void rollbackWithAsyncServices_commitCompletesDuringCompaction() {
Review Comment:
Similarly here, the compaction cannot be scheduled for table version 6 (v6), so this test should either guard against or skip that version.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]