sashapolo commented on code in PR #5187:
URL: https://github.com/apache/ignite-3/pull/5187#discussion_r1949280970


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -505,6 +518,9 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
                 rebalanceRetryDelayConfiguration
         );
 
+        var safeTimeTracker = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);

Review Comment:
   These values are only needed for the `raftGroupListener`. Shall we move them into the same lambda below? Also, I think the lambda itself should be extracted into a method.
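A minimal sketch of the suggested refactoring, assuming the listener is built inside a `Supplier`-style lambda (the lambda itself is not shown in this hunk). `Tracker` and `RaftListener` are hypothetical stand-ins for `SafeTimeValuesTracker` and the RAFT group listener, and `Long.MIN_VALUE` stands in for `HybridTimestamp.MIN_VALUE`.

```java
import java.util.function.Supplier;

class ExtractLambdaSketch {
    // Hypothetical stand-in for SafeTimeValuesTracker.
    static final class Tracker {
        final long initial;

        Tracker(long initial) {
            this.initial = initial;
        }
    }

    // Hypothetical stand-in for the RAFT group listener.
    static final class RaftListener {
        final Tracker tracker;
        final int partitionId;

        RaftListener(Tracker tracker, int partitionId) {
            this.tracker = tracker;
            this.partitionId = partitionId;
        }
    }

    // Before: the tracker lives in the enclosing method although only the lambda uses it.
    static Supplier<RaftListener> before(int partitionId) {
        var tracker = new Tracker(Long.MIN_VALUE);
        return () -> new RaftListener(tracker, partitionId);
    }

    // After: the lambda body is a named method that creates the values it needs.
    static Supplier<RaftListener> after(int partitionId) {
        return () -> createRaftListener(partitionId);
    }

    private static RaftListener createRaftListener(int partitionId) {
        var tracker = new Tracker(Long.MIN_VALUE);
        return new RaftListener(tracker, partitionId);
    }
}
```

One behavioral difference worth checking: in `before`, every invocation of the supplier shares one tracker instance, while in `after` each invocation creates a fresh one. Moving the values into the lambda is only equivalent if the lambda runs once per replication node or the tracker is not meant to be shared.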



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -103,18 +134,23 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
     private void processWriteCommand(CommandClosure<WriteCommand> clo) {
         Command command = clo.command();
 
-        if (command instanceof FinishTxCommand) {
-            FinishTxCommand cmd = (FinishTxCommand) command;
+        long commandIndex = clo.index();
+        long commandTerm = clo.term();
+        @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp();

Review Comment:
   I don't think the `@Nullable` annotation is needed here; it should be deduced automatically.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -124,6 +160,23 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
 
             clo.result(null);
         }
+
+        // result == null means that the command either was not handled by anyone (and clo.result() is called) or
+        // that it was delegated to a table processor (which called clo.result()).
+        if (result != null) {
+            if (Boolean.TRUE.equals(result.get2())) {

Review Comment:
   This is weird; why did you write the comparison this way?
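`Boolean.TRUE.equals(x)` is the null-safe way to test a `Boolean`; writing `if (x)` auto-unboxes and throws `NullPointerException` when `x` is `null`. If `result.get2()` can never be `null` (an assumption, since its declaration is not in the hunk), the plain form reads better; if it can be, an explicit null check documents that case. A small standalone illustration:

```java
class BooleanComparisonSketch {
    // Null-safe: a null Boolean is treated as "not true".
    static boolean viaEquals(Boolean flag) {
        return Boolean.TRUE.equals(flag);
    }

    // Auto-unboxing: throws NullPointerException when flag is null.
    static boolean viaUnboxing(Boolean flag) {
        return flag;
    }

    // Explicit alternative that makes the null case visible to the reader.
    static boolean viaExplicitNullCheck(Boolean flag) {
        return flag != null && flag;
    }
}
```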



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTxFinishMarker.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains transaction finishing logic for partitions' {@link ReplicaListener} implementations.
+ */
+public class ReplicaTxFinishMarker {
+    private final TxManager txManager;
+
+    public ReplicaTxFinishMarker(TxManager txManager) {
+        this.txManager = txManager;
+    }
+
+    /**
+     * Marks the transaction as finished in local tx state map.
+     *
+     * @param txId Transaction id.
+     * @param txState Transaction state, must be either {@link TxState#COMMITTED} or {@link TxState#ABORTED}.
+     * @param commitTimestamp Commit timestamp ({@code null} when aborting).
+     */
+    public void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp commitTimestamp) {
+        assert isFinalState(txState) : "Unexpected state [txId=" + txId + ", txState=" + txState + ']';
+
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(

Review Comment:
   Please see my comments in the other similar class.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder;
+import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
+import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult;
+import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
+import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles {@link TxFinishReplicaRequest}.
+ */
+public class TxFinishReplicaRequestHandler {
+    private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class);
+
+    /** Factory to create RAFT command messages. */
+    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY =
+            new PartitionReplicationMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
+
+    private final TxStatePartitionStorage txStatePartitionStorage;
+    private final ClockService clockService;
+    private final TxManager txManager;
+    private final ReplicationGroupId replicationGroupId;
+
+    private final SchemaCompatibilityValidator schemaCompatValidator;
+    private final ReliableCatalogVersions reliableCatalogVersions;
+    private final ReplicationRaftCommandApplicator raftCommandApplicator;
+    private final ReplicaTxFinishMarker replicaTxFinishMarker;
+
+
+    /** Constructor. */
+    public TxFinishReplicaRequestHandler(
+            TxStatePartitionStorage txStatePartitionStorage,
+            ClockService clockService,
+            TxManager txManager,
+            ValidationSchemasSource validationSchemasSource,
+            SchemaSyncService schemaSyncService,
+            CatalogService catalogService,
+            RaftCommandRunner raftCommandRunner,
+            ReplicationGroupId replicationGroupId
+    ) {
+        this.txStatePartitionStorage = txStatePartitionStorage;
+        this.clockService = clockService;
+        this.txManager = txManager;
+        this.replicationGroupId = replicationGroupId;
+
+        schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
+        reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
+        raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
+        replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
+    }
+
+    /**
+     * Processes transaction finish request.
+     * <ol>
+     *     <li>Get commit timestamp from finish replica request.</li>
+     *     <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li>
+     *     <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li>
+     *     <li>Send cleanup requests to all enlisted primary replicas.</li>
+     * </ol>
+     *
+     * @param request Transaction finish request.
+     * @return future result of the operation.
+     */
+    public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast
+        Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups());
+
+        UUID txId = request.txId();
+
+        if (request.commit()) {
+            HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+            return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp)
+                    .thenCompose(validationResult ->
+                            finishAndCleanup(
+                                    enlistedGroups,
+                                    validationResult.isSuccessful(),
+                                    validationResult.isSuccessful() ? commitTimestamp : null,
+                                    txId
+                            ).thenApply(txResult -> {
+                                throwIfSchemaValidationOnCommitFailed(validationResult, txResult);
+                                return txResult;
+                            }));
+        } else {
+            // Aborting.
+            return finishAndCleanup(enlistedGroups, false, null, txId);
+        }
+    }
+
+    private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+        var result = new HashMap<TablePartitionId, String>(messages.size());
+
+        for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) {
+            result.put(e.getKey().asTablePartitionId(), e.getValue());
+        }
+
+        return result;
+    }
+
+    private CompletableFuture<TransactionResult> finishAndCleanup(
+            Map<TablePartitionId, String> enlistedPartitions,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId
+    ) {
+        // Read TX state from the storage, we will need this state to check if the locks are released.
+        // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand),
+        // the value of txMeta can be either null or COMMITTED/ABORTED. No other values are expected.

Review Comment:
   I don't understand. We don't expect any other values, yet we later explicitly check for them in `transactionAlreadyFinished`. What am I missing?
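One way to resolve the inconsistency is to turn the comment into an enforced invariant, so the "already finished" check becomes a plain null check. A minimal sketch, using hypothetical stand-ins `State` and `Meta` for `TxState` and `TxMeta`:

```java
class TxStateInvariantSketch {
    // Hypothetical stand-in for TxState; only the final/non-final split matters here.
    enum State { PENDING, COMMITTED, ABORTED }

    // Hypothetical stand-in for TxMeta as read from the tx state storage.
    static final class Meta {
        final State state;

        Meta(State state) {
            this.state = state;
        }
    }

    static boolean isFinalState(State state) {
        return state == State.COMMITTED || state == State.ABORTED;
    }

    // Returns true if the stored meta says the transaction has already been finished.
    static boolean alreadyFinished(Meta meta) {
        // Enforce the documented invariant: storage only ever holds final states.
        if (meta != null && !isFinalState(meta.state)) {
            throw new IllegalStateException("Unexpected non-final state in storage: " + meta.state);
        }

        // With the invariant enforced, "already finished" is just "something was stored".
        return meta != null;
    }
}
```

If the invariant holds, the `isFinalState` call in `transactionAlreadyFinished` is redundant; if non-final states can in fact reach the storage, it is the comment that needs fixing.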



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+        TxMeta txMeta = txStatePartitionStorage.get(txId);
+
+        // Check whether a transaction has already been finished.
+        boolean transactionAlreadyFinished = txMeta != null && isFinalState(txMeta.txState());
+
+        if (transactionAlreadyFinished) {
+            // - The Coordinator calls use the same tx state over retries; both abort and commit are possible.
+            // - Server side recovery may only change tx state to aborted.
+            // - The Coordinator itself should prevent user calls with a different proposed state to the one
+            //   that was already triggered (e.g. the client side -> txCoordinator.commitAsync(); txCoordinator.rollbackAsync()).
+            // - A coordinator might send a commit, then die, but the commit message might still arrive at the commit partition primary.
+            //   If it arrived with a delay, another node might come across a write intent/lock from that tx
+            //   and realize that the coordinator is no longer available and start tx recovery.
+            //   The original commit message might arrive later than the recovery one,
+            //   hence a 'commit over rollback' case.
+            // The possible states that a 'commit' is allowed to see:
+            // - null (if it's the first change state attempt)
+            // - committed (if it was already updated in the previous attempt)
+            // - aborted (if it was aborted by the initiate recovery logic,
+            //   though this is a very unlikely case because initiate recovery will only roll back the tx if coordinator is dead).
+            //
+            // Within 'roll back' it's allowed to see:
+            // - null (if it's the first change state attempt)
+            // - aborted (if it was already updated in the previous attempt or the result of a concurrent recovery)
+            // - commit (if initiate recovery has started, but a delayed message from the coordinator finally arrived and executed earlier).
+
+            // Let the client know a transaction has finished with a different outcome.
+            if (commit != (txMeta.txState() == COMMITTED)) {
+                LOG.error("Failed to finish a transaction that is already finished [txId={}, expectedState={}, actualState={}].",
+                        txId,
+                        commit ? COMMITTED : ABORTED,
+                        txMeta.txState()
+                );
+
+                throw new MismatchingTransactionOutcomeInternalException(
+                        "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + txMeta.txState() + "].",
+                        new TransactionResult(txMeta.txState(), txMeta.commitTimestamp())
+                );
+            }
+
+            return completedFuture(new TransactionResult(txMeta.txState(), txMeta.commitTimestamp()));
+        }
+
+        return finishTransaction(enlistedPartitions.keySet(), txId, commit, commitTimestamp)
+                .thenCompose(txResult ->
+                    txManager.cleanup(replicationGroupId, enlistedPartitions, commit, commitTimestamp, txId)
+                            .thenApply(v -> txResult)
+                );
+    }
+
+    private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, TransactionResult txResult) {
+        if (!validationResult.isSuccessful()) {
+            if (validationResult.isTableDropped()) {
+                throw new IncompatibleSchemaAbortException(
+                        format("Commit failed because a table was already dropped [table={}]", validationResult.failedTableName()),
+                        txResult
+                );
+            } else {
+                throw new IncompatibleSchemaAbortException(
+                        format(
+                                "Commit failed because schema is not forward-compatible "
+                                        + "[fromSchemaVersion={}, toSchemaVersion={}, table={}, details={}]",
+                                validationResult.fromSchemaVersion(),
+                                validationResult.toSchemaVersion(),
+                                validationResult.failedTableName(),
+                                validationResult.details()
+                        ),
+                        txResult
+                );
+            }
+        }
+    }
+
+    /**
+     * Finishes a transaction. This operation is idempotent.
+     *
+     * @param partitionIds Collection of enlisted partition groups.
+     * @param txId Transaction id.
+     * @param commit True if the transaction is committed, false otherwise.
+     * @param commitTimestamp Commit timestamp, if applicable.
+     * @return Future to wait for the finish.
+     */
+    private CompletableFuture<TransactionResult> finishTransaction(
+            Collection<TablePartitionId> partitionIds,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        assert !(commit && commitTimestamp == null) : "Cannot commit without the timestamp.";
+
+        HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : clockService.now();
+
+        return reliableCatalogVersionFor(tsForCatalogVersion)
+                .thenCompose(catalogVersion -> applyFinishCommand(
+                                txId,
+                                commit,
+                                commitTimestamp,
+                                catalogVersion,
+                                toPartitionIdMessage(partitionIds)
+                        )
+                )
+                .handle((txOutcome, ex) -> {
+                    if (ex != null) {
+                        // RAFT 'finish' command failed because the state has already been written by someone else.
+                        // In that case we throw a corresponding exception.
+                        if (ex instanceof UnexpectedTransactionStateException) {
+                            UnexpectedTransactionStateException utse = (UnexpectedTransactionStateException) ex;
+                            TransactionResult result = utse.transactionResult();
+
+                            replicaTxFinishMarker.markFinished(txId, result.transactionState(), result.commitTimestamp());
+
+                            throw new MismatchingTransactionOutcomeInternalException(utse.getMessage(), utse.transactionResult());
+                        }
+                        // Otherwise we convert from the internal exception to the client one.
+                        throw new TransactionException(commit ? TX_COMMIT_ERR : TX_ROLLBACK_ERR, ex);
+                    }
+
+                    TransactionResult result = (TransactionResult) txOutcome;
+
+                    replicaTxFinishMarker.markFinished(txId, result.transactionState(), result.commitTimestamp());
+
+                    return result;
+                });
+    }
+
+    private CompletableFuture<Integer> reliableCatalogVersionFor(HybridTimestamp ts) {
+        return reliableCatalogVersions.reliableCatalogVersionFor(ts);
+    }
+
+    private CompletableFuture<Object> applyFinishCommand(
+            UUID transactionId,
+            boolean commit,
+            HybridTimestamp commitTimestamp,

Review Comment:
   ```suggestion
               @Nullable HybridTimestamp commitTimestamp,
   ```
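For reference, the mismatch check in `finishAndCleanup` in the hunk above (`commit != (txMeta.txState() == COMMITTED)`) covers the cases listed in the comment block preceding it. A standalone sketch with a simplified `State` enum (a stand-in, not Ignite's `TxState`):

```java
class FinishOutcomeSketch {
    // Simplified stand-in for TxState: only the two final states matter here.
    enum State { COMMITTED, ABORTED }

    // Returns true when the request asks for a different outcome than the stored one,
    // i.e. the case in which the handler throws MismatchingTransactionOutcomeInternalException.
    static boolean outcomeMismatch(boolean commitRequested, State storedState) {
        return commitRequested != (storedState == State.COMMITTED);
    }
}
```

Commit over `COMMITTED` and rollback over `ABORTED` are idempotent retries (no mismatch); commit over `ABORTED` (the late "commit over rollback" after recovery) and rollback over `COMMITTED` are mismatches.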



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ResultWrapper.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import org.apache.ignite.internal.raft.Command;
+
+/**
+ * Wrapper for the update(All)Command processing result that, besides the result itself, stores the actual command that was processed.
+ */
+public class ResultWrapper<T> {
+    private final Command command;
+    private final T result;
+
+    public ResultWrapper(Command command, T result) {
+        this.command = command;
+        this.result = result;
+    }
+
+    public Command getCommand() {

Review Comment:
   Why is this needed? Did you extract it from somewhere, and will it be used in some upcoming code?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft;
+
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains transaction finishing logic for partitions Raft listener implementations.
+ */
+public class RaftTxFinishMarker {
+    private final TxManager txManager;
+
+    public RaftTxFinishMarker(TxManager txManager) {
+        this.txManager = txManager;
+    }
+
+    /**
+     * Marks the transaction as finished in local tx state map.
+     *
+     * @param txId Transaction id.
+     * @param commit Whether this is a commit.
+     * @param commitTimestamp Commit timestamp ({@code null} when aborting).
+     * @param commitPartitionId Commit partition ID.
+     */
+    public void markFinished(
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            @Nullable TablePartitionId commitPartitionId
+    ) {
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(
+                commit ? COMMITTED : ABORTED,

Review Comment:
   Shall we pass this state directly, instead of the flag?
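A minimal sketch of what the suggested signature could look like. It uses simplified stand-ins rather than Ignite's real classes: `TxState`, `SketchTxMeta` and `TxFinishMarkerSketch` below are hypothetical, and a plain `Map.compute` takes the place of `TxManager.updateTxMeta`. Passing the final state makes the call site self-describing and lets the method reject non-final states explicitly.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the real transaction state enum. */
enum TxState {
    PENDING,
    COMMITTED,
    ABORTED
}

/** Hypothetical, simplified stand-in for TxStateMeta: only the fields used here. */
final class SketchTxMeta {
    final TxState state;
    final Long commitTimestamp; // null unless the state is COMMITTED

    SketchTxMeta(TxState state, Long commitTimestamp) {
        this.state = state;
        this.commitTimestamp = commitTimestamp;
    }
}

/** Hypothetical marker that receives the final state instead of a boolean flag. */
final class TxFinishMarkerSketch {
    /** Stands in for the local tx state map managed by TxManager. */
    final Map<UUID, SketchTxMeta> txStates = new ConcurrentHashMap<>();

    void markFinished(UUID txId, TxState finalState, Long commitTimestamp) {
        if (finalState != TxState.COMMITTED && finalState != TxState.ABORTED) {
            throw new IllegalArgumentException("Not a final state: " + finalState);
        }

        // Map.compute plays the role of txManager.updateTxMeta(txId, old -> ...) in this sketch.
        txStates.compute(txId, (id, old) -> new SketchTxMeta(finalState, commitTimestamp));
    }
}
```

In this sketch a call site reads `markFinished(txId, TxState.COMMITTED, ts)` instead of `markFinished(txId, true, ts)`, so the `commit ? COMMITTED : ABORTED` mapping disappears from the marker.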



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ResultWrapper.java:
##########
@@ -0,0 +1,41 @@
+package org.apache.ignite.internal.partition.replicator;
+
+import org.apache.ignite.internal.raft.Command;
+
+/**
+ * Wrapper for the update(All)Command processing result that besides result itself stores actual command that was processed.

Review Comment:
   If this is only for update commands, should we call it `UpdateResultWrapper`?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -735,7 +735,7 @@ public CompletableFuture<Replica> startReplica(
             Function<RaftGroupService, ReplicaListener> listenerFactory,
             SnapshotStorageFactory snapshotStorageFactory,
             PeersAndLearners newConfiguration,
-            RaftGroupListener raftGroupListener,
+            Supplier<RaftGroupListener> raftGroupListenerFactory,

Review Comment:
   Why is this change needed? You immediately call `.get()` on it.
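To illustrate the point with hypothetical names (`SupplierSketch`, `startEager`, `startIfAbsent` are not from the PR): when the callee invokes `get()` right away, a `Supplier` parameter behaves exactly like passing the value and only adds indirection. A factory pays off only when the call is deferred or conditional.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class SupplierSketch {
    /** Counts how many listeners were actually created. */
    static final AtomicInteger created = new AtomicInteger();

    static String createListener() {
        created.incrementAndGet();
        return "listener";
    }

    /** Eager: get() is called immediately, so this is equivalent to taking a String parameter. */
    static String startEager(Supplier<String> factory) {
        return factory.get();
    }

    /** Deferred: the factory is invoked only when a new instance is really needed. */
    static String startIfAbsent(boolean alreadyStarted, Supplier<String> factory) {
        return alreadyStarted ? "existing" : factory.get();
    }
}
```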



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java:
##########
@@ -0,0 +1,64 @@
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(
+                commit ? COMMITTED : ABORTED,
+                old == null ? null : old.txCoordinatorId(),
+                old == null ? commitPartitionId : old.commitPartitionId(),
+                commit ? commitTimestamp : null,

Review Comment:
   This check is redundant: `commitTimestamp` is already `null` when aborting.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java:
##########
@@ -0,0 +1,64 @@
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(
+                commit ? COMMITTED : ABORTED,
+                old == null ? null : old.txCoordinatorId(),

Review Comment:
   I would suggest replacing these ternaries with one big `if (old == null)`.
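A minimal sketch of the single-branch shape, covering only the fields visible in this diff excerpt. `CoordinatorAwareMeta` and `SingleBranchMarkerSketch` are hypothetical stand-ins for `TxStateMeta` and `RaftTxFinishMarker`, and `Map.compute` stands in for `TxManager.updateTxMeta`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for TxStateMeta. */
final class CoordinatorAwareMeta {
    final String state;
    final String txCoordinatorId;
    final String commitPartitionId;
    final Long commitTimestamp;

    CoordinatorAwareMeta(String state, String txCoordinatorId, String commitPartitionId, Long commitTimestamp) {
        this.state = state;
        this.txCoordinatorId = txCoordinatorId;
        this.commitPartitionId = commitPartitionId;
        this.commitTimestamp = commitTimestamp;
    }
}

final class SingleBranchMarkerSketch {
    final Map<UUID, CoordinatorAwareMeta> txStates = new ConcurrentHashMap<>();

    void markFinished(UUID txId, String finalState, Long commitTimestamp, String commitPartitionId) {
        txStates.compute(txId, (id, old) -> {
            if (old == null) {
                // Nothing recorded locally: no known coordinator, take the partition from the arguments.
                return new CoordinatorAwareMeta(finalState, null, commitPartitionId, commitTimestamp);
            }

            // Keep what was already recorded for this transaction.
            return new CoordinatorAwareMeta(finalState, old.txCoordinatorId, old.commitPartitionId, commitTimestamp);
        });
    }
}
```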



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -103,18 +134,23 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
     private void processWriteCommand(CommandClosure<WriteCommand> clo) {
         Command command = clo.command();
 
-        if (command instanceof FinishTxCommand) {
-            FinishTxCommand cmd = (FinishTxCommand) command;
+        long commandIndex = clo.index();
+        long commandTerm = clo.term();
+        @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp();
+        assert safeTimestamp == null || command instanceof SafeTimePropagatingCommand : command;
+
+        IgniteBiTuple<Serializable, Boolean> result = null;

Review Comment:
   Can we do something with this ugly `IgniteBiTuple`?
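One possible direction, sketched under an assumption: the `Boolean` half of the tuple is taken here to mean "the command was actually applied" (the diff only shows it being tested with `Boolean.TRUE.equals(result.get2())`). `CommandResult` is a hypothetical name; a small named class replaces the positional `get1()`/`get2()` accessors.

```java
import java.io.Serializable;

/** Hypothetical replacement for IgniteBiTuple<Serializable, Boolean> in processWriteCommand. */
final class CommandResult {
    /** Shared instance for commands that were not applied. */
    static final CommandResult NOT_APPLIED = new CommandResult(null, false);

    private final Serializable result;
    private final boolean applied;

    CommandResult(Serializable result, boolean applied) {
        this.result = result;
        this.applied = applied;
    }

    /** Value to pass to clo.result(...); may be null. */
    Serializable result() {
        return result;
    }

    /** Assumed meaning of the former Boolean component. */
    boolean wasApplied() {
        return applied;
    }
}
```

The check after the dispatch would then read `if (result != null && result.wasApplied())` instead of unboxing a nullable `Boolean`.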



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder;
+import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
+import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult;
+import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
+import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles {@link TxFinishReplicaRequest}.
+ */
+public class TxFinishReplicaRequestHandler {
+    private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class);
+
+    /** Factory to create RAFT command messages. */
+    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY =
+            new PartitionReplicationMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
+
+    private final TxStatePartitionStorage txStatePartitionStorage;
+    private final ClockService clockService;
+    private final TxManager txManager;
+    private final ReplicationGroupId replicationGroupId;
+
+    private final SchemaCompatibilityValidator schemaCompatValidator;
+    private final ReliableCatalogVersions reliableCatalogVersions;
+    private final ReplicationRaftCommandApplicator raftCommandApplicator;
+    private final ReplicaTxFinishMarker replicaTxFinishMarker;
+
+
+    /** Constructor. */
+    public TxFinishReplicaRequestHandler(
+            TxStatePartitionStorage txStatePartitionStorage,
+            ClockService clockService,
+            TxManager txManager,
+            ValidationSchemasSource validationSchemasSource,
+            SchemaSyncService schemaSyncService,
+            CatalogService catalogService,
+            RaftCommandRunner raftCommandRunner,
+            ReplicationGroupId replicationGroupId
+    ) {
+        this.txStatePartitionStorage = txStatePartitionStorage;
+        this.clockService = clockService;
+        this.txManager = txManager;
+        this.replicationGroupId = replicationGroupId;
+
+        schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
+        reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
+        raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
+        replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
+    }
+
+    /**
+     * Processes transaction finish request.
+     * <ol>
+     *     <li>Get commit timestamp from finish replica request.</li>
+     *     <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li>
+     *     <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li>
+     *     <li>Send cleanup requests to all enlisted primary replicas.</li>
+     * </ol>
+     *
+     * @param request Transaction finish request.
+     * @return future result of the operation.
+     */
+    public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast
+        Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups());
+
+        UUID txId = request.txId();
+
+        if (request.commit()) {
+            HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+            return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp)
+                    .thenCompose(validationResult ->
+                            finishAndCleanup(
+                                    enlistedGroups,
+                                    validationResult.isSuccessful(),
+                                    validationResult.isSuccessful() ? commitTimestamp : null,
+                                    txId
+                            ).thenApply(txResult -> {
+                                throwIfSchemaValidationOnCommitFailed(validationResult, txResult);
+                                return txResult;
+                            }));
+        } else {
+            // Aborting.
+            return finishAndCleanup(enlistedGroups, false, null, txId);
+        }
+    }
+
+    private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+        var result = new HashMap<TablePartitionId, String>(messages.size());
+
+        for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) {
+            result.put(e.getKey().asTablePartitionId(), e.getValue());
+        }
+
+        return result;
+    }
+
+    private CompletableFuture<TransactionResult> finishAndCleanup(
+            Map<TablePartitionId, String> enlistedPartitions,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId
+    ) {
+        // Read TX state from the storage, we will need this state to check if the locks are released.
+        // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand),
+        // the value of txMeta can be either null or COMMITTED/ABORTED. No other values is expected.

Review Comment:
   ```suggestion
           // the value of txMeta can be either null or COMMITTED/ABORTED. No other values are expected.
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+    private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+        var result = new HashMap<TablePartitionId, String>(messages.size());

Review Comment:
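   You need to use `capacity` here: with the default load factor (0.75), `new HashMap<>(messages.size())` may still resize before all entries are inserted.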
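The reviewer presumably means an existing helper in the codebase; its exact location is not shown here, so the `capacity` below is a hypothetical re-implementation of the idea, not that helper. It sizes the map so that `expectedSize` entries fit under the default 0.75 load factor. On JDK 19+ the standard `HashMap.newHashMap(int numMappings)` does the same job, if the project's minimum JDK allowed it. `mapKeys` mirrors the shape of `asTablePartitionIdStringMap`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class CapacitySketch {
    private static final int MAX_POWER_OF_TWO = 1 << 30;

    /** Initial capacity for which a default HashMap holds expectedSize entries without resizing. */
    static int capacity(int expectedSize) {
        if (expectedSize < 0) {
            throw new IllegalArgumentException("Negative size: " + expectedSize);
        }

        if (expectedSize >= MAX_POWER_OF_TWO) {
            return Integer.MAX_VALUE;
        }

        // Threshold = capacity * 0.75, so divide by the load factor and round up.
        return (int) Math.ceil(expectedSize / 0.75);
    }

    /** Same shape as asTablePartitionIdStringMap: converts keys, keeps values. */
    static <K1, K2, V> Map<K2, V> mapKeys(Map<K1, V> source, Function<K1, K2> keyMapper) {
        Map<K2, V> result = new HashMap<>(capacity(source.size()));

        for (Map.Entry<K1, V> e : source.entrySet()) {
            result.put(keyMapper.apply(e.getKey()), e.getValue());
        }

        return result;
    }
}
```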



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+    /**
+     * Processes transaction finish request.
+     * <ol>
+     *     <li>Get commit timestamp from finish replica request.</li>
+     *     <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li>
+     *     <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li>

Review Comment:
   What do you mean by `specific`?
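
The steps listed in the `handle` Javadoc above can be sketched with plain `CompletableFuture`s. This is a simplified model, not the Ignite implementation. `Outcome`, `IncompatibleSchemaException`, `validateCommit` and `finishAndCleanup` are hypothetical stand-ins for `TransactionResult`, `IncompatibleSchemaAbortException`, `SchemaCompatibilityValidator.validateCommit` and the handler's private `finishAndCleanup`. The commit timestamp and enlisted groups are omitted. The sketch shows the key property of the flow: a commit that fails schema validation is still finished (as an abort) and cleaned up before the validation error is reported to the caller.

```java
import java.util.concurrent.CompletableFuture;

// Simplified model of TxFinishReplicaRequestHandler.handle(); all types below are hypothetical stand-ins.
final class FinishFlowSketch {
    /** Stand-in for TransactionResult: only records whether the tx ended up committed. */
    static final class Outcome {
        final boolean committed;

        Outcome(boolean committed) {
            this.committed = committed;
        }
    }

    /** Stand-in for IncompatibleSchemaAbortException: carries the (aborted) outcome. */
    static final class IncompatibleSchemaException extends RuntimeException {
        final Outcome outcome;

        IncompatibleSchemaException(Outcome outcome) {
            super("Commit failed schema validation, transaction was aborted");
            this.outcome = outcome;
        }
    }

    /** Stand-in for schema validation: completes with whether the commit is allowed. */
    static CompletableFuture<Boolean> validateCommit(boolean schemaCompatible) {
        return CompletableFuture.completedFuture(schemaCompatible);
    }

    /** Stand-in for applying the finish command via RAFT and cleaning up enlisted partitions. */
    static CompletableFuture<Outcome> finishAndCleanup(boolean commit) {
        return CompletableFuture.completedFuture(new Outcome(commit));
    }

    static CompletableFuture<Outcome> handle(boolean commitRequested, boolean schemaCompatible) {
        if (!commitRequested) {
            // Aborting: no validation needed.
            return finishAndCleanup(false);
        }

        return validateCommit(schemaCompatible)
                // A commit that fails validation is downgraded to an abort...
                .thenCompose(valid -> finishAndCleanup(valid)
                        .thenApply(outcome -> {
                            // ...and the validation failure is surfaced only after the abort has been applied.
                            if (!valid) {
                                throw new IncompatibleSchemaException(outcome);
                            }
                            return outcome;
                        }));
    }
}
```

Because the exception is thrown inside `thenApply`, callers see it wrapped in a `CompletionException` when they `join()` the returned future.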



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -404,6 +420,99 @@ void testDataRebalance(boolean truncateRaftLog) throws Exception {
         assertThat(kvView2.getAll(null, data2.keySet()), is(data2));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void txFinishCommandGetsReplicated(boolean commit) throws Exception {
+        startCluster(3);
+
+        // Create a zone with a single partition on every node.
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size());
+
+        int tableId1 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+        int tableId2 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME2);
+
+        var zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        setupTableIdToZoneIdConverter(zonePartitionId, new TablePartitionId(tableId1, 0), new TablePartitionId(tableId2, 0));
+
+        cluster.forEach(Node::waitForMetadataCompletenessAtNow);
+
+        Node node = cluster.get(0);
+
+        setPrimaryReplica(node, zonePartitionId);
+
+        KeyValueView<Integer, Integer> kvView1 = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+        KeyValueView<Integer, Integer> kvView2 = node.tableManager.table(TEST_TABLE_NAME2).keyValueView(Integer.class, Integer.class);
+
+        InternalTransaction transaction = unwrapInternalTransaction(node.transactions().begin());
+        kvView1.put(transaction, 42, 69);
+        kvView2.put(transaction, 142, 169);
+        if (commit) {
+            transaction.commit();
+        } else {
+            transaction.rollback();
+        }
+
+        for (Node currentNode : cluster) {
+            assertTrue(waitForCondition(
+                    () -> !txStatesInPartitionStorage(currentNode.txStatePartitionStorage(zoneId, 0)).isEmpty(),
+                    SECONDS.toMillis(10)
+            ));
+        }
+
+        List<CountExpectation> expectations = new ArrayList<>();
+        for (int i = 0; i < cluster.size(); i++) {
+            Node currentNode = cluster.get(i);
+            expectations.add(new CountExpectation(

Review Comment:
   Why do we need this `CountExpectation` class? Can we just add a bunch of `Executables` directly?



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -404,6 +420,99 @@ void testDataRebalance(boolean truncateRaftLog) throws Exception {
         assertThat(kvView2.getAll(null, data2.keySet()), is(data2));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void txFinishCommandGetsReplicated(boolean commit) throws Exception {
+        startCluster(3);
+
+        // Create a zone with a single partition on every node.
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size());
+
+        int tableId1 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+        int tableId2 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME2);
+
+        var zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        setupTableIdToZoneIdConverter(zonePartitionId, new TablePartitionId(tableId1, 0), new TablePartitionId(tableId2, 0));
+
+        cluster.forEach(Node::waitForMetadataCompletenessAtNow);
+
+        Node node = cluster.get(0);
+
+        setPrimaryReplica(node, zonePartitionId);
+
+        KeyValueView<Integer, Integer> kvView1 = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+        KeyValueView<Integer, Integer> kvView2 = node.tableManager.table(TEST_TABLE_NAME2).keyValueView(Integer.class, Integer.class);
+
+        InternalTransaction transaction = unwrapInternalTransaction(node.transactions().begin());

Review Comment:
   Why do you need this unwrapping?



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -404,6 +420,99 @@ void testDataRebalance(boolean truncateRaftLog) throws Exception {
         assertThat(kvView2.getAll(null, data2.keySet()), is(data2));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void txFinishCommandGetsReplicated(boolean commit) throws Exception {

Review Comment:
   I would suggest extracting this test into a separate class, because it will become super huge if we keep adding all the commands here. But I don't insist.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder;
+import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
+import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult;
+import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
+import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles {@link TxFinishReplicaRequest}.
+ */
+public class TxFinishReplicaRequestHandler {
+    private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class);
+
+    /** Factory to create RAFT command messages. */
+    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY =
+            new PartitionReplicationMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
+
+    private final TxStatePartitionStorage txStatePartitionStorage;
+    private final ClockService clockService;
+    private final TxManager txManager;
+    private final ReplicationGroupId replicationGroupId;
+
+    private final SchemaCompatibilityValidator schemaCompatValidator;
+    private final ReliableCatalogVersions reliableCatalogVersions;
+    private final ReplicationRaftCommandApplicator raftCommandApplicator;
+    private final ReplicaTxFinishMarker replicaTxFinishMarker;
+
+
+    /** Constructor. */
+    public TxFinishReplicaRequestHandler(
+            TxStatePartitionStorage txStatePartitionStorage,
+            ClockService clockService,
+            TxManager txManager,
+            ValidationSchemasSource validationSchemasSource,
+            SchemaSyncService schemaSyncService,
+            CatalogService catalogService,
+            RaftCommandRunner raftCommandRunner,
+            ReplicationGroupId replicationGroupId
+    ) {
+        this.txStatePartitionStorage = txStatePartitionStorage;
+        this.clockService = clockService;
+        this.txManager = txManager;
+        this.replicationGroupId = replicationGroupId;
+
+        schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
+        reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
+        raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
+        replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
+    }
+
+    /**
+     * Processes transaction finish request.
+     * <ol>
+     *     <li>Get commit timestamp from finish replica request.</li>
+     *     <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li>
+     *     <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li>
+     *     <li>Send cleanup requests to all enlisted primary replicas.</li>
+     * </ol>
+     *
+     * @param request Transaction finish request.
+     * @return future result of the operation.
+     */
+    public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast
+        Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups());
+
+        UUID txId = request.txId();
+
+        if (request.commit()) {
+            HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+            return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp)
+                    .thenCompose(validationResult ->
+                            finishAndCleanup(
+                                    enlistedGroups,
+                                    validationResult.isSuccessful(),
+                                    validationResult.isSuccessful() ? commitTimestamp : null,
+                                    txId
+                            ).thenApply(txResult -> {
+                                throwIfSchemaValidationOnCommitFailed(validationResult, txResult);
+                                return txResult;
+                            }));
+        } else {
+            // Aborting.
+            return finishAndCleanup(enlistedGroups, false, null, txId);
+        }
+    }
+
+    private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+        var result = new HashMap<TablePartitionId, String>(messages.size());
+
+        for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) {
+            result.put(e.getKey().asTablePartitionId(), e.getValue());
+        }
+
+        return result;
+    }
+
+    private CompletableFuture<TransactionResult> finishAndCleanup(
+            Map<TablePartitionId, String> enlistedPartitions,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId
+    ) {
+        // Read TX state from the storage; we will need this state to check if the locks are released.
+        // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand),
+        // the value of txMeta can be either null or COMMITTED/ABORTED. No other values are expected.
+        TxMeta txMeta = txStatePartitionStorage.get(txId);
+
+        // Check whether a transaction has already been finished.
+        boolean transactionAlreadyFinished = txMeta != null && isFinalState(txMeta.txState());
+
+        if (transactionAlreadyFinished) {
+            // - The Coordinator calls use the same tx state over retries, both abort and commit are possible.
+            // - Server side recovery may only change tx state to aborted.
+            // - The Coordinator itself should prevent user calls with a proposed state different from the one
+            //   that was already triggered (e.g. the client side -> txCoordinator.commitAsync(); txCoordinator.rollbackAsync()).
+            // - A coordinator might send a commit, then die, but the commit message might still arrive at the commit partition primary.
+            //   If it arrived with a delay, another node might come across a write intent/lock from that tx
+            //   and realize that the coordinator is no longer available and start tx recovery.
+            //   The original commit message might arrive later than the recovery one,
+            //   hence a 'commit over rollback' case.
+            // The possible states that a 'commit' is allowed to see:
+            // - null (if it's the first change state attempt)
+            // - committed (if it was already updated in the previous attempt)
+            // - aborted (if it was aborted by the initiate recovery logic,
+            //   though this is a very unlikely case because initiate recovery will only roll back the tx if the coordinator is dead).
+            //
+            // Within 'roll back' it's allowed to see:
+            // - null (if it's the first change state attempt)
+            // - aborted (if it was already updated in the previous attempt or the result of a concurrent recovery)
+            // - commit (if initiate recovery has started, but a delayed message from the coordinator finally arrived and executed earlier).
+
+            // Let the client know a transaction has finished with a different outcome.
+            if (commit != (txMeta.txState() == COMMITTED)) {
+                LOG.error("Failed to finish a transaction that is already finished [txId={}, expectedState={}, actualState={}].",
+                        txId,
+                        commit ? COMMITTED : ABORTED,
+                        txMeta.txState()
+                );
+
+                throw new MismatchingTransactionOutcomeInternalException(
+                        "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + txMeta.txState() + "].",
+                        new TransactionResult(txMeta.txState(), txMeta.commitTimestamp())
+                );
+            }
+
+            return completedFuture(new TransactionResult(txMeta.txState(), txMeta.commitTimestamp()));
+        }
+
+        return finishTransaction(enlistedPartitions.keySet(), txId, commit, commitTimestamp)
+                .thenCompose(txResult ->
+                    txManager.cleanup(replicationGroupId, enlistedPartitions, commit, commitTimestamp, txId)
+                            .thenApply(v -> txResult)
+                );
+    }
+
+    private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, TransactionResult txResult) {
+        if (!validationResult.isSuccessful()) {
+            if (validationResult.isTableDropped()) {
+                throw new IncompatibleSchemaAbortException(
+                        format("Commit failed because a table was already dropped [table={}]", validationResult.failedTableName()),
+                        txResult
+                );
+            } else {
+                throw new IncompatibleSchemaAbortException(
+                        format(
+                                "Commit failed because schema is not 
forward-compatible "
+                                        + "[fromSchemaVersion={}, 
toSchemaVersion={}, table={}, details={}]",
+                                validationResult.fromSchemaVersion(),
+                                validationResult.toSchemaVersion(),
+                                validationResult.failedTableName(),
+                                validationResult.details()
+                        ),
+                        txResult
+                );
+            }
+        }
+    }
+
+    /**
+     * Finishes a transaction. This operation is idempotent.
+     *
+     * @param partitionIds Collection of enlisted partition groups.
+     * @param txId Transaction id.
+     * @param commit True if the transaction is committed, false otherwise.
+     * @param commitTimestamp Commit timestamp, if applicable.
+     * @return Future to wait for the finish.
+     */
+    private CompletableFuture<TransactionResult> finishTransaction(
+            Collection<TablePartitionId> partitionIds,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        assert !(commit && commitTimestamp == null) : "Cannot commit without the timestamp.";
+
+        HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : clockService.now();
+
+        return reliableCatalogVersionFor(tsForCatalogVersion)
+                .thenCompose(catalogVersion -> applyFinishCommand(
+                                txId,
+                                commit,
+                                commitTimestamp,
+                                catalogVersion,
+                                toPartitionIdMessage(partitionIds)
+                        )

Review Comment:
   We can save us some indentation here:
   ```
   .thenCompose(catalogVersion -> applyFinishCommand(
           txId,
           commit,
           commitTimestamp,
           catalogVersion,
           toPartitionIdMessage(partitionIds)
   ))
   ```
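
The "already finished" branch of `finishAndCleanup` in the diff above reduces to a small decision. If no final state is recorded, proceed with the finish command. If the request repeats the recorded outcome, return it (an idempotent retry). Otherwise, report the mismatch. The following is a minimal sketch that assumes, as the code comment states, that the stored state is either absent or `COMMITTED`/`ABORTED`. `TxState` here is a two-value stand-in, and `MismatchingOutcomeException` is a hypothetical substitute for `MismatchingTransactionOutcomeInternalException`.

```java
// Simplified model of the "transaction already finished" check; types are hypothetical stand-ins.
final class FinishOutcomeSketch {
    /** Only the final states are modeled; the real TxState enum has more values. */
    enum TxState { COMMITTED, ABORTED }

    /** Stand-in for MismatchingTransactionOutcomeInternalException. */
    static final class MismatchingOutcomeException extends RuntimeException {
        final TxState actual;

        MismatchingOutcomeException(TxState actual) {
            super("Failed to change the outcome of a finished transaction [txState=" + actual + "]");
            this.actual = actual;
        }
    }

    /**
     * Returns null when no final state is recorded (the caller must run the finish command),
     * returns the recorded state when the request repeats the same outcome, and throws otherwise.
     */
    static TxState checkAlreadyFinished(boolean commit, TxState recorded) {
        if (recorded == null) {
            return null;
        }

        // The recorded outcome differs from the requested one, e.g. a delayed commit arriving after
        // recovery has already aborted the transaction: report it instead of changing the outcome.
        if (commit != (recorded == TxState.COMMITTED)) {
            throw new MismatchingOutcomeException(recorded);
        }

        return recorded;
    }
}
```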



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -124,6 +160,23 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
 
             clo.result(null);
         }
+
+        // result == null means that the command either was not handled by anyone (and clo.result() is called) or
+        // that it was delegated to a table processor (which called clo.result()).
+        if (result != null) {
+            if (Boolean.TRUE.equals(result.get2())) {

Review Comment:
   Also, this condition and the previous one can be combined.
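
Suppose the outer `if (result != null)` block contains nothing besides the inner check. Then the two conditions collapse into one, because `&&` short-circuits and `get2()` is never called on `null`. In this sketch, `Result` is a hypothetical stand-in for whatever tuple type `result` has; only `get2()` matters.

```java
// Compares the nested and merged forms of the condition quoted from ZonePartitionRaftListener.
final class MergedConditionSketch {
    /** Hypothetical stand-in for the tuple held in `result`; only get2() is used. */
    static final class Result {
        private final Boolean second;

        Result(Boolean second) {
            this.second = second;
        }

        Boolean get2() {
            return second;
        }
    }

    /** Nested form, as in the diff. */
    static boolean nested(Result result) {
        if (result != null) {
            if (Boolean.TRUE.equals(result.get2())) {
                return true;
            }
        }
        return false;
    }

    /** Merged form: the short-circuiting && keeps the null check ahead of get2(). */
    static boolean merged(Result result) {
        if (result != null && Boolean.TRUE.equals(result.get2())) {
            return true;
        }
        return false;
    }
}
```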



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
