rpuch commented on code in PR #4821: URL: https://github.com/apache/ignite-3/pull/4821#discussion_r1883812229
########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java: ########## @@ -836,7 +836,7 @@ private RaftGroupOptions groupOptionsForPartition(boolean isVolatileStorage, @Nu } raftGroupOptions.snapshotStorageFactory(snapshotFactory); - + raftGroupOptions.maxClockSkew((int) clockService.maxClockSkewMillis()); Review Comment: Why is `maxClockSkew` of type `int`? We usually represent times in milliseconds as longs, which avoids casting. ########## modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java: ########## @@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) { } } + /** + * Strict update with reordering check. Always called from the same updater thread. + * + * @param newValue New value. + * @param futureResult A result that will be used to complete a future returned by the + * {@link PendingComparableValuesTracker#waitFor(Comparable)}. + */ + public void updateStrict(T newValue, @Nullable R futureResult) { + if (!busyLock.readLock().tryLock()) { + throw new TrackerClosedException(); + } + + try { + Map.Entry<T, @Nullable R> current = this.current; + + IgniteBiTuple<T, @Nullable R> newEntry = new IgniteBiTuple<>(newValue, futureResult); + + // Entries from the same batch receive equal safe timestamps. Review Comment: This class is generic: it is about arbitrary comparable values, not just timestamps, so the comment about 'safe timestamps' looks strange here. Also, which batches are meant? It looks like some details leaked in from the context where this method is used. ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java: ########## @@ -58,6 +58,11 @@ public class RaftGroupOptions { */ private @Nullable Long externallyEnforcedConfigIndex; + /** + * Max clock skew in the replication group in milliseconds. 
+ */ + private int maxClockSkew; Review Comment: ```suggestion private int maxClockSkewMs; ``` This will make the unit obvious at the usage sites. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java: ########## @@ -300,10 +279,10 @@ private void onWriteBusy(Iterator<CommandClosure<WriteCommand>> iterator) { * @param commandIndex Index of the RAFT command. * @param commandTerm Term of the RAFT command. */ - private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long commandIndex, long commandTerm) { + private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long commandIndex, long commandTerm, boolean[] applied) { // Skips the write command because the storage has already executed it. if (commandIndex <= storage.lastAppliedIndex()) { - return new UpdateCommandResult(true, isPrimaryInGroupTopology()); + return null; Review Comment: Is it ok to return `null` here? The method is not declared as returning something nullable. 
If it's ok to return `null`, please annotate the method and/or add a comment explaining why it's ok ########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java: ########## @@ -1131,7 +1131,6 @@ private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> replicaFu ReplicaSafeTimeSyncRequest req = REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest() .groupId(toReplicationGroupIdMessage(replica.groupId())) - .proposedSafeTime(proposedSafeTime) Review Comment: Let's remove `proposedSafeTime` from parameters of this method and all methods that pass it here ########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java: ########## @@ -17,34 +17,49 @@ package org.apache.ignite.distributed; +import static java.util.stream.Collectors.toList; import static org.apache.ignite.distributed.ItTxTestCluster.NODE_PORT_BASE; import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; -import java.util.Collection; +import java.util.List; import org.apache.ignite.internal.TestHybridClock; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.table.TxInfrastructureTest; +import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import 
org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.testframework.SystemPropertiesExtension; import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.tx.TxStateMeta; -import org.apache.ignite.internal.tx.impl.TxManagerImpl; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.Status; +import org.apache.ignite.raft.jraft.core.NodeImpl; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; /** - * Tests if commit timestamp is propagated to observable time correctly. + * Tests if commit timestamp and safe timestamp are monotonically grow on leader change. Review Comment: ```suggestion * Tests if commit timestamp and safe timestamp monotonically grow on leader change. ``` ########## modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java: ########## @@ -285,8 +289,10 @@ private class LogEntryAndClosureHandler implements EventHandler<LogEntryAndClosu // task list for batch private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch()); + private @Nullable HybridTimestamp safeTs = null; Review Comment: Safe time doesn't seem to belong to the Raft protocol. Should we track it elsewhere, not in JRaft core code? 
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ########## @@ -1956,15 +1935,13 @@ private CompletableFuture<WriteIntentSwitchReplicatedInfo> applyWriteIntentSwitc indexIdsAtRwTxBeginTs(transactionId) ); - CompletableFuture<Object> resultFuture = new CompletableFuture<>(); - - applyCmdWithRetryOnSafeTimeReorderException(wiSwitchCmd, resultFuture); - - return resultFuture + return applyCmdWithExceptionHandling(wiSwitchCmd) .exceptionally(e -> { LOG.warn("Failed to complete transaction cleanup command [txId=" + transactionId + ']', e); - return nullCompletedFuture(); + ExceptionUtils.sneakyThrow(e); Review Comment: If the exception is rethrown anyway, we don't need `exceptionally()`. But why is this changed to throwing the exception? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java: ########## @@ -31,39 +32,51 @@ public PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio super(serializationRegistry, cache); } + @Override + public void patch(ByteBuffer raw, HybridTimestamp safeTs) { + ByteBuffer dup = raw.duplicate().order(ORDER); + dup.putLong(4, safeTs.longValue()); + } + @Override protected void beforeWriteMessage(Object o, ByteBuffer buffer) { int requiredCatalogVersion = o instanceof CatalogVersionAware ? ((CatalogVersionAware) o).requiredCatalogVersion() - : NO_VERSION_REQUIRED; + : PartitionCommandsMarshaller.NO_VERSION_REQUIRED; stream.setBuffer(buffer); - stream.writeInt(requiredCatalogVersion); + stream.writeFixedInt(requiredCatalogVersion); Review Comment: Please add a comment on why fixed ints are used (probably, to make it patchable?) 
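A minimal sketch of why a fixed-width header is presumably needed for in-place patching, as the question above guesses. It is not the Ignite marshaller: the class name, the `write`/`read*` helpers, and the little-endian `ORDER` are assumptions made for illustration; only the layout (a 4-byte int followed by an 8-byte slot at offset 4) mirrors the diff. With a variable-length int encoding the offset of the slot would depend on the value, so `putLong(4, ...)` could not be used.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical illustration of a patchable fixed-width header; not Ignite code. */
final class PatchableHeaderSketch {
    /** Assumed byte order; the real ORDER constant is not shown in the diff. */
    static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;

    /** The safe time slot starts right after the fixed 4-byte catalog version. */
    static final int SAFE_TIME_OFFSET = Integer.BYTES;

    /** Writes [int version][long placeholder][payload] and returns a readable buffer. */
    static ByteBuffer write(int requiredCatalogVersion, byte[] payload) {
        ByteBuffer buf = ByteBuffer
                .allocate(Integer.BYTES + Long.BYTES + payload.length)
                .order(ORDER);
        buf.putInt(requiredCatalogVersion); // always 4 bytes, whatever the value
        buf.putLong(0L);                    // reserves 8 bytes for the safe time
        buf.put(payload);
        buf.flip();
        return buf;
    }

    /** Overwrites the reserved slot without touching position, limit, or payload. */
    static void patch(ByteBuffer raw, long safeTime) {
        // duplicate() shares content but resets the byte order, so it is set again.
        raw.duplicate().order(ORDER).putLong(SAFE_TIME_OFFSET, safeTime);
    }

    static int readCatalogVersion(ByteBuffer raw) {
        return raw.duplicate().order(ORDER).getInt(0);
    }

    static long readSafeTime(ByteBuffer raw) {
        return raw.duplicate().order(ORDER).getLong(SAFE_TIME_OFFSET);
    }
}
```

Calling `patch(buf, ts)` on a buffer produced by `write` changes only bytes 4 to 11, so the catalog version and the payload that follows stay intact.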
########## modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java: ########## @@ -121,6 +119,8 @@ public interface TimeoutNowRequest extends Message { String peerId(); long term(); + + @Nullable HybridTimestamp timestamp(); Review Comment: Why is it nullable? `Replicator` seems to always send a timeout ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java: ########## @@ -627,19 +592,29 @@ public MvPartitionStorage getMvStorage() { return storage.getStorage(); } + /** + * Returns safe timestamp. + */ + @TestOnly + public PendingComparableValuesTracker<HybridTimestamp, Void> getSafeTime() { Review Comment: We usually don't use `get` prefixes. How about `safeTime()` or `safeTimeTracker()` (to distinguish it from actual safe time timestamps)? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java: ########## @@ -31,28 +32,40 @@ public PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio super(serializationRegistry, cache); } + @Override + public void patch(ByteBuffer raw, HybridTimestamp safeTs) { + ByteBuffer dup = raw.duplicate().order(ORDER); + dup.putLong(4, safeTs.longValue()); + } + @Override protected void beforeWriteMessage(Object o, ByteBuffer buffer) { int requiredCatalogVersion = o instanceof CatalogVersionAware ? 
((CatalogVersionAware) o).requiredCatalogVersion() : NO_VERSION_REQUIRED; stream.setBuffer(buffer); - stream.writeInt(requiredCatalogVersion); + stream.writeFixedInt(requiredCatalogVersion); + stream.writeFixedLong(0); Review Comment: Please add a comment saying that it allocates space for safe time ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java: ########## @@ -256,14 +266,19 @@ private <T extends Command> T copyCommand(T cmd) { .commit(finishTxCommand.commit()) .partitionIds(finishTxCommand.partitionIds()) .commitTimestamp(finishTxCommand.commitTimestamp()) + .initiatorTime(finishTxCommand.initiatorTime()) + .safeTime(finishTxCommand.safeTime()) .build(); } else if (cmd instanceof WriteIntentSwitchCommand) { WriteIntentSwitchCommand writeIntentSwitchCommand = (WriteIntentSwitchCommand) cmd; return (T) PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(writeIntentSwitchCommand.txId()) + .initiatorTime(clock.now()) .commit(writeIntentSwitchCommand.commit()) .commitTimestamp(writeIntentSwitchCommand.commitTimestamp()) + .initiatorTime(writeIntentSwitchCommand.initiatorTime()) Review Comment: `initiatorTime` is set twice ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java: ########## @@ -94,6 +99,10 @@ public boolean isPrimaryInPeersAndLearners() { return primaryInPeersAndLearners; } + public long safeTimestamp() { Review Comment: Please add a javadoc explaining what this is ########## modules/metastorage/build.gradle: ########## @@ -38,6 +39,7 @@ dependencies { implementation project(':ignite-failure-handler') implementation project(':ignite-metrics') implementation project(':ignite-system-disaster-recovery-api') + implementation project(':ignite-catalog') Review Comment: And this one? 
########## modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java: ########## @@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) { } } + /** + * Strict update with reordering check. Always called from the same updater thread. Review Comment: Please explain in the javadoc what 'strict' means ########## modules/metastorage/build.gradle: ########## @@ -28,6 +28,7 @@ dependencies { implementation project(':ignite-configuration-system') implementation project(':ignite-cluster-management') implementation project(':ignite-network-api') + implementation project(':ignite-network') Review Comment: Why was this added? ########## modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java: ########## @@ -889,7 +905,7 @@ public boolean bootstrap(final BootstrapOptions opts) throws InterruptedExceptio final long bootstrapLogTerm = opts.getLastLogIndex() > 0 ? 1 : 0; final LogId bootstrapId = new LogId(opts.getLastLogIndex(), bootstrapLogTerm); this.options = opts.getNodeOptions() == null ? new NodeOptions() : opts.getNodeOptions(); - this.clock = options.getClock(); + this.clock = this.options.getClock() == null ? new HybridClockImpl() : this.options.getClock(); Review Comment: Why do we need a possibility to have different clocks? If there is no clock in options, should we just skip all safetime-related logic? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java: ########## @@ -447,6 +447,10 @@ public boolean startRaftNode( // Thread pools are shared by all raft groups. 
NodeOptions nodeOptions = opts.copy(); + // Then a new election starts on a node, it has local physical time higher than last generated safe ts Review Comment: ```suggestion // When a new election starts on a node, it has local physical time higher than last generated safe ts ``` ########## modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java: ########## @@ -44,7 +51,16 @@ public interface HybridClock { HybridTimestamp now(); /** - * Gets a current timestamp. It is a fast way to get timestamp because it doesn't have to tick the logical part of the clock. + * Creates a timestamp for new event. A timestamp is guarantied to be unique and monotonically grown and follow the causal. + * + * @param causal The causal timestamp. + * + * @return The hybrid timestamp. + */ + HybridTimestamp now(HybridTimestamp causal); Review Comment: Do I understand correctly that this new method is logically equivalent to `update()` followed by `now()`? If so, is it worth adding at all? It is only used in one place, and the corresponding `nowLong()` is not used externally at all. 
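The equivalence asked about above can be illustrated with a toy, purely logical clock. This is only a sketch under stated assumptions: `ToyHybridClock` is hypothetical, uses plain `long` counters instead of `HybridTimestamp`, has no physical time component, and does not claim to mirror the semantics of Ignite's `HybridClockImpl`. In this model the combined `now(causal)` returns the same value as `update(causal)` followed by `now()` when called from a single thread; the only difference is that the combined form performs both steps in one atomic operation.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical toy clock for illustration; not Ignite's HybridClock. */
final class ToyHybridClock {
    private final AtomicLong latest = new AtomicLong();

    /** Ticks the clock and returns a new, strictly larger value. */
    long now() {
        return latest.incrementAndGet();
    }

    /** Advances the clock so that it is not behind the causal value. */
    long update(long causal) {
        return latest.updateAndGet(cur -> Math.max(cur, causal));
    }

    /** Combined form: advance past the causal value and tick, in one atomic step. */
    long now(long causal) {
        return latest.updateAndGet(cur -> Math.max(cur, causal) + 1);
    }
}
```

Under concurrent callers the two-step form may interleave with other ticks, which would be the only argument in this model for keeping a dedicated method.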
########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java: ########## @@ -119,6 +120,8 @@ public void testRemoveCommand() throws Exception { .rowUuid(UUID.randomUUID()) .txId(TestTransactionIds.newTransactionId()) .txCoordinatorId(UUID.randomUUID()) + .txCoordinatorId(UUID.randomUUID()) Review Comment: A duplicated line ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java: ########## @@ -241,30 +215,46 @@ private void onWriteBusy(Iterator<CommandClosure<WriteCommand>> iterator) { storage.acquirePartitionSnapshotsReadLock(); try { + boolean[] applied = {false}; + if (command instanceof UpdateCommand) { - result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm); + result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof UpdateAllCommand) { - result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm); + result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof FinishTxCommand) { - result = handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm); + result = handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof WriteIntentSwitchCommand) { - handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) command, commandIndex, commandTerm); + handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof SafeTimeSyncCommand) { - handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm); + handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof BuildIndexCommand) { - handleBuildIndexCommand((BuildIndexCommand) command, 
commandIndex, commandTerm); + handleBuildIndexCommand((BuildIndexCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof PrimaryReplicaChangeCommand) { - handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm); + handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof VacuumTxStatesCommand) { - handleVacuumTxStatesCommand((VacuumTxStatesCommand) command, commandIndex, commandTerm); + handleVacuumTxStatesCommand((VacuumTxStatesCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof UpdateMinimumActiveTxBeginTimeCommand) { - handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) command, commandIndex, commandTerm); + handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) command, commandIndex, commandTerm, + applied); } else { assert false : "Command was not found [cmd=" + command + ']'; } + + if (applied[0]) { Review Comment: This makes the code uglier than it was, just to work around the need to avoid triggering the 'strict' monotonicity check for safe time and index. I don't know how to improve this for now, but maybe we should think about it. ########## modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java: ########## @@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) { } } + /** + * Strict update with reordering check. Always called from the same updater thread. + * + * @param newValue New value. + * @param futureResult A result that will be used to complete a future returned by the + * {@link PendingComparableValuesTracker#waitFor(Comparable)}. 
+ */ + public void updateStrict(T newValue, @Nullable R futureResult) { Review Comment: It seems that this method cannot be used concurrently with other updating methods, as there is no synchronization on the `current` update. This looks dangerous, as someone could try to use both methods. If this method is used in a context where the object is only updated via `updateStrict` and never via the conventional `update`, maybe it would be better to just create another tracker class having just the `updateStrict` (but not the `update`) method? Or a `StrictPendingComparableValuesTracker` with just `update` working in a strict way? ########## modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java: ########## @@ -296,6 +302,23 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b return; } + // Patch the command. + if (event.done instanceof CommandClosure) { + CommandClosure<?> cmd = (CommandClosure<?>) event.done; + Command command = cmd.command(); + + // Tick once per batch. + if (safeTs == null) { Review Comment: It seems that the `safeTs` field is only used in `LogEntryAndClosureHandler`, but it's declared in `NodeImpl`. How about moving it to `LogEntryAndClosureHandler`? ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java: ########## @@ -151,6 +154,8 @@ public void testUpdateAllCommand() throws Exception { .messageRowsToUpdate(rowsToUpdate) .txId(UUID.randomUUID()) .txCoordinatorId(UUID.randomUUID()) + .txCoordinatorId(UUID.randomUUID()) Review Comment: Another duplicated line