nsivabalan commented on code in PR #18305:
URL: https://github.com/apache/hudi/pull/18305#discussion_r2933991645
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -949,13 +976,13 @@ protected void archive(HoodieTable table) {
}
/**
- * Get inflight timeline excluding compaction and clustering.
+ * Get inflight timeline excluding compaction, log compaction, and
clustering.
*
* @param metaClient
* @return
*/
private HoodieTimeline
getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient
metaClient) {
- HoodieTimeline inflightTimelineExcludingCompaction =
metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
+ HoodieTimeline inflightTimelineExcludingCompaction =
metaClient.getCommitsTimeline().filterPendingExcludingCompactionAndLogCompaction();
Review Comment:
Hmm, based on the current code, I don't think we can exclude log compaction
here.
As of master, log compaction is treated as yet another ingestion commit.
So, rollback of failed writes by the ingestion writer or by Clean (lazy) should
account for log compaction instants — and this is the API being used within
rollbackFailedWrites ->
getInstantsToRollback.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -224,25 +224,46 @@ protected HoodieWriteMetadata<O> logCompact(String
logCompactionInstantTime, boo
+ "timestamp exists. Instant details %s",
compactionInstantWithGreaterTimestamp.get()));
}
- HoodieTimeline pendingLogCompactionTimeline =
table.getActiveTimeline().filterPendingLogCompactionTimeline();
InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
HoodieInstant inflightInstant =
instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime);
- if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
- log.info("Found Log compaction inflight file. Rolling back the commit
and exiting.");
- table.rollbackInflightLogCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
- table.getMetaClient().reloadActiveTimeline();
- throw new HoodieException("Execution is aborted since it found an
Inflight logcompaction,"
- + "log compaction plans are mutable plans, so reschedule another
logcompaction.");
+ boolean isMultiWriter =
config.getWriteConcurrencyMode().supportsMultiWriter();
+ if (isMultiWriter) {
Review Comment:
Can we move this to a private method named `enableHeartBeatForMultiWriter`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -224,25 +224,46 @@ protected HoodieWriteMetadata<O> logCompact(String
logCompactionInstantTime, boo
+ "timestamp exists. Instant details %s",
compactionInstantWithGreaterTimestamp.get()));
}
- HoodieTimeline pendingLogCompactionTimeline =
table.getActiveTimeline().filterPendingLogCompactionTimeline();
InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
HoodieInstant inflightInstant =
instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime);
- if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
- log.info("Found Log compaction inflight file. Rolling back the commit
and exiting.");
- table.rollbackInflightLogCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
- table.getMetaClient().reloadActiveTimeline();
- throw new HoodieException("Execution is aborted since it found an
Inflight logcompaction,"
- + "log compaction plans are mutable plans, so reschedule another
logcompaction.");
+ boolean isMultiWriter =
config.getWriteConcurrencyMode().supportsMultiWriter();
+ if (isMultiWriter) {
+ try {
+ txnManager.beginStateChange(Option.of(inflightInstant),
txnManager.getLastCompletedTransactionOwner());
+ validateHeartBeat(logCompactionInstantTime);
Review Comment:
What's the expected behavior if the log compaction is already completed by the
time we reach L233? Will this thread hit an exception due to the heartbeat?
I guess we might see the heartbeat as expired in both cases:
a. the heartbeat expired due to a failed write, or
b. the log compaction completed.
Don't we want to differentiate a and b? Or is it OK to fail the current thread
in both cases?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -1296,6 +1296,196 @@ public void
testCompactionFailsWhenInstantNotInActiveTimeline() throws Exception
FileIOUtils.deleteDirectory(new File(basePath));
}
+ /**
+ * Test that when two writers attempt to execute the same log compaction
plan concurrently,
+ * at least one will succeed and the other will fail due to heartbeat guard.
+ *
+ * This test uses a MOR table with multi-writer (optimistic concurrency
control) enabled.
+ */
+ @Test
+ public void testConcurrentLogCompactionExecutionOnSamePlan() throws
Exception {
+ setUpMORTestTable();
+
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,
"3");
+
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2)
+ .withLogCompactionBlocksThreshold(1).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withConflictResolutionStrategy(new
SimpleConcurrentFileWritesConflictResolutionStrategy())
+ .build())
+ .withProperties(properties);
+
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+ String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(),
secondCommitTime, 100);
+ String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(),
thirdCommitTime, 100);
+
+ Option<String> logCompactionInstantOpt =
client.scheduleTableService(Option.empty(), Option.empty(),
TableServiceType.LOG_COMPACT);
+ assertTrue(logCompactionInstantOpt.isPresent(), "Log compaction should be
scheduled");
+ String logCompactionInstantTime = logCompactionInstantOpt.get();
+
+ HoodieTimeline pendingLogCompactionTimeline =
metaClient.reloadActiveTimeline().filterPendingLogCompactionTimeline();
+
assertTrue(pendingLogCompactionTimeline.containsInstant(logCompactionInstantTime),
+ "Log compaction instant should be in pending state after scheduling");
+
+ final int threadCount = 2;
+ final ExecutorService executors =
Executors.newFixedThreadPool(threadCount);
+ final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+ final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+ final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+ SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+ Future<?> future1 = executors.submit(() -> {
+ try {
+ cyclicBarrier.await();
+ client1.logCompact(logCompactionInstantTime, true);
+ writer1Succeeded.set(true);
+ } catch (Exception e) {
+ LOG.info("Writer 1 failed with exception: " + e.getMessage());
+ writer1Succeeded.set(false);
+ }
+ });
+
+ Future<?> future2 = executors.submit(() -> {
+ try {
+ cyclicBarrier.await();
+ client2.logCompact(logCompactionInstantTime, true);
+ writer2Succeeded.set(true);
+ } catch (Exception e) {
+ LOG.info("Writer 2 failed with exception: " + e.getMessage());
+ writer2Succeeded.set(false);
+ }
+ });
+
+ future1.get();
+ future2.get();
+
+ assertTrue(writer1Succeeded.get() || writer2Succeeded.get(),
+ "At least one writer should succeed in executing the log compaction");
+
+ assertTrue(writer1Succeeded.get() ^ writer2Succeeded.get(),
+ "Exactly one writer should succeed in executing the log compaction");
+
+ executors.shutdown();
Review Comment:
Can we move the shutdown and close calls, and any such cleanup, to a finally block?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]