rpuch commented on code in PR #5209: URL: https://github.com/apache/ignite-3/pull/5209#discussion_r1974946891
########## modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java: ########## @@ -62,18 +62,14 @@ static CompletableFuture<ClientTransaction> beginAsync( @Nullable String preferredNodeName, @Nullable TransactionOptions options, long observableTimestamp) { - if (options != null && options.timeoutMillis() != 0 && !options.readOnly()) { - // TODO: IGNITE-16193 - throw new UnsupportedOperationException("Timeouts are not supported yet for RW transactions"); - } - boolean readOnly = options != null && options.readOnly(); + long timeout = options == null ? 0 : options.timeoutMillis(); Review Comment: Does 0 mean 'use default from configuration'? If yes, could you please introduce a constant and use it instead of 0? ########## modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.lang.ErrorGroups.Transactions; +import org.apache.ignite.table.Table; +import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionException; +import org.apache.ignite.tx.TransactionOptions; +import org.junit.jupiter.api.Test; + +abstract class ItTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { + private static final String TABLE_NAME = "TEST"; + + @Override + protected int initialNodes() { + return 1; + } + + abstract Ignite ignite(); + + abstract InternalTransaction toInternalTransaction(Transaction tx); + + @Test + void roTransactionTimesOut() throws Exception { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); Review Comment: Let's extract table creation (maybe something else) to a method (maybe even do it in a setup method), to avoid duplication ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java: ########## @@ -47,9 +47,7 @@ public abstract class IgniteAbstractTransactionImpl implements InternalTransacti /** The tracker is used to track an observable timestamp. */ protected final HybridTimestampTracker observableTsTracker; - /** - * Transaction coordinator inconsistent ID. - */ + /** Transaction coordinator inconsistent ID. */ Review Comment: ```suggestion /** Transaction coordinator ephemeral ID. */ ``` 'Inconsistent' sounds like something causes troubles, and here it's just about something that can change. Although, if we change it, we might want to do it *consistently* (pun intended), not just here, so if it's used in more than a couple of places, maybe it's better to file a separate issue. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java: ########## @@ -397,6 +397,23 @@ private <R> CompletableFuture<R> enlistInTx( }).thenCompose(identity()); } + private long getTimeout(InternalTransaction tx) { + // TODO Review Comment: An issue should be mentioned, or TODO removed ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java: ########## @@ -775,6 +801,10 @@ private <T> CompletableFuture<T> postEnlist(CompletableFuture<T> fut, boolean au }).thenCompose(identity()); } + private static boolean isFinishedDueToTimeout(Throwable e) { + return e instanceof TransactionException && ((TransactionException) e).errorCode() == TX_ALREADY_FINISHED_ERR; Review Comment: The method name says that it's finished due to timeout, but `TX_ALREADY_FINISHED_ERR` does not mention 'due to timeout'. Could the transaction be finished normally? Isn't there a discrepancy here? ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java: ########## @@ -146,8 +146,11 @@ default InternalTransaction beginExplicitRo(HybridTimestampTracker timestampTrac * @param txId Transaction id. * @param ts The timestamp which is associated to txn completion. * @param commit {@code True} if a commit requested. + * @param timeoutExceeded {@code True} if a timeout exceeded. MUST: 'commit' != 'timeoutExceeded'. Review Comment: `'commit' != 'timeoutExceeded'` - but can they both be `false`? ########## modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.lang.ErrorGroups.Transactions; +import org.apache.ignite.table.Table; +import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionException; +import org.apache.ignite.tx.TransactionOptions; +import org.junit.jupiter.api.Test; + +abstract class ItTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { + private static final String TABLE_NAME = "TEST"; + + @Override + protected int initialNodes() { + return 1; + } + + abstract Ignite ignite(); + + abstract InternalTransaction toInternalTransaction(Transaction tx); + + @Test + void roTransactionTimesOut() throws Exception { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction roTx = ignite.transactions().begin(new TransactionOptions().readOnly(true).timeoutMillis(100)); + + // Make sure the RO tx actually begins on the server (as thin client transactions are lazy). + doGetOn(table, roTx); + + assertTrue( + waitForCondition(() -> toInternalTransaction(roTx).isFinishingOrFinished(), SECONDS.toMillis(10)), + "Transaction should have been finished due to timeout" + ); + + assertThrows(TransactionException.class, () -> doGetOn(table, roTx)); + // TODO: uncomment the following assert after IGNITE-24233 is fixed. + // assertThrows(TransactionException.class, roTx::commit); + } + + @Test + void readWriteTransactionTimesOut() throws InterruptedException { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction rwTx = ignite.transactions().begin(new TransactionOptions().readOnly(false).timeoutMillis(5_000)); + + // Make sure the tx actually begins on the server (as thin client transactions are lazy). + doPutOn(table, rwTx); + + assertTrue( + waitForCondition(() -> toInternalTransaction(rwTx).isFinishingOrFinished(), SECONDS.toMillis(10)), + "Transaction should have been finished due to timeout" + ); + + assertThrows(TransactionException.class, () -> doGetOn(table, rwTx)); + // TODO: uncomment the following assert after IGNITE-24233 is fixed. + // assertThrows(TransactionException.class, roTx::commit); + } + + @Test + void timeoutExceptionHasCorrectCause() throws InterruptedException { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction rwTx = ignite.transactions().begin(new TransactionOptions().readOnly(false).timeoutMillis(1_000)); + + // Wait for an exception. + assertTrue( + waitForCondition(() -> timeoutExceeded(table, rwTx), 10_000), + "Write operation should throw an exception with TX_TIMEOUT_EXCEEDED error code" + ); + + assertThrows(TransactionException.class, () -> doGetOn(table, rwTx)); + // TODO: uncomment the following assert after IGNITE-24233 is fixed. + // assertThrows(TransactionException.class, roTx::commit); Review Comment: These asserts don't seem to belong to this test. They make sure that a finished (due to timeout) transaction cannot be operated in, but this is already asserted by other tests. Current test is just about error code, so let's remove everything except this. ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java: ########## @@ -416,34 +416,49 @@ private InternalTransaction beginBusy( if (readOnly) { HybridTimestamp beginTimestamp = clockService.now(); - - UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); - - tx = beginReadOnlyTransaction(timestampTracker, beginTimestamp, txId, implicit, options); + tx = beginReadOnlyTransaction(timestampTracker, beginTimestamp, implicit, options); } else { HybridTimestamp beginTimestamp = createBeginTimestampWithIncrementRwTxCounter(); - - UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); - - // TODO: RW timeouts will be supported in https://issues.apache.org/jira/browse/IGNITE-24244 - // long timeout = options.timeoutMillis() == 0 ? txConfig.readWriteTimeout().value() : options.timeoutMillis(); - long timeout = 3_000; - - tx = new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, timeout); + tx = beginReadWriteTransaction(timestampTracker, beginTimestamp, implicit, options); } txStateVolatileStorage.initialize(tx); return tx; } + private ReadWriteTransactionImpl beginReadWriteTransaction( + HybridTimestampTracker timestampTracker, + HybridTimestamp beginTimestamp, + boolean implicit, + InternalTxOptions options) { + + UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); Review Comment: There seems to be quite a lot of duplication with the method that begins an RO transaction ########## modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.lang.ErrorGroups.Transactions; +import org.apache.ignite.table.Table; +import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionException; +import org.apache.ignite.tx.TransactionOptions; +import org.junit.jupiter.api.Test; + +abstract class ItTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { + private static final String TABLE_NAME = "TEST"; + + @Override + protected int initialNodes() { + return 1; + } + + abstract Ignite ignite(); + + abstract InternalTransaction toInternalTransaction(Transaction tx); + + @Test + void roTransactionTimesOut() throws Exception { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction roTx = ignite.transactions().begin(new TransactionOptions().readOnly(true).timeoutMillis(100)); + + // Make sure the RO tx actually begins on the server (as thin client transactions are lazy). + doGetOn(table, roTx); + + assertTrue( + waitForCondition(() -> toInternalTransaction(roTx).isFinishingOrFinished(), SECONDS.toMillis(10)), + "Transaction should have been finished due to timeout" + ); + + assertThrows(TransactionException.class, () -> doGetOn(table, roTx)); + // TODO: uncomment the following assert after IGNITE-24233 is fixed. + // assertThrows(TransactionException.class, roTx::commit); + } + + @Test + void readWriteTransactionTimesOut() throws InterruptedException { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction rwTx = ignite.transactions().begin(new TransactionOptions().readOnly(false).timeoutMillis(5_000)); Review Comment: We'll have to wait for at least 5 seconds for the transaction to timeout, this seems a lot for a test duration. Can this be reduced? For instance, the test for RO case uses a timeout of 100 milliseconds. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java: ########## @@ -397,6 +397,23 @@ private <R> CompletableFuture<R> enlistInTx( }).thenCompose(identity()); } + private long getTimeout(InternalTransaction tx) { + // TODO + if (tx.isReadOnly()) { + return tx.timeout(); + } + + if (tx.timeout() == 0) { + return 1_000; + } + + if (tx.implicit()) { + return 10_000; + } Review Comment: Looks like this is not finalized ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java: ########## @@ -150,26 +150,39 @@ private void checkEnlistPossibility() { @Override public CompletableFuture<Void> commitAsync() { return TransactionsExceptionMapperUtil.convertToPublicFuture( - finish(true, null, false), + finish(true, null, false, false), TX_COMMIT_ERR ); } @Override public CompletableFuture<Void> rollbackAsync() { return TransactionsExceptionMapperUtil.convertToPublicFuture( - finish(false, null, false), + finish(false, null, false, false), TX_ROLLBACK_ERR ); } @Override - public CompletableFuture<Void> finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full) { + public CompletableFuture<Void> rollbackTimeoutExceededAsync() { + return TransactionsExceptionMapperUtil.convertToPublicFuture( + finish(false, null, false, true) + .thenAccept(unused -> timeoutExceeded = true), + TX_ROLLBACK_ERR + ).thenAccept(unused -> this.timeoutExceeded = true); Review Comment: Why does this assignment happen after the finish gets replicated? Also, can there be a race with finish()? ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java: ########## @@ -117,9 +117,10 @@ void enlist( * @param executionTimestamp The timestamp is the time when a read-only transaction is applied to the remote node. The parameter * is not used for read-write transactions. * @param full Full state transaction marker. + * @param timeoutExceeded Timeout exceeded flag (commit flag must be {@code false}). * @return The future. */ - CompletableFuture<Void> finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full); + CompletableFuture<Void> finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded); Review Comment: A transaction might be aborted for different reasons, at least 1. user request 2. an operation error 3. a timeout 4. schema change We currently don't record 1 and 2, and 4 is not implemented (but we might implement it later). 3 is the only abort reason that stands out now, but should we still represent it using an enum and not a boolean flag? ########## modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.lang.ErrorGroups.Transactions; +import org.apache.ignite.table.Table; +import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionException; +import org.apache.ignite.tx.TransactionOptions; +import org.junit.jupiter.api.Test; + +abstract class ItTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { + private static final String TABLE_NAME = "TEST"; + + @Override + protected int initialNodes() { + return 1; + } + + abstract Ignite ignite(); + + abstract InternalTransaction toInternalTransaction(Transaction tx); + + @Test + void roTransactionTimesOut() throws Exception { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction roTx = ignite.transactions().begin(new TransactionOptions().readOnly(true).timeoutMillis(100)); + + // Make sure the RO tx actually begins on the server (as thin client transactions are lazy). + doGetOn(table, roTx); + + assertTrue( + waitForCondition(() -> toInternalTransaction(roTx).isFinishingOrFinished(), SECONDS.toMillis(10)), + "Transaction should have been finished due to timeout" + ); + + assertThrows(TransactionException.class, () -> doGetOn(table, roTx)); + // TODO: uncomment the following assert after IGNITE-24233 is fixed. + // assertThrows(TransactionException.class, roTx::commit); + } + + @Test + void readWriteTransactionTimesOut() throws InterruptedException { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction rwTx = ignite.transactions().begin(new TransactionOptions().readOnly(false).timeoutMillis(5_000)); + + // Make sure the tx actually begins on the server (as thin client transactions are lazy). + doPutOn(table, rwTx); + + assertTrue( + waitForCondition(() -> toInternalTransaction(rwTx).isFinishingOrFinished(), SECONDS.toMillis(10)), + "Transaction should have been finished due to timeout" + ); + + assertThrows(TransactionException.class, () -> doGetOn(table, rwTx)); + // TODO: uncomment the following assert after IGNITE-24233 is fixed. + // assertThrows(TransactionException.class, roTx::commit); + } + + @Test + void timeoutExceptionHasCorrectCause() throws InterruptedException { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction rwTx = ignite.transactions().begin(new TransactionOptions().readOnly(false).timeoutMillis(1_000)); + + // Wait for an exception. + assertTrue( Review Comment: Do we actually need to spam with puts till the transaction gets aborted? How about just waiting for 1 second (tx timeout) plus 1 millisecond and making sure a put causes an exception? ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java: ########## @@ -113,22 +113,31 @@ public TablePartitionId commitPartition() { @Override public CompletableFuture<Void> commitAsync() { return TransactionsExceptionMapperUtil.convertToPublicFuture( - finish(true, readTimestamp, false), + finish(true, readTimestamp, false, false), TX_COMMIT_ERR ); } @Override public CompletableFuture<Void> rollbackAsync() { return TransactionsExceptionMapperUtil.convertToPublicFuture( - finish(false, readTimestamp, false), + finish(false, readTimestamp, false, false), TX_ROLLBACK_ERR ); } @Override - public CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionTimestamp, boolean full) { + public CompletableFuture<Void> rollbackTimeoutExceededAsync() { + return TransactionsExceptionMapperUtil.convertToPublicFuture( + finish(false, readTimestamp, false, true), + TX_ROLLBACK_ERR + ).thenAccept(unused -> this.timeoutExceeded = true); Review Comment: Why does this assignment happen after the transaction finish completes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org