rpuch commented on code in PR #4821:
URL: https://github.com/apache/ignite-3/pull/4821#discussion_r1883812229


##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -836,7 +836,7 @@ private RaftGroupOptions groupOptionsForPartition(boolean 
isVolatileStorage, @Nu
         }
 
         raftGroupOptions.snapshotStorageFactory(snapshotFactory);
-
+        raftGroupOptions.maxClockSkew((int) clockService.maxClockSkewMillis());

Review Comment:
   Why is `maxClockSkew` of type `int`? We usually represent times in 
milliseconds as `long`s, which avoids casting.
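
A minimal sketch of the concern, assuming a hypothetical options holder (`ClockSkewOptionsSketch` and its method names are illustrative and are not part of the Ignite API). It shows that a plain `(int)` cast silently drops the high bits of a `long`, while keeping the value as a `long` needs no narrowing at all:

```java
/** Hypothetical options holder; the names are illustrative, not the Ignite API. */
final class ClockSkewOptionsSketch {
    /** Stored as a long, so callers holding a long millisecond value need no cast. */
    private long maxClockSkewMillis;

    ClockSkewOptionsSketch maxClockSkewMillis(long millis) {
        this.maxClockSkewMillis = millis;
        return this;
    }

    long maxClockSkewMillis() {
        return maxClockSkewMillis;
    }

    /** Plain narrowing cast: keeps only the low 32 bits, with no error on overflow. */
    static int narrowUnchecked(long millis) {
        return (int) millis;
    }

    /** Checked narrowing: throws ArithmeticException if the value does not fit in an int. */
    static int narrowChecked(long millis) {
        return Math.toIntExact(millis);
    }
}
```

If the field has to stay an `int` for some reason, `Math.toIntExact` at least turns an overflow into a visible failure.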



##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) {
         }
     }
 
+    /**
+     * Strict update with reordering check. Always called from the same 
updater thread.
+     *
+     * @param newValue New value.
+     * @param futureResult A result that will be used to complete a future 
returned by the
+     *         {@link PendingComparableValuesTracker#waitFor(Comparable)}.
+     */
+    public void updateStrict(T newValue, @Nullable R futureResult) {
+        if (!busyLock.readLock().tryLock()) {
+            throw new TrackerClosedException();
+        }
+
+        try {
+            Map.Entry<T, @Nullable R> current = this.current;
+
+            IgniteBiTuple<T, @Nullable R> newEntry = new 
IgniteBiTuple<>(newValue, futureResult);
+
+            // Entries from the same batch receive equal safe timestamps.

Review Comment:
   This class is generic: it deals with arbitrary comparable values, not just 
timestamps, so the comment about 'safe timestamps' looks out of place here. 
Also, which batches are meant? It looks like some details leaked in from the 
context where this method is used.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java:
##########
@@ -58,6 +58,11 @@ public class RaftGroupOptions {
      */
     private @Nullable Long externallyEnforcedConfigIndex;
 
+    /**
+     * Max clock skew in the replication group in milliseconds.
+     */
+    private int maxClockSkew;

Review Comment:
   ```suggestion
       private int maxClockSkewMs;
   ```
   This makes the unit obvious at every usage site



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -300,10 +279,10 @@ private void 
onWriteBusy(Iterator<CommandClosure<WriteCommand>> iterator) {
      * @param commandIndex Index of the RAFT command.
      * @param commandTerm Term of the RAFT command.
      */
-    private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long 
commandIndex, long commandTerm) {
+    private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long 
commandIndex, long commandTerm, boolean[] applied) {
         // Skips the write command because the storage has already executed it.
         if (commandIndex <= storage.lastAppliedIndex()) {
-            return new UpdateCommandResult(true, isPrimaryInGroupTopology());
+            return null;

Review Comment:
   Is it ok to return `null` here? The method is not declared as returning 
something nullable. If it's ok to return `null`, please annotate the method 
and/or add a comment explaining why it's ok



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -1131,7 +1131,6 @@ private void 
sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> replicaFu
 
         ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
                 .groupId(toReplicationGroupIdMessage(replica.groupId()))
-                .proposedSafeTime(proposedSafeTime)

Review Comment:
   Let's remove `proposedSafeTime` from parameters of this method and all 
methods that pass it here



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java:
##########
@@ -17,34 +17,49 @@
 
 package org.apache.ignite.distributed;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.distributed.ItTxTestCluster.NODE_PORT_BASE;
 import static 
org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
-import java.util.Collection;
+import java.util.List;
 import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.TxInfrastructureTest;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.TxStateMeta;
-import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
- * Tests if commit timestamp is propagated to observable time correctly.
+ * Tests if commit timestamp and safe timestamp are monotonically grow on 
leader change.

Review Comment:
   ```suggestion
    * Tests if commit timestamp and safe timestamp monotonically grow on leader 
change.
   ```



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -285,8 +289,10 @@ private class LogEntryAndClosureHandler implements 
EventHandler<LogEntryAndClosu
         // task list for batch
         private final List<LogEntryAndClosure> tasks = new 
ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());
 
+        private @Nullable HybridTimestamp safeTs = null;

Review Comment:
   Safe time doesn't seem to belong to the Raft protocol. Should we track it 
elsewhere, not in JRaft core code?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1956,15 +1935,13 @@ private 
CompletableFuture<WriteIntentSwitchReplicatedInfo> applyWriteIntentSwitc
                 indexIdsAtRwTxBeginTs(transactionId)
         );
 
-        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
-
-        applyCmdWithRetryOnSafeTimeReorderException(wiSwitchCmd, resultFuture);
-
-        return resultFuture
+        return applyCmdWithExceptionHandling(wiSwitchCmd)
                 .exceptionally(e -> {
                     LOG.warn("Failed to complete transaction cleanup command 
[txId=" + transactionId + ']', e);
 
-                    return nullCompletedFuture();
+                    ExceptionUtils.sneakyThrow(e);

Review Comment:
   If the exception is rethrown anyway, we don't need `exceptionally()`. But 
why is this changed to throwing the exception?
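
A minimal sketch of the first point, using only `java.util.concurrent` (the class and method names are hypothetical, and the counter stands in for `LOG.warn`). Both variants leave the returned future failed with the original cause; `whenComplete` just does it without the rethrow:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper comparing two ways to log a failure and keep propagating it. */
final class RethrowSketch {
    /** Stands in for a logger call in this sketch. */
    static final AtomicInteger LOGGED = new AtomicInteger();

    /** Logs inside exceptionally() and rethrows, so the failure is propagated anyway. */
    static CompletableFuture<String> logAndRethrow(CompletableFuture<String> future) {
        return future.exceptionally(e -> {
            LOGGED.incrementAndGet();

            throw e instanceof CompletionException ? (CompletionException) e : new CompletionException(e);
        });
    }

    /** Logs in whenComplete(); the original outcome is passed through untouched. */
    static CompletableFuture<String> logOnly(CompletableFuture<String> future) {
        return future.whenComplete((res, e) -> {
            if (e != null) {
                LOGGED.incrementAndGet();
            }
        });
    }
}
```

Either way the caller now sees the failure, which is a behavior change compared to the previous code that swallowed it.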



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java:
##########
@@ -31,39 +32,51 @@ public 
PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio
         super(serializationRegistry, cache);
     }
 
+    @Override
+    public void patch(ByteBuffer raw, HybridTimestamp safeTs) {
+        ByteBuffer dup = raw.duplicate().order(ORDER);
+        dup.putLong(4, safeTs.longValue());
+    }
+
     @Override
     protected void beforeWriteMessage(Object o, ByteBuffer buffer) {
         int requiredCatalogVersion = o instanceof CatalogVersionAware
                 ? ((CatalogVersionAware) o).requiredCatalogVersion()
-                : NO_VERSION_REQUIRED;
+                : PartitionCommandsMarshaller.NO_VERSION_REQUIRED;
 
         stream.setBuffer(buffer);
-        stream.writeInt(requiredCatalogVersion);
+        stream.writeFixedInt(requiredCatalogVersion);

Review Comment:
   Please add a comment on why fixed ints are used (probably, to make it 
patchable?)
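
A minimal sketch of why a fixed-width field matters for patching, assuming the non-fixed `writeInt` uses some variable-length encoding (that is an assumption about the stream, not something shown in this diff). The varint writer below is a hypothetical LEB128-style encoder, not the Ignite implementation:

```java
import java.nio.ByteBuffer;

/** Hypothetical comparison of variable-length and fixed-width int encodings. */
final class FixedWidthSketch {
    /** Writes an unsigned LEB128-style varint and returns the number of bytes written. */
    static int writeVarInt(ByteBuffer buf, int value) {
        int start = buf.position();

        // Emit 7 bits at a time; the high bit marks "more bytes follow".
        while ((value & ~0x7F) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }

        buf.put((byte) value);

        return buf.position() - start;
    }

    /** Writes a fixed 4-byte int and returns the number of bytes written (always 4). */
    static int writeFixedInt(ByteBuffer buf, int value) {
        int start = buf.position();

        buf.putInt(value);

        return buf.position() - start;
    }
}
```

With the varint form, the offset of whatever follows depends on the value written; with the fixed form, the next field always starts at byte 4, which is what an absolute `putLong(4, ...)` relies on.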



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java:
##########
@@ -121,6 +119,8 @@ public interface TimeoutNowRequest extends Message {
         String peerId();
 
         long term();
+
+        @Nullable HybridTimestamp timestamp();

Review Comment:
   Why is it nullable? `Replicator` seems to always send a timeout



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -627,19 +592,29 @@ public MvPartitionStorage getMvStorage() {
         return storage.getStorage();
     }
 
+    /**
+     * Returns safe timestamp.
+     */
+    @TestOnly
+    public PendingComparableValuesTracker<HybridTimestamp, Void> getSafeTime() 
{

Review Comment:
   We usually don't use `get` prefixes. How about `safeTime()` or 
`safeTimeTracker()` (to distinguish it from actual safe time timestamps)?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java:
##########
@@ -31,28 +32,40 @@ public 
PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio
         super(serializationRegistry, cache);
     }
 
+    @Override
+    public void patch(ByteBuffer raw, HybridTimestamp safeTs) {
+        ByteBuffer dup = raw.duplicate().order(ORDER);
+        dup.putLong(4, safeTs.longValue());
+    }
+
     @Override
     protected void beforeWriteMessage(Object o, ByteBuffer buffer) {
         int requiredCatalogVersion = o instanceof CatalogVersionAware
                 ? ((CatalogVersionAware) o).requiredCatalogVersion()
                 : NO_VERSION_REQUIRED;
 
         stream.setBuffer(buffer);
-        stream.writeInt(requiredCatalogVersion);
+        stream.writeFixedInt(requiredCatalogVersion);
+        stream.writeFixedLong(0);

Review Comment:
   Please add a comment saying that it allocates space for safe time
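
A minimal sketch of the reserve-then-patch layout, using plain `java.nio.ByteBuffer`. The class name, the little-endian `ORDER`, and the `writeHeader` helper are assumptions for illustration; only the 4-byte offset and the `duplicate().order(...)` / absolute `putLong` shape come from the diff:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical header layout: [int catalogVersion][long safeTime][payload...]. */
final class SafeTimePatchSketch {
    /** Assumed byte order; the real ORDER constant is not shown in the diff. */
    static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;

    /** The safe time slot starts right after the fixed 4-byte catalog version. */
    static final int SAFE_TIME_OFFSET = Integer.BYTES;

    static ByteBuffer writeHeader(int requiredCatalogVersion, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Long.BYTES + payload.length).order(ORDER);

        buf.putInt(requiredCatalogVersion);
        // Reserve 8 bytes for the safe time; the real value is patched in later.
        buf.putLong(0L);
        buf.put(payload);
        buf.flip();

        return buf;
    }

    /** Overwrites the reserved slot without moving the caller's position or limit. */
    static void patch(ByteBuffer raw, long safeTime) {
        // duplicate() shares the content but resets the byte order, so set it again.
        ByteBuffer dup = raw.duplicate().order(ORDER);

        dup.putLong(SAFE_TIME_OFFSET, safeTime);
    }

    static long readSafeTime(ByteBuffer raw) {
        return raw.duplicate().order(ORDER).getLong(SAFE_TIME_OFFSET);
    }
}
```

A comment next to the `writeFixedLong(0)` call saying that it reserves this slot would make the coupling with `patch` explicit.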



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java:
##########
@@ -256,14 +266,19 @@ private <T extends Command> T copyCommand(T cmd) {
                     .commit(finishTxCommand.commit())
                     .partitionIds(finishTxCommand.partitionIds())
                     .commitTimestamp(finishTxCommand.commitTimestamp())
+                    .initiatorTime(finishTxCommand.initiatorTime())
+                    .safeTime(finishTxCommand.safeTime())
                     .build();
         } else if (cmd instanceof WriteIntentSwitchCommand) {
             WriteIntentSwitchCommand writeIntentSwitchCommand = 
(WriteIntentSwitchCommand) cmd;
 
             return (T) 
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand()
                     .txId(writeIntentSwitchCommand.txId())
+                    .initiatorTime(clock.now())
                     .commit(writeIntentSwitchCommand.commit())
                     
.commitTimestamp(writeIntentSwitchCommand.commitTimestamp())
+                    .initiatorTime(writeIntentSwitchCommand.initiatorTime())

Review Comment:
   `initiatorTime` is set twice



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java:
##########
@@ -94,6 +99,10 @@ public boolean isPrimaryInPeersAndLearners() {
         return primaryInPeersAndLearners;
     }
 
+    public long safeTimestamp() {

Review Comment:
   Please add a javadoc explaining what this is



##########
modules/metastorage/build.gradle:
##########
@@ -38,6 +39,7 @@ dependencies {
     implementation project(':ignite-failure-handler')
     implementation project(':ignite-metrics')
     implementation project(':ignite-system-disaster-recovery-api')
+    implementation project(':ignite-catalog')

Review Comment:
   And why was this dependency added?



##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) {
         }
     }
 
+    /**
+     * Strict update with reordering check. Always called from the same 
updater thread.

Review Comment:
   Please explain in the javadoc what 'strict' means



##########
modules/metastorage/build.gradle:
##########
@@ -28,6 +28,7 @@ dependencies {
     implementation project(':ignite-configuration-system')
     implementation project(':ignite-cluster-management')
     implementation project(':ignite-network-api')
+    implementation project(':ignite-network')

Review Comment:
   Why was this added?



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -889,7 +905,7 @@ public boolean bootstrap(final BootstrapOptions opts) 
throws InterruptedExceptio
         final long bootstrapLogTerm = opts.getLastLogIndex() > 0 ? 1 : 0;
         final LogId bootstrapId = new LogId(opts.getLastLogIndex(), 
bootstrapLogTerm);
         this.options = opts.getNodeOptions() == null ? new NodeOptions() : 
opts.getNodeOptions();
-        this.clock = options.getClock();
+        this.clock = this.options.getClock() == null ? new HybridClockImpl() : 
this.options.getClock();

Review Comment:
   Why do we need the possibility of having different clocks? If there is no 
clock in options, should we just skip all safe-time-related logic?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -447,6 +447,10 @@ public boolean startRaftNode(
             // Thread pools are shared by all raft groups.
             NodeOptions nodeOptions = opts.copy();
 
+            // Then a new election starts on a node, it has local physical 
time higher than last generated safe ts

Review Comment:
   ```suggestion
               // When a new election starts on a node, it has local physical 
time higher than last generated safe ts
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java:
##########
@@ -44,7 +51,16 @@ public interface HybridClock {
     HybridTimestamp now();
 
     /**
-     * Gets a current timestamp. It is a fast way to get timestamp because it 
doesn't have to tick the logical part of the clock.
+     * Creates a timestamp for new event. A timestamp is guarantied to be 
unique and monotonically grown and follow the causal.
+     *
+     * @param causal The causal timestamp.
+     *
+     * @return The hybrid timestamp.
+     */
+    HybridTimestamp now(HybridTimestamp causal);

Review Comment:
   Do I understand correctly that this new method is logically equivalent to 
`update()` followed by `now()`? If so, is it worth adding at all? It is only 
used in one place, and the corresponding `nowLong()` is not used externally at 
all.
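
A minimal sketch of the equivalence in question, assuming `update()` advances the clock past the observed timestamp and `now()` ticks it again. This is a simplified hybrid logical clock over raw `long` values with an assumed 16-bit logical part; the class is hypothetical and is not `HybridClockImpl`:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical hybrid clock: the physical millis live in the high bits, a logical counter in the low bits. */
final class HybridClockSketch {
    /** Assumed width of the logical counter. */
    static final int LOGICAL_BITS = 16;

    private final LongSupplier physicalMillis;

    private final AtomicLong latest = new AtomicLong();

    HybridClockSketch(LongSupplier physicalMillis) {
        this.physicalMillis = physicalMillis;
    }

    /** Returns a timestamp strictly greater than any timestamp returned before. */
    long now() {
        long phys = physicalMillis.getAsLong() << LOGICAL_BITS;

        return latest.updateAndGet(prev -> Math.max(phys, prev + 1));
    }

    /** Advances the clock past the observed timestamp. */
    long update(long observed) {
        long phys = physicalMillis.getAsLong() << LOGICAL_BITS;

        return latest.updateAndGet(prev -> Math.max(phys, Math.max(prev, observed) + 1));
    }

    /** The composition under discussion: observe the causal timestamp, then tick. */
    long now(long causal) {
        update(causal);

        return now();
    }
}
```

In this sketch the dedicated method adds nothing beyond the two existing calls, which is the point of the question above.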



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java:
##########
@@ -119,6 +120,8 @@ public void testRemoveCommand() throws Exception {
                 .rowUuid(UUID.randomUUID())
                 .txId(TestTransactionIds.newTransactionId())
                 .txCoordinatorId(UUID.randomUUID())
+                .txCoordinatorId(UUID.randomUUID())

Review Comment:
   A duplicated line



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -241,30 +215,46 @@ private void 
onWriteBusy(Iterator<CommandClosure<WriteCommand>> iterator) {
             storage.acquirePartitionSnapshotsReadLock();
 
             try {
+                boolean[] applied = {false};
+
                 if (command instanceof UpdateCommand) {
-                    result = handleUpdateCommand((UpdateCommand) command, 
commandIndex, commandTerm);
+                    result = handleUpdateCommand((UpdateCommand) command, 
commandIndex, commandTerm, applied);
                 } else if (command instanceof UpdateAllCommand) {
-                    result = handleUpdateAllCommand((UpdateAllCommand) 
command, commandIndex, commandTerm);
+                    result = handleUpdateAllCommand((UpdateAllCommand) 
command, commandIndex, commandTerm, applied);
                 } else if (command instanceof FinishTxCommand) {
-                    result = handleFinishTxCommand((FinishTxCommand) command, 
commandIndex, commandTerm);
+                    result = handleFinishTxCommand((FinishTxCommand) command, 
commandIndex, commandTerm, applied);
                 } else if (command instanceof WriteIntentSwitchCommand) {
-                    handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) 
command, commandIndex, commandTerm);
+                    handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) 
command, commandIndex, commandTerm, applied);
                 } else if (command instanceof SafeTimeSyncCommand) {
-                    handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, 
commandIndex, commandTerm);
+                    handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, 
commandIndex, commandTerm, applied);
                 } else if (command instanceof BuildIndexCommand) {
-                    handleBuildIndexCommand((BuildIndexCommand) command, 
commandIndex, commandTerm);
+                    handleBuildIndexCommand((BuildIndexCommand) command, 
commandIndex, commandTerm, applied);
                 } else if (command instanceof PrimaryReplicaChangeCommand) {
-                    
handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, 
commandIndex, commandTerm);
+                    
handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, 
commandIndex, commandTerm, applied);
                 } else if (command instanceof VacuumTxStatesCommand) {
-                    handleVacuumTxStatesCommand((VacuumTxStatesCommand) 
command, commandIndex, commandTerm);
+                    handleVacuumTxStatesCommand((VacuumTxStatesCommand) 
command, commandIndex, commandTerm, applied);
                 } else if (command instanceof 
UpdateMinimumActiveTxBeginTimeCommand) {
-                    
handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) 
command, commandIndex, commandTerm);
+                    
handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) 
command, commandIndex, commandTerm,
+                            applied);
                 } else {
                     assert false : "Command was not found [cmd=" + command + 
']';
                 }
+
+                if (applied[0]) {

Review Comment:
   This makes the code uglier than it was, just to work around the need to 
avoid triggering the 'strict' monotonicity check for safe time and index. I 
don't know how to improve this right now, but maybe we should think about it



##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) {
         }
     }
 
+    /**
+     * Strict update with reordering check. Always called from the same 
updater thread.
+     *
+     * @param newValue New value.
+     * @param futureResult A result that will be used to complete a future 
returned by the
+     *         {@link PendingComparableValuesTracker#waitFor(Comparable)}.
+     */
+    public void updateStrict(T newValue, @Nullable R futureResult) {

Review Comment:
   It seems that this method cannot be used concurrently with other updating 
methods, as there is no synchronization on the `current` update. This looks 
dangerous because someone could try to use both methods. If this method is used 
in a context where the object is only updated via `updateStrict` and never via 
the conventional `update`, maybe it would be better to create another tracker 
class that has only an `updateStrict` method (and no `update`)? Or a 
`StrictPendingComparableValuesTracker` whose `update` works in a strict 
way?
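
A minimal sketch of the second option, assuming 'strict' means that a value lower than the current one is an error while an equal value is accepted (the diff does not spell this out). The class is hypothetical and leaves out the `waitFor` futures and the busy lock of the real tracker:

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical tracker whose only update path rejects values that go backwards. */
final class StrictTrackerSketch<T extends Comparable<T>> {
    private final AtomicReference<T> current;

    StrictTrackerSketch(T initial) {
        this.current = new AtomicReference<>(initial);
    }

    /**
     * Strict update: throws if the new value is lower than the current one.
     * Equal values are accepted (an assumption of this sketch).
     */
    void update(T newValue) {
        while (true) {
            T prev = current.get();

            if (newValue.compareTo(prev) < 0) {
                throw new IllegalStateException("Reordering detected [current=" + prev + ", new=" + newValue + ']');
            }

            // CAS on the exact reference read above, so concurrent updaters cannot be lost silently.
            if (current.compareAndSet(prev, newValue)) {
                return;
            }
        }
    }

    T current() {
        return current.get();
    }
}
```

Because there is no non-strict `update` on this class, the two update styles cannot be mixed on one instance by accident.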



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -296,6 +302,23 @@ public void onEvent(final LogEntryAndClosure event, final 
long sequence, final b
                 return;
             }
 
+            // Patch the command.
+            if (event.done instanceof CommandClosure) {
+                CommandClosure<?> cmd = (CommandClosure<?>) event.done;
+                Command command = cmd.command();
+
+                // Tick once per batch.
+                if (safeTs == null) {

Review Comment:
   It seems that `safeTs` field is only used in `LogEntryAndClosureHandler`, 
but it's declared in `NodeImpl`. How about moving it to 
`LogEntryAndClosureHandler`?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java:
##########
@@ -151,6 +154,8 @@ public void testUpdateAllCommand() throws Exception {
                 .messageRowsToUpdate(rowsToUpdate)
                 .txId(UUID.randomUUID())
                 .txCoordinatorId(UUID.randomUUID())
+                .txCoordinatorId(UUID.randomUUID())

Review Comment:
   Another duplicated line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
