sashapolo commented on code in PR #5187: URL: https://github.com/apache/ignite-3/pull/5187#discussion_r1949280970
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -505,6 +518,9 @@ private CompletableFuture<?> createZonePartitionReplicationNode( rebalanceRetryDelayConfiguration ); + var safeTimeTracker = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE); Review Comment: These values are only needed for the `raftGroupListener`. Shall we move them into the same lambda below? Also, I think the lambda itself should be extracted into a method. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -103,18 +134,23 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) { private void processWriteCommand(CommandClosure<WriteCommand> clo) { Command command = clo.command(); - if (command instanceof FinishTxCommand) { - FinishTxCommand cmd = (FinishTxCommand) command; + long commandIndex = clo.index(); + long commandTerm = clo.term(); + @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp(); Review Comment: I don't think the `@Nullable` annotation is needed here; it should be deduced automatically. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -124,6 +160,23 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) { clo.result(null); } + + // result == null means that the command either was not handled by anyone (and clo.result() is called) or + // that it was delegated to a table processor (which called clo.result()). + if (result != null) { + if (Boolean.TRUE.equals(result.get2())) { Review Comment: This is weird: why did you write the comparison this way?
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTxFinishMarker.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.tx.TxState.COMMITTED; +import static org.apache.ignite.internal.tx.TxState.isFinalState; + +import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.listener.ReplicaListener; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxState; +import org.apache.ignite.internal.tx.TxStateMeta; +import org.jetbrains.annotations.Nullable; + +/** + * Contains transaction finishing logic for partitions' {@link ReplicaListener} implementations. + */ +public class ReplicaTxFinishMarker { + private final TxManager txManager; + + public ReplicaTxFinishMarker(TxManager txManager) { + this.txManager = txManager; + } + + /** + * Marks the transaction as finished in local tx state map. + * + * @param txId Transaction id. + * @param txState Transaction state, must be either {@link TxState#COMMITTED} or {@link TxState#ABORTED}. 
+ * @param commitTimestamp Commit timestamp ({@code null} when aborting). + */ + public void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp commitTimestamp) { + assert isFinalState(txState) : "Unexpected state [txId=" + txId + ", txState=" + txState + ']'; + + txManager.updateTxMeta(txId, old -> new TxStateMeta( Review Comment: Please see my comments in the other similar class ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; +import static org.apache.ignite.internal.tx.TxState.isFinalState; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder; +import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult; +import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import 
org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException; +import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; +import org.apache.ignite.internal.tx.TransactionResult; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.Nullable; + +/** + * Handles {@link TxFinishReplicaRequest}. + */ +public class TxFinishReplicaRequestHandler { + private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class); + + /** Factory to create RAFT command messages. */ + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + + /** Factory for creating replica command messages. */ + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private final TxStatePartitionStorage txStatePartitionStorage; + private final ClockService clockService; + private final TxManager txManager; + private final ReplicationGroupId replicationGroupId; + + private final SchemaCompatibilityValidator schemaCompatValidator; + private final ReliableCatalogVersions reliableCatalogVersions; + private final ReplicationRaftCommandApplicator raftCommandApplicator; + private final ReplicaTxFinishMarker replicaTxFinishMarker; + + + /** Constructor. 
*/ + public TxFinishReplicaRequestHandler( + TxStatePartitionStorage txStatePartitionStorage, + ClockService clockService, + TxManager txManager, + ValidationSchemasSource validationSchemasSource, + SchemaSyncService schemaSyncService, + CatalogService catalogService, + RaftCommandRunner raftCommandRunner, + ReplicationGroupId replicationGroupId + ) { + this.txStatePartitionStorage = txStatePartitionStorage; + this.clockService = clockService; + this.txManager = txManager; + this.replicationGroupId = replicationGroupId; + + schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService); + reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService); + raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId); + replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager); + } + + /** + * Processes transaction finish request. + * <ol> + * <li>Get commit timestamp from finish replica request.</li> + * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li> + * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li> + * <li>Send cleanup requests to all enlisted primary replicas.</li> + * </ol> + * + * @param request Transaction finish request. + * @return future result of the operation. 
+ */ + public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) { + // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast + Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups()); + + UUID txId = request.txId(); + + if (request.commit()) { + HybridTimestamp commitTimestamp = request.commitTimestamp(); + + return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp) + .thenCompose(validationResult -> + finishAndCleanup( + enlistedGroups, + validationResult.isSuccessful(), + validationResult.isSuccessful() ? commitTimestamp : null, + txId + ).thenApply(txResult -> { + throwIfSchemaValidationOnCommitFailed(validationResult, txResult); + return txResult; + })); + } else { + // Aborting. + return finishAndCleanup(enlistedGroups, false, null, txId); + } + } + + private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) { + var result = new HashMap<TablePartitionId, String>(messages.size()); + + for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) { + result.put(e.getKey().asTablePartitionId(), e.getValue()); + } + + return result; + } + + private CompletableFuture<TransactionResult> finishAndCleanup( + Map<TablePartitionId, String> enlistedPartitions, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId + ) { + // Read TX state from the storage, we will need this state to check if the locks are released. + // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand), + // the value of txMeta can be either null or COMMITTED/ABORTED. No other values is expected. Review Comment: I don't understand. We don't expect any other values, but yet we later explicitly check for them in `transactionAlreadyFinished`. What am I missing? 
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; +import static org.apache.ignite.internal.tx.TxState.isFinalState; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.hlc.ClockService; 
+import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder; +import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult; +import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException; +import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; +import org.apache.ignite.internal.tx.TransactionResult; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.Nullable; + +/** + * Handles {@link TxFinishReplicaRequest}. + */ +public class TxFinishReplicaRequestHandler { + private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class); + + /** Factory to create RAFT command messages. 
*/ + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + + /** Factory for creating replica command messages. */ + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private final TxStatePartitionStorage txStatePartitionStorage; + private final ClockService clockService; + private final TxManager txManager; + private final ReplicationGroupId replicationGroupId; + + private final SchemaCompatibilityValidator schemaCompatValidator; + private final ReliableCatalogVersions reliableCatalogVersions; + private final ReplicationRaftCommandApplicator raftCommandApplicator; + private final ReplicaTxFinishMarker replicaTxFinishMarker; + + + /** Constructor. */ + public TxFinishReplicaRequestHandler( + TxStatePartitionStorage txStatePartitionStorage, + ClockService clockService, + TxManager txManager, + ValidationSchemasSource validationSchemasSource, + SchemaSyncService schemaSyncService, + CatalogService catalogService, + RaftCommandRunner raftCommandRunner, + ReplicationGroupId replicationGroupId + ) { + this.txStatePartitionStorage = txStatePartitionStorage; + this.clockService = clockService; + this.txManager = txManager; + this.replicationGroupId = replicationGroupId; + + schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService); + reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService); + raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId); + replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager); + } + + /** + * Processes transaction finish request. 
+ * <ol> + * <li>Get commit timestamp from finish replica request.</li> + * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li> + * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li> + * <li>Send cleanup requests to all enlisted primary replicas.</li> + * </ol> + * + * @param request Transaction finish request. + * @return future result of the operation. + */ + public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) { + // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast + Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups()); + + UUID txId = request.txId(); + + if (request.commit()) { + HybridTimestamp commitTimestamp = request.commitTimestamp(); + + return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp) + .thenCompose(validationResult -> + finishAndCleanup( + enlistedGroups, + validationResult.isSuccessful(), + validationResult.isSuccessful() ? commitTimestamp : null, + txId + ).thenApply(txResult -> { + throwIfSchemaValidationOnCommitFailed(validationResult, txResult); + return txResult; + })); + } else { + // Aborting. 
+ return finishAndCleanup(enlistedGroups, false, null, txId); + } + } + + private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) { + var result = new HashMap<TablePartitionId, String>(messages.size()); + + for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) { + result.put(e.getKey().asTablePartitionId(), e.getValue()); + } + + return result; + } + + private CompletableFuture<TransactionResult> finishAndCleanup( + Map<TablePartitionId, String> enlistedPartitions, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId + ) { + // Read TX state from the storage, we will need this state to check if the locks are released. + // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand), + // the value of txMeta can be either null or COMMITTED/ABORTED. No other values is expected. + TxMeta txMeta = txStatePartitionStorage.get(txId); + + // Check whether a transaction has already been finished. + boolean transactionAlreadyFinished = txMeta != null && isFinalState(txMeta.txState()); + + if (transactionAlreadyFinished) { + // - The Coordinator calls use same tx state over retries, both abort and commit are possible. + // - Server side recovery may only change tx state to aborted. + // - The Coordinator itself should prevent user calls with different proposed state to the one, + // that was already triggered (e.g. the client side -> txCoordinator.commitAsync(); txCoordinator.rollbackAsync()). + // - A coordinator might send a commit, then die, but the commit message might still arrive at the commit partition primary. + // If it arrived with a delay, another node might come across a write intent/lock from that tx + // and realize that the coordinator is no longer available and start tx recovery. + // The original commit message might arrive later than the recovery one, + // hence a 'commit over rollback' case. 
+ // The possible states that a 'commit' is allowed to see: + // - null (if it's the first change state attempt) + // - committed (if it was already updated in the previous attempt) + // - aborted (if it was aborted by the initiate recovery logic, + // though this is a very unlikely case because initiate recovery will only roll back the tx if coordinator is dead). + // + // Within 'roll back' it's allowed to see: + // - null (if it's the first change state attempt) + // - aborted (if it was already updated in the previous attempt or the result of a concurrent recovery) + // - commit (if initiate recovery has started, but a delayed message from the coordinator finally arrived and executed earlier). + + // Let the client know a transaction has finished with a different outcome. + if (commit != (txMeta.txState() == COMMITTED)) { + LOG.error("Failed to finish a transaction that is already finished [txId={}, expectedState={}, actualState={}].", + txId, + commit ? COMMITTED : ABORTED, + txMeta.txState() + ); + + throw new MismatchingTransactionOutcomeInternalException( + "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + txMeta.txState() + "].", + new TransactionResult(txMeta.txState(), txMeta.commitTimestamp()) + ); + } + + return completedFuture(new TransactionResult(txMeta.txState(), txMeta.commitTimestamp())); + } + + return finishTransaction(enlistedPartitions.keySet(), txId, commit, commitTimestamp) + .thenCompose(txResult -> + txManager.cleanup(replicationGroupId, enlistedPartitions, commit, commitTimestamp, txId) + .thenApply(v -> txResult) + ); + } + + private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, TransactionResult txResult) { + if (!validationResult.isSuccessful()) { + if (validationResult.isTableDropped()) { + throw new IncompatibleSchemaAbortException( + format("Commit failed because a table was already dropped [table={}]", validationResult.failedTableName()), + txResult 
+ ); + } else { + throw new IncompatibleSchemaAbortException( + format( + "Commit failed because schema is not forward-compatible " + + "[fromSchemaVersion={}, toSchemaVersion={}, table={}, details={}]", + validationResult.fromSchemaVersion(), + validationResult.toSchemaVersion(), + validationResult.failedTableName(), + validationResult.details() + ), + txResult + ); + } + } + } + + /** + * Finishes a transaction. This operation is idempotent. + * + * @param partitionIds Collection of enlisted partition groups. + * @param txId Transaction id. + * @param commit True is the transaction is committed, false otherwise. + * @param commitTimestamp Commit timestamp, if applicable. + * @return Future to wait of the finish. + */ + private CompletableFuture<TransactionResult> finishTransaction( + Collection<TablePartitionId> partitionIds, + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp + ) { + assert !(commit && commitTimestamp == null) : "Cannot commit without the timestamp."; + + HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : clockService.now(); + + return reliableCatalogVersionFor(tsForCatalogVersion) + .thenCompose(catalogVersion -> applyFinishCommand( + txId, + commit, + commitTimestamp, + catalogVersion, + toPartitionIdMessage(partitionIds) + ) + ) + .handle((txOutcome, ex) -> { + if (ex != null) { + // RAFT 'finish' command failed because the state has already been written by someone else. + // In that case we throw a corresponding exception. + if (ex instanceof UnexpectedTransactionStateException) { + UnexpectedTransactionStateException utse = (UnexpectedTransactionStateException) ex; + TransactionResult result = utse.transactionResult(); + + replicaTxFinishMarker.markFinished(txId, result.transactionState(), result.commitTimestamp()); + + throw new MismatchingTransactionOutcomeInternalException(utse.getMessage(), utse.transactionResult()); + } + // Otherwise we convert from the internal exception to the client one. 
+ throw new TransactionException(commit ? TX_COMMIT_ERR : TX_ROLLBACK_ERR, ex); + } + + TransactionResult result = (TransactionResult) txOutcome; + + replicaTxFinishMarker.markFinished(txId, result.transactionState(), result.commitTimestamp()); + + return result; + }); + } + + private CompletableFuture<Integer> reliableCatalogVersionFor(HybridTimestamp ts) { + return reliableCatalogVersions.reliableCatalogVersionFor(ts); + } + + private CompletableFuture<Object> applyFinishCommand( + UUID transactionId, + boolean commit, + HybridTimestamp commitTimestamp, Review Comment: ```suggestion @Nullable HybridTimestamp commitTimestamp, ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ResultWrapper.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import org.apache.ignite.internal.raft.Command; + +/** + * Wrapper for the update(All)Command processing result that besides result itself stores actual command that was processed. 
+ */ +public class ResultWrapper<T> { + private final Command command; + private final T result; + + public ResultWrapper(Command command, T result) { + this.command = command; + this.result = result; + } + + public Command getCommand() { Review Comment: Why is this needed? Did you extract it from somewhere, and will it be used in some upcoming code? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft; + +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; + +import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxStateMeta; +import org.jetbrains.annotations.Nullable; + +/** + * Contains transaction finishing logic for partitions Raft listener implementations.
+ */ +public class RaftTxFinishMarker { + private final TxManager txManager; + + public RaftTxFinishMarker(TxManager txManager) { + this.txManager = txManager; + } + + /** + * Marks the transaction as finished in local tx state map. + * + * @param txId Transaction id. + * @param commit Whether this is a commit. + * @param commitTimestamp Commit timestamp ({@code null} when aborting). + * @param commitPartitionId Commit partition ID. + */ + public void markFinished( + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + @Nullable TablePartitionId commitPartitionId + ) { + txManager.updateTxMeta(txId, old -> new TxStateMeta( + commit ? COMMITTED : ABORTED, Review Comment: Shall we pass this state directly, instead of the flag? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ResultWrapper.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import org.apache.ignite.internal.raft.Command; + +/** + * Wrapper for the update(All)Command processing result that besides result itself stores actual command that was processed. 
Review Comment: If this is only for update commands, should we call it `UpdateResultWrapper`? ########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java: ########## @@ -735,7 +735,7 @@ public CompletableFuture<Replica> startReplica( Function<RaftGroupService, ReplicaListener> listenerFactory, SnapshotStorageFactory snapshotStorageFactory, PeersAndLearners newConfiguration, - RaftGroupListener raftGroupListener, + Supplier<RaftGroupListener> raftGroupListenerFactory, Review Comment: Why is this change needed? You immediately call `.get` on it ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator.raft; + +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; + +import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxStateMeta; +import org.jetbrains.annotations.Nullable; + +/** + * Contains transaction finishing logic for partitions Raft listener implementations. + */ +public class RaftTxFinishMarker { + private final TxManager txManager; + + public RaftTxFinishMarker(TxManager txManager) { + this.txManager = txManager; + } + + /** + * Marks the transaction as finished in local tx state map. + * + * @param txId Transaction id. + * @param commit Whether this is a commit. + * @param commitTimestamp Commit timestamp ({@code null} when aborting). + * @param commitPartitionId Commit partition ID. + */ + public void markFinished( + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + @Nullable TablePartitionId commitPartitionId + ) { + txManager.updateTxMeta(txId, old -> new TxStateMeta( + commit ? COMMITTED : ABORTED, + old == null ? null : old.txCoordinatorId(), + old == null ? commitPartitionId : old.commitPartitionId(), + commit ? commitTimestamp : null, Review Comment: This check is redundant ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft; + +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; + +import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxStateMeta; +import org.jetbrains.annotations.Nullable; + +/** + * Contains transaction finishing logic for partitions Raft listener implementations. + */ +public class RaftTxFinishMarker { + private final TxManager txManager; + + public RaftTxFinishMarker(TxManager txManager) { + this.txManager = txManager; + } + + /** + * Marks the transaction as finished in local tx state map. + * + * @param txId Transaction id. + * @param commit Whether this is a commit. + * @param commitTimestamp Commit timestamp ({@code null} when aborting). + * @param commitPartitionId Commit partition ID. + */ + public void markFinished( + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + @Nullable TablePartitionId commitPartitionId + ) { + txManager.updateTxMeta(txId, old -> new TxStateMeta( + commit ? COMMITTED : ABORTED, + old == null ? 
null : old.txCoordinatorId(), Review Comment: I would suggest extracting a single `if (old == null)` branch instead of repeating the null check for every field ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -103,18 +134,23 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) { private void processWriteCommand(CommandClosure<WriteCommand> clo) { Command command = clo.command(); - if (command instanceof FinishTxCommand) { - FinishTxCommand cmd = (FinishTxCommand) command; + long commandIndex = clo.index(); + long commandTerm = clo.term(); + @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp(); + assert safeTimestamp == null || command instanceof SafeTimePropagatingCommand : command; + + IgniteBiTuple<Serializable, Boolean> result = null; Review Comment: Can we do something with this ugly `IgniteBiTuple`? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; +import static org.apache.ignite.internal.tx.TxState.isFinalState; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder; +import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult; +import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import 
org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException; +import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; +import org.apache.ignite.internal.tx.TransactionResult; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.Nullable; + +/** + * Handles {@link TxFinishReplicaRequest}. + */ +public class TxFinishReplicaRequestHandler { + private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class); + + /** Factory to create RAFT command messages. */ + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + + /** Factory for creating replica command messages. */ + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private final TxStatePartitionStorage txStatePartitionStorage; + private final ClockService clockService; + private final TxManager txManager; + private final ReplicationGroupId replicationGroupId; + + private final SchemaCompatibilityValidator schemaCompatValidator; + private final ReliableCatalogVersions reliableCatalogVersions; + private final ReplicationRaftCommandApplicator raftCommandApplicator; + private final ReplicaTxFinishMarker replicaTxFinishMarker; + + + /** Constructor. 
*/ + public TxFinishReplicaRequestHandler( + TxStatePartitionStorage txStatePartitionStorage, + ClockService clockService, + TxManager txManager, + ValidationSchemasSource validationSchemasSource, + SchemaSyncService schemaSyncService, + CatalogService catalogService, + RaftCommandRunner raftCommandRunner, + ReplicationGroupId replicationGroupId + ) { + this.txStatePartitionStorage = txStatePartitionStorage; + this.clockService = clockService; + this.txManager = txManager; + this.replicationGroupId = replicationGroupId; + + schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService); + reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService); + raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId); + replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager); + } + + /** + * Processes transaction finish request. + * <ol> + * <li>Get commit timestamp from finish replica request.</li> + * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li> + * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li> + * <li>Send cleanup requests to all enlisted primary replicas.</li> + * </ol> + * + * @param request Transaction finish request. + * @return future result of the operation. 
+ */ + public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) { + // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast + Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups()); + + UUID txId = request.txId(); + + if (request.commit()) { + HybridTimestamp commitTimestamp = request.commitTimestamp(); + + return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp) + .thenCompose(validationResult -> + finishAndCleanup( + enlistedGroups, + validationResult.isSuccessful(), + validationResult.isSuccessful() ? commitTimestamp : null, + txId + ).thenApply(txResult -> { + throwIfSchemaValidationOnCommitFailed(validationResult, txResult); + return txResult; + })); + } else { + // Aborting. + return finishAndCleanup(enlistedGroups, false, null, txId); + } + } + + private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) { + var result = new HashMap<TablePartitionId, String>(messages.size()); + + for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) { + result.put(e.getKey().asTablePartitionId(), e.getValue()); + } + + return result; + } + + private CompletableFuture<TransactionResult> finishAndCleanup( + Map<TablePartitionId, String> enlistedPartitions, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId + ) { + // Read TX state from the storage, we will need this state to check if the locks are released. + // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand), + // the value of txMeta can be either null or COMMITTED/ABORTED. No other values is expected. Review Comment: ```suggestion // the value of txMeta can be either null or COMMITTED/ABORTED. No other values are expected. 
``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; +import static org.apache.ignite.internal.tx.TxState.isFinalState; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.hlc.ClockService; 
+import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder; +import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult; +import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException; +import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; +import org.apache.ignite.internal.tx.TransactionResult; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.Nullable; + +/** + * Handles {@link TxFinishReplicaRequest}. + */ +public class TxFinishReplicaRequestHandler { + private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class); + + /** Factory to create RAFT command messages. 
*/ + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + + /** Factory for creating replica command messages. */ + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private final TxStatePartitionStorage txStatePartitionStorage; + private final ClockService clockService; + private final TxManager txManager; + private final ReplicationGroupId replicationGroupId; + + private final SchemaCompatibilityValidator schemaCompatValidator; + private final ReliableCatalogVersions reliableCatalogVersions; + private final ReplicationRaftCommandApplicator raftCommandApplicator; + private final ReplicaTxFinishMarker replicaTxFinishMarker; + + + /** Constructor. */ + public TxFinishReplicaRequestHandler( + TxStatePartitionStorage txStatePartitionStorage, + ClockService clockService, + TxManager txManager, + ValidationSchemasSource validationSchemasSource, + SchemaSyncService schemaSyncService, + CatalogService catalogService, + RaftCommandRunner raftCommandRunner, + ReplicationGroupId replicationGroupId + ) { + this.txStatePartitionStorage = txStatePartitionStorage; + this.clockService = clockService; + this.txManager = txManager; + this.replicationGroupId = replicationGroupId; + + schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService); + reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService); + raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId); + replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager); + } + + /** + * Processes transaction finish request. 
+ * <ol> + * <li>Get commit timestamp from finish replica request.</li> + * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li> + * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li> + * <li>Send cleanup requests to all enlisted primary replicas.</li> + * </ol> + * + * @param request Transaction finish request. + * @return future result of the operation. + */ + public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) { + // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast + Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups()); + + UUID txId = request.txId(); + + if (request.commit()) { + HybridTimestamp commitTimestamp = request.commitTimestamp(); + + return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp) + .thenCompose(validationResult -> + finishAndCleanup( + enlistedGroups, + validationResult.isSuccessful(), + validationResult.isSuccessful() ? commitTimestamp : null, + txId + ).thenApply(txResult -> { + throwIfSchemaValidationOnCommitFailed(validationResult, txResult); + return txResult; + })); + } else { + // Aborting. + return finishAndCleanup(enlistedGroups, false, null, txId); + } + } + + private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) { + var result = new HashMap<TablePartitionId, String>(messages.size()); Review Comment: you need to use `capacity` here ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; +import static org.apache.ignite.internal.tx.TxState.isFinalState; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder; +import 
org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult; +import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException; +import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; +import org.apache.ignite.internal.tx.TransactionResult; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.Nullable; + +/** + * Handles {@link TxFinishReplicaRequest}. + */ +public class TxFinishReplicaRequestHandler { + private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class); + + /** Factory to create RAFT command messages. */ + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + + /** Factory for creating replica command messages. 
*/ + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private final TxStatePartitionStorage txStatePartitionStorage; + private final ClockService clockService; + private final TxManager txManager; + private final ReplicationGroupId replicationGroupId; + + private final SchemaCompatibilityValidator schemaCompatValidator; + private final ReliableCatalogVersions reliableCatalogVersions; + private final ReplicationRaftCommandApplicator raftCommandApplicator; + private final ReplicaTxFinishMarker replicaTxFinishMarker; + + + /** Constructor. */ + public TxFinishReplicaRequestHandler( + TxStatePartitionStorage txStatePartitionStorage, + ClockService clockService, + TxManager txManager, + ValidationSchemasSource validationSchemasSource, + SchemaSyncService schemaSyncService, + CatalogService catalogService, + RaftCommandRunner raftCommandRunner, + ReplicationGroupId replicationGroupId + ) { + this.txStatePartitionStorage = txStatePartitionStorage; + this.clockService = clockService; + this.txManager = txManager; + this.replicationGroupId = replicationGroupId; + + schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService); + reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService); + raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId); + replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager); + } + + /** + * Processes transaction finish request. + * <ol> + * <li>Get commit timestamp from finish replica request.</li> + * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li> + * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li> Review Comment: What do you mean by `specific`? 
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -404,6 +420,99 @@ void testDataRebalance(boolean truncateRaftLog) throws Exception { assertThat(kvView2.getAll(null, data2.keySet()), is(data2)); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void txFinishCommandGetsReplicated(boolean commit) throws Exception { + startCluster(3); + + // Create a zone with a single partition on every node. + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size()); + + int tableId1 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + int tableId2 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME2); + + var zonePartitionId = new ZonePartitionId(zoneId, 0); + + setupTableIdToZoneIdConverter(zonePartitionId, new TablePartitionId(tableId1, 0), new TablePartitionId(tableId2, 0)); + + cluster.forEach(Node::waitForMetadataCompletenessAtNow); + + Node node = cluster.get(0); + + setPrimaryReplica(node, zonePartitionId); + + KeyValueView<Integer, Integer> kvView1 = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + KeyValueView<Integer, Integer> kvView2 = node.tableManager.table(TEST_TABLE_NAME2).keyValueView(Integer.class, Integer.class); + + InternalTransaction transaction = unwrapInternalTransaction(node.transactions().begin()); + kvView1.put(transaction, 42, 69); + kvView2.put(transaction, 142, 169); + if (commit) { + transaction.commit(); + } else { + transaction.rollback(); + } + + for (Node currentNode : cluster) { + assertTrue(waitForCondition( + () -> !txStatesInPartitionStorage(currentNode.txStatePartitionStorage(zoneId, 0)).isEmpty(), + SECONDS.toMillis(10) + )); + } + + List<CountExpectation> expectations = new ArrayList<>(); + for (int i = 0; i < cluster.size(); i++) { + Node currentNode = cluster.get(i); + expectations.add(new CountExpectation( Review Comment: Why do we need this `CountExpectation` class? 
Can we just add a bunch of `Executables` directly? ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -404,6 +420,99 @@ void testDataRebalance(boolean truncateRaftLog) throws Exception { assertThat(kvView2.getAll(null, data2.keySet()), is(data2)); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void txFinishCommandGetsReplicated(boolean commit) throws Exception { + startCluster(3); + + // Create a zone with a single partition on every node. + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size()); + + int tableId1 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + int tableId2 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME2); + + var zonePartitionId = new ZonePartitionId(zoneId, 0); + + setupTableIdToZoneIdConverter(zonePartitionId, new TablePartitionId(tableId1, 0), new TablePartitionId(tableId2, 0)); + + cluster.forEach(Node::waitForMetadataCompletenessAtNow); + + Node node = cluster.get(0); + + setPrimaryReplica(node, zonePartitionId); + + KeyValueView<Integer, Integer> kvView1 = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + KeyValueView<Integer, Integer> kvView2 = node.tableManager.table(TEST_TABLE_NAME2).keyValueView(Integer.class, Integer.class); + + InternalTransaction transaction = unwrapInternalTransaction(node.transactions().begin()); Review Comment: Why do you need this unwrapping? 
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -404,6 +420,99 @@ void testDataRebalance(boolean truncateRaftLog) throws Exception { assertThat(kvView2.getAll(null, data2.keySet()), is(data2)); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void txFinishCommandGetsReplicated(boolean commit) throws Exception { Review Comment: I would suggest extracting this test into a separate class, because this class will become huge once we add tests for all the commands here. But I don't insist. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; +import static org.apache.ignite.internal.tx.TxState.isFinalState; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder; +import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult; +import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import 
org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException; +import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; +import org.apache.ignite.internal.tx.TransactionResult; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.Nullable; + +/** + * Handles {@link TxFinishReplicaRequest}. + */ +public class TxFinishReplicaRequestHandler { + private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class); + + /** Factory to create RAFT command messages. */ + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + + /** Factory for creating replica command messages. */ + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private final TxStatePartitionStorage txStatePartitionStorage; + private final ClockService clockService; + private final TxManager txManager; + private final ReplicationGroupId replicationGroupId; + + private final SchemaCompatibilityValidator schemaCompatValidator; + private final ReliableCatalogVersions reliableCatalogVersions; + private final ReplicationRaftCommandApplicator raftCommandApplicator; + private final ReplicaTxFinishMarker replicaTxFinishMarker; + + + /** Constructor. 
*/ + public TxFinishReplicaRequestHandler( + TxStatePartitionStorage txStatePartitionStorage, + ClockService clockService, + TxManager txManager, + ValidationSchemasSource validationSchemasSource, + SchemaSyncService schemaSyncService, + CatalogService catalogService, + RaftCommandRunner raftCommandRunner, + ReplicationGroupId replicationGroupId + ) { + this.txStatePartitionStorage = txStatePartitionStorage; + this.clockService = clockService; + this.txManager = txManager; + this.replicationGroupId = replicationGroupId; + + schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService); + reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService); + raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId); + replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager); + } + + /** + * Processes transaction finish request. + * <ol> + * <li>Get commit timestamp from finish replica request.</li> + * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li> + * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li> + * <li>Send cleanup requests to all enlisted primary replicas.</li> + * </ol> + * + * @param request Transaction finish request. + * @return future result of the operation. 
+ */ + public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) { + // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast + Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups()); + + UUID txId = request.txId(); + + if (request.commit()) { + HybridTimestamp commitTimestamp = request.commitTimestamp(); + + return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp) + .thenCompose(validationResult -> + finishAndCleanup( + enlistedGroups, + validationResult.isSuccessful(), + validationResult.isSuccessful() ? commitTimestamp : null, + txId + ).thenApply(txResult -> { + throwIfSchemaValidationOnCommitFailed(validationResult, txResult); + return txResult; + })); + } else { + // Aborting. + return finishAndCleanup(enlistedGroups, false, null, txId); + } + } + + private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) { + var result = new HashMap<TablePartitionId, String>(messages.size()); + + for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) { + result.put(e.getKey().asTablePartitionId(), e.getValue()); + } + + return result; + } + + private CompletableFuture<TransactionResult> finishAndCleanup( + Map<TablePartitionId, String> enlistedPartitions, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId + ) { + // Read TX state from the storage, we will need this state to check if the locks are released. + // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand), + // the value of txMeta can be either null or COMMITTED/ABORTED. No other values is expected. + TxMeta txMeta = txStatePartitionStorage.get(txId); + + // Check whether a transaction has already been finished. 
+ boolean transactionAlreadyFinished = txMeta != null && isFinalState(txMeta.txState()); + + if (transactionAlreadyFinished) { + // - The Coordinator calls use same tx state over retries, both abort and commit are possible. + // - Server side recovery may only change tx state to aborted. + // - The Coordinator itself should prevent user calls with different proposed state to the one, + // that was already triggered (e.g. the client side -> txCoordinator.commitAsync(); txCoordinator.rollbackAsync()). + // - A coordinator might send a commit, then die, but the commit message might still arrive at the commit partition primary. + // If it arrived with a delay, another node might come across a write intent/lock from that tx + // and realize that the coordinator is no longer available and start tx recovery. + // The original commit message might arrive later than the recovery one, + // hence a 'commit over rollback' case. + // The possible states that a 'commit' is allowed to see: + // - null (if it's the first change state attempt) + // - committed (if it was already updated in the previous attempt) + // - aborted (if it was aborted by the initiate recovery logic, + // though this is a very unlikely case because initiate recovery will only roll back the tx if coordinator is dead). + // + // Within 'roll back' it's allowed to see: + // - null (if it's the first change state attempt) + // - aborted (if it was already updated in the previous attempt or the result of a concurrent recovery) + // - commit (if initiate recovery has started, but a delayed message from the coordinator finally arrived and executed earlier). + + // Let the client know a transaction has finished with a different outcome. + if (commit != (txMeta.txState() == COMMITTED)) { + LOG.error("Failed to finish a transaction that is already finished [txId={}, expectedState={}, actualState={}].", + txId, + commit ? 
COMMITTED : ABORTED, + txMeta.txState() + ); + + throw new MismatchingTransactionOutcomeInternalException( + "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + txMeta.txState() + "].", + new TransactionResult(txMeta.txState(), txMeta.commitTimestamp()) + ); + } + + return completedFuture(new TransactionResult(txMeta.txState(), txMeta.commitTimestamp())); + } + + return finishTransaction(enlistedPartitions.keySet(), txId, commit, commitTimestamp) + .thenCompose(txResult -> + txManager.cleanup(replicationGroupId, enlistedPartitions, commit, commitTimestamp, txId) + .thenApply(v -> txResult) + ); + } + + private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, TransactionResult txResult) { + if (!validationResult.isSuccessful()) { + if (validationResult.isTableDropped()) { + throw new IncompatibleSchemaAbortException( + format("Commit failed because a table was already dropped [table={}]", validationResult.failedTableName()), + txResult + ); + } else { + throw new IncompatibleSchemaAbortException( + format( + "Commit failed because schema is not forward-compatible " + + "[fromSchemaVersion={}, toSchemaVersion={}, table={}, details={}]", + validationResult.fromSchemaVersion(), + validationResult.toSchemaVersion(), + validationResult.failedTableName(), + validationResult.details() + ), + txResult + ); + } + } + } + + /** + * Finishes a transaction. This operation is idempotent. + * + * @param partitionIds Collection of enlisted partition groups. + * @param txId Transaction id. + * @param commit True is the transaction is committed, false otherwise. + * @param commitTimestamp Commit timestamp, if applicable. + * @return Future to wait of the finish. 
+ */ + private CompletableFuture<TransactionResult> finishTransaction( + Collection<TablePartitionId> partitionIds, + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp + ) { + assert !(commit && commitTimestamp == null) : "Cannot commit without the timestamp."; + + HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : clockService.now(); + + return reliableCatalogVersionFor(tsForCatalogVersion) + .thenCompose(catalogVersion -> applyFinishCommand( + txId, + commit, + commitTimestamp, + catalogVersion, + toPartitionIdMessage(partitionIds) + ) Review Comment: We can save some indentation here: ``` .thenCompose(catalogVersion -> applyFinishCommand( txId, commit, commitTimestamp, catalogVersion, toPartitionIdMessage(partitionIds) )) ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -124,6 +160,23 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) { clo.result(null); } + + // result == null means that the command either was not handled by anyone (and clo.result() is called) or + // that it was delegated to a table processor (which called clo.result()). + if (result != null) { + if (Boolean.TRUE.equals(result.get2())) { Review Comment: Also, this condition and the previous one can be combined. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org