vldpyatkov commented on code in PR #5383:
URL: https://github.com/apache/ignite-3/pull/5383#discussion_r2018698606


##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java:
##########
@@ -391,24 +413,47 @@ public static TableNotFoundException 
tableIdNotFoundException(Integer tableId) {
      * @param in Unpacker.
      * @param out Packer.
      * @param resources Resource registry.
+     * @param txManager Tx manager.
      * @return Transaction, if present, or null.
      */
     public static @Nullable InternalTransaction readTx(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            ClientResourceRegistry resources
+            ClientResourceRegistry resources,
+            @Nullable TxManager txManager
     ) {
         if (in.tryUnpackNil()) {
             return null;
         }
 
         try {
-            var tx = 
resources.get(in.unpackLong()).get(InternalTransaction.class);
+            long id = in.unpackLong();
+            if (id == 0) {

Review Comment:
   Probably we also have to check features here.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java:
##########
@@ -546,4 +546,6 @@ Publisher<BinaryRow> lookup(
      * @return Streamer receiver runner.
      */
     StreamerReceiverRunner streamerReceiverRunner();
+
+    boolean mergeEnlistment(int partId, String consistentId, long token, 
InternalTransaction tx, boolean commit);

Review Comment:
   This method looks alien for the table class.
   Would it be possible to move this logic into the transaction class?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java:
##########
@@ -136,20 +150,20 @@ void abortAllRegistered() {
     }
 
     void unregister(InternalTransaction tx) {
-        Long expirationTime = expirationTimeByTx.remove(tx);
+        unregister(tx, physicalExpirationTimeMillis(tx.startTimestamp(), 
tx.getTimeout()));
+    }
 
-        if (expirationTime != null) {
-            txsByExpirationTime.computeIfPresent(expirationTime, (k, txOrSet) 
-> {
-                if (txOrSet instanceof Set) {
-                    Set<InternalTransaction> set = (Set<InternalTransaction>) 
txOrSet;
+    void unregister(InternalTransaction tx, long expirationTime) {
+        txsByExpirationTime.computeIfPresent(expirationTime, (k, txOrSet) -> {
+            if (txOrSet instanceof Set) {
+                Set<InternalTransaction> set = (Set<InternalTransaction>) 
txOrSet;
 
-                    set.remove(tx);
+                set.remove(tx);
 
-                    return set.size() == 1 ? set.iterator().next() : set;
-                } else {
-                    return null;
-                }
-            });
-        }
+                return set.size() == 1 ? set.iterator().next() : set;
+            } else {
+                return null;

Review Comment:
   Could you add an assert here that the removed transaction equals this tx?



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -291,20 +300,32 @@ public String toString() {
      *
      * @param tx Transaction.
      * @param out Packer.
+     * @param ctx Write context.
      */
-    public static void writeTx(@Nullable Transaction tx, PayloadOutputChannel 
out) {
+    public static void writeTx(@Nullable Transaction tx, PayloadOutputChannel 
out, @Nullable WriteContext ctx) {
+        // TODO write table id here.

Review Comment:
   TODO without a ticket.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -888,6 +873,48 @@ public int pending() {
         return startedTxs.intValue() - finishedTxs.intValue();
     }
 
+    @Override
+    public InternalTransaction beginRemote(UUID txId, TablePartitionId 
commitPartId, UUID coord, long token, long timeout) {
+        assert commitPartId.tableId() > 0 && commitPartId.partitionId() >= 0 : 
"Illegal condition for direct mapping: " + commitPartId;
+
+        // Switch to default timeout if needed.
+        timeout = timeout == USE_CONFIGURED_TIMEOUT_DEFAULT ? 
txConfig.readWriteTimeout().value() : timeout;

Review Comment:
   So a remote transaction timeout is always more than the original one.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -888,6 +873,48 @@ public int pending() {
         return startedTxs.intValue() - finishedTxs.intValue();
     }
 
+    @Override
+    public InternalTransaction beginRemote(UUID txId, TablePartitionId 
commitPartId, UUID coord, long token, long timeout) {
+        assert commitPartId.tableId() > 0 && commitPartId.partitionId() >= 0 : 
"Illegal condition for direct mapping: " + commitPartId;
+
+        // Switch to default timeout if needed.
+        timeout = timeout == USE_CONFIGURED_TIMEOUT_DEFAULT ? 
txConfig.readWriteTimeout().value() : timeout;
+
+        // Adjust the timeout so local expiration happens after coordinator 
expiration.
+        var tx = new RemoteReadWriteTransaction(txId, commitPartId, coord, 
token, topologyService.localMember(),

Review Comment:
   I would like to see a usual class instead of an inlined one.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TransactionTracker.java:
##########
@@ -24,7 +24,7 @@
  */
 public interface TransactionTracker {
     /** Registers given transaction within the tracker. */ 
-    boolean register(UUID txId, boolean readOnly);
+    boolean register(UUID txId, boolean readOnly); // TODO FORCE READ ONLY HERE

Review Comment:
   Unclear comment.



##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java:
##########
@@ -176,4 +320,89 @@ static IgniteException 
unsupportedTxTypeException(Transaction tx) {
     private void setState(int state) {
         this.state.compareAndExchange(STATE_OPEN, state);
     }
+
+    private void checkEnlistPossible() {
+        if (finishFut.get() != null) {
+            throw new TransactionException(TX_ALREADY_FINISHED_ERR, 
format("Transaction is already finished [tx={}].", this));
+        }
+    }
+
+    /**
+     * Enlists a write operation in direct mapping.
+     *
+     * @param opChannel Operation channge.
+     * @param ctx The context.
+     *
+     * @return The future.
+     */
+    public CompletableFuture<Void> enlistFuture(ClientChannel opChannel, 
WriteContext ctx) {
+        // Check if direct mapping is applicable.
+        if (ctx.pm != null && 
ctx.pm.node().equals(opChannel.protocolContext().clusterNode().name()) && 
hasCommitPartition()) {
+            if (!enlistPartitionLock.readLock().tryLock()) {
+                throw new TransactionException(TX_ALREADY_FINISHED_ERR, 
format("Transaction is already finished [tx={}].", this));
+            }
+
+            checkEnlistPossible();
+
+            boolean[] first = {false};
+
+            TablePartitionId tablePartitionId = new 
TablePartitionId(ctx.pm.tableId(), ctx.pm.partition());
+
+            CompletableFuture<IgniteBiTuple<String, Long>> fut = 
enlisted.compute(tablePartitionId, (k, v) -> {
+                if (v == null) {
+                    first[0] = true;
+                    return new CompletableFuture<>();
+                } else {
+                    return v;
+                }
+            });
+
+            enlistPartitionLock.readLock().unlock();
+
+            // Re-check after unlock.
+            checkEnlistPossible();
+
+            if (first[0]) {
+                ctx.enlistmentToken = 0L;
+                // For the first request return completed future.
+                return nullCompletedFuture();
+            } else {
+                return fut.thenAccept(tup -> ctx.enlistmentToken = tup.get2());
+            }
+        }
+
+        return nullCompletedFuture();
+    }
+
+    /**
+     * Tries to finish existing enlistment.
+     *
+     * @param pm Partition mapping.
+     * @param consistentId Consistent id.
+     * @param token Enlistment token.
+     */
+    public void tryFinishEnlist(PartitionMapping pm, String consistentId, long 
token) {
+        if (!hasCommitPartition()) {
+            return;
+        }
+
+        // TODO avoid new object.

Review Comment:
   Need a ticket here or remove this comment.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java:
##########
@@ -546,4 +546,6 @@ Publisher<BinaryRow> lookup(
      * @return Streamer receiver runner.
      */
     StreamerReceiverRunner streamerReceiverRunner();
+
+    boolean mergeEnlistment(int partId, String consistentId, long token, 
InternalTransaction tx, boolean commit);

Review Comment:
   Also, this method required a javadoc.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1172,6 +1174,7 @@ public CompletableFuture<Void> upsert(BinaryRowEx row, 
@Nullable InternalTransac
                         .timestamp(txo.startTimestamp())
                         .full(txo.implicit())
                         .coordinatorId(txo.coordinatorId())
+                        .skipDelayedAck(txo.remote())

Review Comment:
   Why do we use it only for upsert operations?



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java:
##########
@@ -391,24 +413,47 @@ public static TableNotFoundException 
tableIdNotFoundException(Integer tableId) {
      * @param in Unpacker.
      * @param out Packer.
      * @param resources Resource registry.
+     * @param txManager Tx manager.
      * @return Transaction, if present, or null.
      */
     public static @Nullable InternalTransaction readTx(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            ClientResourceRegistry resources
+            ClientResourceRegistry resources,
+            @Nullable TxManager txManager
     ) {
         if (in.tryUnpackNil()) {
             return null;
         }
 
         try {
-            var tx = 
resources.get(in.unpackLong()).get(InternalTransaction.class);
+            long id = in.unpackLong();
+            if (id == 0) {
+                long token = in.unpackLong();
+                UUID txId = in.unpackUuid();
+                int commitTableId = in.unpackInt();
+                int commitPart = in.unpackInt();
+                UUID coord = in.unpackUuid();
+                long timeout = in.unpackLong();
+
+                InternalTransaction remote = txManager.beginRemote(txId, new 
TablePartitionId(commitTableId, commitPart),
+                        coord, token, timeout);
+
+                // Remote transaction will be synchronously rolled back if the 
timeout has exceeded.
+                if (remote.isRolledBackWithTimeoutExceeded()) {
+                    throw new 
TransactionException(TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR,
+                            "Transaction is already finished [tx=" + remote + 
"].");
+                }
+
+                return remote;
+            }
+
+            var tx = resources.get(id).get(InternalTransaction.class);
 
             if (tx != null && tx.isReadOnly()) {
                 // For read-only tx, override observable timestamp that we 
send to the client:
                 // use readTimestamp() instead of now().
-                out.meta(tx.readTimestamp());
+                out.meta(tx.readTimestamp()); // TODO REMOVE

Review Comment:
   https://issues.apache.org/jira/browse/IGNITE-24592



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to