vldpyatkov commented on code in PR #5383: URL: https://github.com/apache/ignite-3/pull/5383#discussion_r2018698606
########## modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java: ########## @@ -391,24 +413,47 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) { * @param in Unpacker. * @param out Packer. * @param resources Resource registry. + * @param txManager Tx manager. * @return Transaction, if present, or null. */ public static @Nullable InternalTransaction readTx( ClientMessageUnpacker in, ClientMessagePacker out, - ClientResourceRegistry resources + ClientResourceRegistry resources, + @Nullable TxManager txManager ) { if (in.tryUnpackNil()) { return null; } try { - var tx = resources.get(in.unpackLong()).get(InternalTransaction.class); + long id = in.unpackLong(); + if (id == 0) { Review Comment: Probably we also have to check features here. ########## modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java: ########## @@ -546,4 +546,6 @@ Publisher<BinaryRow> lookup( * @return Streamer receiver runner. */ StreamerReceiverRunner streamerReceiverRunner(); + + boolean mergeEnlistment(int partId, String consistentId, long token, InternalTransaction tx, boolean commit); Review Comment: This method looks alien for the table class. Would it be possible to move this logic into the transaction class? 
########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java: ########## @@ -136,20 +150,20 @@ void abortAllRegistered() { } void unregister(InternalTransaction tx) { - Long expirationTime = expirationTimeByTx.remove(tx); + unregister(tx, physicalExpirationTimeMillis(tx.startTimestamp(), tx.getTimeout())); + } - if (expirationTime != null) { - txsByExpirationTime.computeIfPresent(expirationTime, (k, txOrSet) -> { - if (txOrSet instanceof Set) { - Set<InternalTransaction> set = (Set<InternalTransaction>) txOrSet; + void unregister(InternalTransaction tx, long expirationTime) { + txsByExpirationTime.computeIfPresent(expirationTime, (k, txOrSet) -> { + if (txOrSet instanceof Set) { + Set<InternalTransaction> set = (Set<InternalTransaction>) txOrSet; - set.remove(tx); + set.remove(tx); - return set.size() == 1 ? set.iterator().next() : set; - } else { - return null; - } - }); - } + return set.size() == 1 ? set.iterator().next() : set; + } else { + return null; Review Comment: Could you add an assert here that the removed transaction equals this tx? ########## modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java: ########## @@ -291,20 +300,32 @@ public String toString() { * * @param tx Transaction. * @param out Packer. + * @param ctx Write context. */ - public static void writeTx(@Nullable Transaction tx, PayloadOutputChannel out) { + public static void writeTx(@Nullable Transaction tx, PayloadOutputChannel out, @Nullable WriteContext ctx) { + // TODO write table id here. Review Comment: TODO without a ticket. 
########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java: ########## @@ -888,6 +873,48 @@ public int pending() { return startedTxs.intValue() - finishedTxs.intValue(); } + @Override + public InternalTransaction beginRemote(UUID txId, TablePartitionId commitPartId, UUID coord, long token, long timeout) { + assert commitPartId.tableId() > 0 && commitPartId.partitionId() >= 0 : "Illegal condition for direct mapping: " + commitPartId; + + // Switch to default timeout if needed. + timeout = timeout == USE_CONFIGURED_TIMEOUT_DEFAULT ? txConfig.readWriteTimeout().value() : timeout; Review Comment: So a remote transaction timeout is always more than the original one. ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java: ########## @@ -888,6 +873,48 @@ public int pending() { return startedTxs.intValue() - finishedTxs.intValue(); } + @Override + public InternalTransaction beginRemote(UUID txId, TablePartitionId commitPartId, UUID coord, long token, long timeout) { + assert commitPartId.tableId() > 0 && commitPartId.partitionId() >= 0 : "Illegal condition for direct mapping: " + commitPartId; + + // Switch to default timeout if needed. + timeout = timeout == USE_CONFIGURED_TIMEOUT_DEFAULT ? txConfig.readWriteTimeout().value() : timeout; + + // Adjust the timeout so local expiration happens after coordinator expiration. + var tx = new RemoteReadWriteTransaction(txId, commitPartId, coord, token, topologyService.localMember(), Review Comment: I would like to see a usual class instead of an inlined one. ########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TransactionTracker.java: ########## @@ -24,7 +24,7 @@ */ public interface TransactionTracker { /** Registers given transaction within the tracker. 
*/ - boolean register(UUID txId, boolean readOnly); + boolean register(UUID txId, boolean readOnly); // TODO FORCE READ ONLY HERE Review Comment: Unclear comment. ########## modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java: ########## @@ -176,4 +320,89 @@ static IgniteException unsupportedTxTypeException(Transaction tx) { private void setState(int state) { this.state.compareAndExchange(STATE_OPEN, state); } + + private void checkEnlistPossible() { + if (finishFut.get() != null) { + throw new TransactionException(TX_ALREADY_FINISHED_ERR, format("Transaction is already finished [tx={}].", this)); + } + } + + /** + * Enlists a write operation in direct mapping. + * + * @param opChannel Operation channge. + * @param ctx The context. + * + * @return The future. + */ + public CompletableFuture<Void> enlistFuture(ClientChannel opChannel, WriteContext ctx) { + // Check if direct mapping is applicable. + if (ctx.pm != null && ctx.pm.node().equals(opChannel.protocolContext().clusterNode().name()) && hasCommitPartition()) { + if (!enlistPartitionLock.readLock().tryLock()) { + throw new TransactionException(TX_ALREADY_FINISHED_ERR, format("Transaction is already finished [tx={}].", this)); + } + + checkEnlistPossible(); + + boolean[] first = {false}; + + TablePartitionId tablePartitionId = new TablePartitionId(ctx.pm.tableId(), ctx.pm.partition()); + + CompletableFuture<IgniteBiTuple<String, Long>> fut = enlisted.compute(tablePartitionId, (k, v) -> { + if (v == null) { + first[0] = true; + return new CompletableFuture<>(); + } else { + return v; + } + }); + + enlistPartitionLock.readLock().unlock(); + + // Re-check after unlock. + checkEnlistPossible(); + + if (first[0]) { + ctx.enlistmentToken = 0L; + // For the first request return completed future. 
+ return nullCompletedFuture(); + } else { + return fut.thenAccept(tup -> ctx.enlistmentToken = tup.get2()); + } + } + + return nullCompletedFuture(); + } + + /** + * Tries to finish existing enlistment. + * + * @param pm Partition mapping. + * @param consistentId Consistent id. + * @param token Enlistment token. + */ + public void tryFinishEnlist(PartitionMapping pm, String consistentId, long token) { + if (!hasCommitPartition()) { + return; + } + + // TODO avoid new object. Review Comment: Need a ticket here or remove this comment. ########## modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java: ########## @@ -546,4 +546,6 @@ Publisher<BinaryRow> lookup( * @return Streamer receiver runner. */ StreamerReceiverRunner streamerReceiverRunner(); + + boolean mergeEnlistment(int partId, String consistentId, long token, InternalTransaction tx, boolean commit); Review Comment: Also, this method requires a Javadoc. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java: ########## @@ -1172,6 +1174,7 @@ public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransac .timestamp(txo.startTimestamp()) .full(txo.implicit()) .coordinatorId(txo.coordinatorId()) + .skipDelayedAck(txo.remote()) Review Comment: Why do we use it only for upsert operations? ########## modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java: ########## @@ -391,24 +413,47 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) { * @param in Unpacker. * @param out Packer. * @param resources Resource registry. + * @param txManager Tx manager. * @return Transaction, if present, or null. 
*/ public static @Nullable InternalTransaction readTx( ClientMessageUnpacker in, ClientMessagePacker out, - ClientResourceRegistry resources + ClientResourceRegistry resources, + @Nullable TxManager txManager ) { if (in.tryUnpackNil()) { return null; } try { - var tx = resources.get(in.unpackLong()).get(InternalTransaction.class); + long id = in.unpackLong(); + if (id == 0) { + long token = in.unpackLong(); + UUID txId = in.unpackUuid(); + int commitTableId = in.unpackInt(); + int commitPart = in.unpackInt(); + UUID coord = in.unpackUuid(); + long timeout = in.unpackLong(); + + InternalTransaction remote = txManager.beginRemote(txId, new TablePartitionId(commitTableId, commitPart), + coord, token, timeout); + + // Remote transaction will be synchronously rolled back if the timeout has exceeded. + if (remote.isRolledBackWithTimeoutExceeded()) { + throw new TransactionException(TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR, + "Transaction is already finished [tx=" + remote + "]."); + } + + return remote; + } + + var tx = resources.get(id).get(InternalTransaction.class); if (tx != null && tx.isReadOnly()) { // For read-only tx, override observable timestamp that we send to the client: // use readTimestamp() instead of now(). - out.meta(tx.readTimestamp()); + out.meta(tx.readTimestamp()); // TODO REMOVE Review Comment: https://issues.apache.org/jira/browse/IGNITE-24592 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org