This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 2245a9515f [HUDI-3798] Fixing ending of a transaction by different owner and removing some extraneous methods in trxn manager (#5255) 2245a9515f is described below commit 2245a9515f1d048b3f0a832c5087c227fbab132e Author: Sivabalan Narayanan <n.siv...@gmail.com> AuthorDate: Sun Apr 10 21:46:07 2022 -0700 [HUDI-3798] Fixing ending of a transaction by different owner and removing some extraneous methods in trxn manager (#5255) --- .../apache/hudi/client/BaseHoodieWriteClient.java | 16 +++-- .../apache/hudi/client/HoodieTimelineArchiver.java | 5 +- .../client/transaction/TransactionManager.java | 27 ++------ .../table/action/index/RunIndexActionExecutor.java | 4 +- .../client/transaction/TestTransactionManager.java | 74 ++++++++++++++++------ 5 files changed, 75 insertions(+), 51 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 32a8dee517..c1ebef7bb6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -795,7 +795,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime(); Timer.Context timerContext = metrics.getRollbackCtx(); try { - HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary); + HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.of(restoreInstantTime), initialMetadataTableIfNecessary); Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime); if (restorePlanOption.isPresent()) { HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime); @@ -1035,7 +1035,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, public void dropIndex(List<MetadataPartitionType> partitionTypes) { HoodieTable table = createTable(config, hadoopConf); String dropInstant = HoodieActiveTimeline.createNewInstantTime(); - this.txnManager.beginTransaction(); + HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant); + this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty()); try { context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table"); table.getMetadataWriter(dropInstant).ifPresent(w -> { @@ -1046,7 +1047,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, } }); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.of(ownerInstant)); } } @@ -1451,13 +1452,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, } HoodieTable table; - - this.txnManager.beginTransaction(); + Option<HoodieInstant> ownerInstant = Option.empty(); + if (instantTime.isPresent()) { + ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get())); + } + this.txnManager.beginTransaction(ownerInstant, Option.empty()); try { tryUpgrade(metaClient, instantTime); table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(ownerInstant); } // Validate table properties diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index ca76e4e3bf..12d00bf618 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -157,7 +157,8 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException { try { if (acquireLock) { - txnManager.beginTransaction(); + // there is no owner or instant time per se for archival. + txnManager.beginTransaction(Option.empty(), Option.empty()); } List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList()); verifyLastMergeArchiveFilesIfNecessary(context); @@ -179,7 +180,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { } finally { close(); if (acquireLock) { - txnManager.endTransaction(); + txnManager.endTransaction(Option.empty()); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java index d9b9d3d269..aef1fee5e0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java @@ -45,14 +45,6 @@ public class TransactionManager implements Serializable { this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); } - public void beginTransaction() { - if (isOptimisticConcurrencyControlEnabled) { - LOG.info("Transaction starting without a transaction owner"); - lockManager.lock(); - LOG.info("Transaction started without a transaction owner"); - } - } - public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant, Option<HoodieInstant> lastCompletedTxnOwnerInstant) { if (isOptimisticConcurrencyControlEnabled) { @@ -65,30 +57,25 @@ public class TransactionManager implements Serializable { } } - public void endTransaction() { - if (isOptimisticConcurrencyControlEnabled) { - LOG.info("Transaction ending without a transaction owner"); - lockManager.unlock(); - LOG.info("Transaction ended without a transaction owner"); - } - } - public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) { if (isOptimisticConcurrencyControlEnabled) { LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant); - reset(currentTxnOwnerInstant, Option.empty(), Option.empty()); - lockManager.unlock(); - LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant); + if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) { + lockManager.unlock(); + LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant); + } } } - private synchronized void reset(Option<HoodieInstant> callerInstant, + private synchronized boolean reset(Option<HoodieInstant> callerInstant, Option<HoodieInstant> newTxnOwnerInstant, Option<HoodieInstant> lastCompletedTxnOwnerInstant) { if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant.get().equals(callerInstant.get())) { this.currentTxnOwnerInstant = newTxnOwnerInstant; this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant; + return true; } + return false; } public void close() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java index 8c86a298f8..339e95b9e0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java @@ -232,14 +232,14 @@ public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> exte HoodieIndexCommitMetadata indexCommitMetadata) throws IOException { try { // update the table config and timeline in a lock as there could be another indexer running - txnManager.beginTransaction(); + txnManager.beginTransaction(Option.of(indexInstant), Option.empty()); updateMetadataPartitionsTableConfig(table.getMetaClient(), finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet())); table.getActiveTimeline().saveAsComplete( new HoodieInstant(true, INDEXING_ACTION, indexInstant.getTimestamp()), TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata)); } finally { - txnManager.endTransaction(); + txnManager.endTransaction(Option.of(indexInstant)); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java index 22f8017841..6573560e75 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java @@ -69,20 +69,28 @@ public class TestTransactionManager extends HoodieCommonTestHarness { @Test public void testSingleWriterTransaction() { - transactionManager.beginTransaction(); - transactionManager.endTransaction(); + Option<HoodieInstant> lastCompletedInstant = getInstant("0000001"); + Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + transactionManager.endTransaction(newTxnOwnerInstant); } @Test public void testSingleWriterNestedTransaction() { - transactionManager.beginTransaction(); + Option<HoodieInstant> lastCompletedInstant = getInstant("0000001"); + Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + + Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000003"); + Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000004"); + assertThrows(HoodieLockException.class, () -> { - transactionManager.beginTransaction(); + transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1); }); - transactionManager.endTransaction(); + transactionManager.endTransaction(newTxnOwnerInstant); assertDoesNotThrow(() -> { - transactionManager.endTransaction(); + transactionManager.endTransaction(newTxnOwnerInstant1); }); } @@ -94,11 +102,16 @@ public class TestTransactionManager extends HoodieCommonTestHarness { final AtomicBoolean writer1Completed = new AtomicBoolean(false); final AtomicBoolean writer2Completed = new AtomicBoolean(false); + Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000001"); + Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000002"); + Option<HoodieInstant> lastCompletedInstant2 = getInstant("0000003"); + Option<HoodieInstant> newTxnOwnerInstant2 = getInstant("0000004"); + // Let writer1 get the lock first, then wait for others // to join the sync up point. Thread writer1 = new Thread(() -> { assertDoesNotThrow(() -> { - transactionManager.beginTransaction(); + transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1); }); latch.countDown(); try { @@ -111,7 +124,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness { // } assertDoesNotThrow(() -> { - transactionManager.endTransaction(); + transactionManager.endTransaction(newTxnOwnerInstant1); }); writer1Completed.set(true); }); @@ -127,10 +140,10 @@ public class TestTransactionManager extends HoodieCommonTestHarness { // } assertDoesNotThrow(() -> { - transactionManager.beginTransaction(); + transactionManager.beginTransaction(newTxnOwnerInstant2, lastCompletedInstant2); }); assertDoesNotThrow(() -> { - transactionManager.endTransaction(); + transactionManager.endTransaction(newTxnOwnerInstant2); }); writer2Completed.set(true); }); @@ -152,6 +165,32 @@ public class TestTransactionManager extends HoodieCommonTestHarness { Assertions.assertTrue(writer2Completed.get()); } + @Test + public void testEndTransactionByDiffOwner() throws InterruptedException { + // 1. Begin and end by the same transaction owner + Option<HoodieInstant> lastCompletedInstant = getInstant("0000001"); + Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + + CountDownLatch countDownLatch = new CountDownLatch(1); + // Another writer thread + Thread writer2 = new Thread(() -> { + Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000003"); + transactionManager.endTransaction(newTxnOwnerInstant1); + countDownLatch.countDown(); + }); + + writer2.start(); + countDownLatch.await(30, TimeUnit.SECONDS); + // should not have reset the state within transaction manager since the owner is different. + Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner().isPresent()); + + transactionManager.endTransaction(newTxnOwnerInstant); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + } + @Test public void testTransactionsWithInstantTime() { // 1. Begin and end by the same transaction owner @@ -164,14 +203,15 @@ public class TestTransactionManager extends HoodieCommonTestHarness { Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); - // 2. Begin transaction with a new txn owner, but end transaction with no/wrong owner + // 2. Begin transaction with a new txn owner, but end transaction with wrong owner lastCompletedInstant = getInstant("0000002"); newTxnOwnerInstant = getInstant("0000003"); transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); - transactionManager.endTransaction(); + transactionManager.endTransaction(getInstant("0000004")); // Owner reset would not happen as the end txn was invoked with an incorrect current txn owner Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant); Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant); + transactionManager.endTransaction(newTxnOwnerInstant); // 3. But, we should be able to begin a new transaction for a new owner lastCompletedInstant = getInstant("0000003"); @@ -183,15 +223,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness { Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); - // 4. Transactions with no owners should also go through - transactionManager.beginTransaction(); - Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); - Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); - transactionManager.endTransaction(); - Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); - Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); - - // 5. Transactions with new instants but with same timestamps should properly reset owners + // 4. Transactions with new instants but with same timestamps should properly reset owners transactionManager.beginTransaction(getInstant("0000005"), Option.empty()); Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent()); Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());