sanpwc commented on code in PR #4684:
URL: https://github.com/apache/ignite-3/pull/4684#discussion_r1832869228


##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -1086,16 +1097,64 @@ private void idleSafeTimeSync() {
         }
     }
 
-    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture) {
-        if (isCompletedSuccessfully(replicaFuture)) {
-            Replica replica = replicaFuture.join();
+    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture, HybridTimestamp proposedSafeTime) {
+        if (!isCompletedSuccessfully(replicaFuture)) {
+            return;
+        }
 
-            ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                    .groupId(toReplicationGroupIdMessage(replica.groupId()))
-                    .build();
+        Replica replica = replicaFuture.join();
 
-            replica.processRequest(req, localNodeId);
+        if (!shouldAdvanceIdleSafeTime(replica, proposedSafeTime)) {
+            // If previous attempt might still be waiting on the Metastorage 
SafeTime, we should not send the command ourselves as this
+            // might be an indicator that Metastorage SafeTime has stuck for 
some time; if we send the command, it will have to add its
+            // future, increasing (most probably, uselessly) heap pressure.
+            return;
         }
+
+        getOrCreateContext(replica).lastIdleSafeTimeProposal = 
proposedSafeTime;
+
+        ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+                .groupId(toReplicationGroupIdMessage(replica.groupId()))
+                .proposedSafeTime(proposedSafeTime)
+                .build();
+
+        replica.processRequest(req, localNodeId).whenComplete((res, ex) -> {
+            if (ex != null) {
+                LOG.error("Could not advance safe time for {} to {}", 
replica.groupId(), proposedSafeTime);

Review Comment:
   Should we also log the exception? Debugging will be difficult without it, I 
guess.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -1086,16 +1097,64 @@ private void idleSafeTimeSync() {
         }
     }
 
-    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture) {
-        if (isCompletedSuccessfully(replicaFuture)) {
-            Replica replica = replicaFuture.join();
+    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture, HybridTimestamp proposedSafeTime) {
+        if (!isCompletedSuccessfully(replicaFuture)) {
+            return;
+        }
 
-            ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                    .groupId(toReplicationGroupIdMessage(replica.groupId()))
-                    .build();
+        Replica replica = replicaFuture.join();
 
-            replica.processRequest(req, localNodeId);
+        if (!shouldAdvanceIdleSafeTime(replica, proposedSafeTime)) {
+            // If previous attempt might still be waiting on the Metastorage 
SafeTime, we should not send the command ourselves as this
+            // might be an indicator that Metastorage SafeTime has stuck for 
some time; if we send the command, it will have to add its
+            // future, increasing (most probably, uselessly) heap pressure.
+            return;
         }
+
+        getOrCreateContext(replica).lastIdleSafeTimeProposal = 
proposedSafeTime;
+
+        ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+                .groupId(toReplicationGroupIdMessage(replica.groupId()))
+                .proposedSafeTime(proposedSafeTime)
+                .build();
+
+        replica.processRequest(req, localNodeId).whenComplete((res, ex) -> {
+            if (ex != null) {
+                LOG.error("Could not advance safe time for {} to {}", 
replica.groupId(), proposedSafeTime);
+            }
+        });
+    }
+
+    private boolean shouldAdvanceIdleSafeTime(Replica replica, HybridTimestamp 
proposedSafeTime) {
+        if (placementDriver.getCurrentPrimaryReplica(replica.groupId(), 
proposedSafeTime) != null) {
+            // We know a primary is guaranteed to be valid at the proposed 
safe time, so, when processing the SafeTime propagation request,
+            // we'll not need to wait for Metastorage SafeTime, so processing 
will be cheap.
+            return true;

Review Comment:
   Curious, why not wait for the previous request to succeed if 
getPrimaryReplica(proposedSafeTime) != null?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -259,6 +264,7 @@ public ReplicaManager(
             RaftManager raftManager,
             RaftGroupOptionsConfigurer partitionRaftConfigurer,
             LogStorageFactoryCreator volatileLogStorageFactoryCreator,
+            ClusterTime clusterTime,

Review Comment:
   We should be consistent. Either all params should be mentioned or none.



##########
modules/replicator/build.gradle:
##########
@@ -37,6 +37,7 @@ dependencies {
     implementation project(':ignite-placement-driver-api')
     implementation project(':ignite-failure-handler')
     implementation project(':ignite-partition-distribution')
+    implementation project(':ignite-metastorage-api')

Review Comment:
   > Is this still bad?
   
   I guess so. Logically, the replicator should not depend on MG in any way.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -1086,16 +1097,64 @@ private void idleSafeTimeSync() {
         }
     }
 
-    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture) {
-        if (isCompletedSuccessfully(replicaFuture)) {
-            Replica replica = replicaFuture.join();
+    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture, HybridTimestamp proposedSafeTime) {
+        if (!isCompletedSuccessfully(replicaFuture)) {
+            return;
+        }
 
-            ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                    .groupId(toReplicationGroupIdMessage(replica.groupId()))
-                    .build();
+        Replica replica = replicaFuture.join();
 
-            replica.processRequest(req, localNodeId);
+        if (!shouldAdvanceIdleSafeTime(replica, proposedSafeTime)) {
+            // If previous attempt might still be waiting on the Metastorage 
SafeTime, we should not send the command ourselves as this
+            // might be an indicator that Metastorage SafeTime has stuck for 
some time; if we send the command, it will have to add its
+            // future, increasing (most probably, uselessly) heap pressure.
+            return;
         }
+
+        getOrCreateContext(replica).lastIdleSafeTimeProposal = 
proposedSafeTime;
+
+        ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+                .groupId(toReplicationGroupIdMessage(replica.groupId()))
+                .proposedSafeTime(proposedSafeTime)
+                .build();
+
+        replica.processRequest(req, localNodeId).whenComplete((res, ex) -> {
+            if (ex != null) {
+                LOG.error("Could not advance safe time for {} to {}", 
replica.groupId(), proposedSafeTime);
+            }
+        });
+    }
+
+    private boolean shouldAdvanceIdleSafeTime(Replica replica, HybridTimestamp 
proposedSafeTime) {
+        if (placementDriver.getCurrentPrimaryReplica(replica.groupId(), 
proposedSafeTime) != null) {
+            // We know a primary is guaranteed to be valid at the proposed 
safe time, so, when processing the SafeTime propagation request,
+            // we'll not need to wait for Metastorage SafeTime, so processing 
will be cheap.
+            return true;
+        }
+
+        // Ok, an attempt to advance idle safe time might cause a wait on 
Metastorage SafeTime.
+        // Let's see: if Metastorage SafeTime is still not enough to avoid 
waiting on it even for our PREVIOUS attempt to advance
+        // partition SafeTime, then it makes sense to skip the current attempt 
(as the future from the previous attempt is still there
+        // and takes memory; if we make the current attempt, we are guaranteed 
to add another future). There will be next attempt
+        // to advance the partition SafeTime.
+
+        ReplicaContext context = getOrCreateContext(replica);
+
+        HybridTimestamp lastIdleSafeTimeProposal = 
context.lastIdleSafeTimeProposal;
+        if (lastIdleSafeTimeProposal == null) {
+            // No previous attempt, we have to do it ourselves.
+            return true;
+        }
+
+        // This is the Metastorage SafeTime that was needed to be achieved for 
previous attempt to check that this node is still a primary.
+        // If it's already achieved, then previous attempt is unblocked (most 
probably already finished), so we should proceed.
+        HybridTimestamp requiredMsSafeTime = 
lastIdleSafeTimeProposal.addPhysicalTime(clockService.maxClockSkewMillis());
+
+        return clusterTime.currentSafeTime().compareTo(requiredMsSafeTime) >= 
0;
+    }
+
+    private ReplicaContext getOrCreateContext(Replica replica) {
+        return contexts.computeIfAbsent(replica.groupId(), k -> new 
ReplicaContext());

Review Comment:
   Who is responsible for contexts map cleanup?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -1086,16 +1097,64 @@ private void idleSafeTimeSync() {
         }
     }
 
-    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture) {
-        if (isCompletedSuccessfully(replicaFuture)) {
-            Replica replica = replicaFuture.join();
+    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture, HybridTimestamp proposedSafeTime) {
+        if (!isCompletedSuccessfully(replicaFuture)) {
+            return;
+        }
 
-            ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                    .groupId(toReplicationGroupIdMessage(replica.groupId()))
-                    .build();
+        Replica replica = replicaFuture.join();
 
-            replica.processRequest(req, localNodeId);
+        if (!shouldAdvanceIdleSafeTime(replica, proposedSafeTime)) {
+            // If previous attempt might still be waiting on the Metastorage 
SafeTime, we should not send the command ourselves as this
+            // might be an indicator that Metastorage SafeTime has stuck for 
some time; if we send the command, it will have to add its
+            // future, increasing (most probably, uselessly) heap pressure.
+            return;
         }
+
+        getOrCreateContext(replica).lastIdleSafeTimeProposal = 
proposedSafeTime;

Review Comment:
   In that case, it seems that the only reason we don't have races is the fact 
that scheduledIdleSafeTimeSyncExecutor is single-threaded. If that's true, could 
you please add a comment somewhere near the scheduledIdleSafeTimeSyncExecutor 
initialization denoting that it **should** be single-threaded.



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -1086,16 +1097,64 @@ private void idleSafeTimeSync() {
         }
     }
 
-    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture) {
-        if (isCompletedSuccessfully(replicaFuture)) {
-            Replica replica = replicaFuture.join();
+    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture, HybridTimestamp proposedSafeTime) {
+        if (!isCompletedSuccessfully(replicaFuture)) {
+            return;
+        }
 
-            ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                    .groupId(toReplicationGroupIdMessage(replica.groupId()))
-                    .build();
+        Replica replica = replicaFuture.join();
 
-            replica.processRequest(req, localNodeId);
+        if (!shouldAdvanceIdleSafeTime(replica, proposedSafeTime)) {
+            // If previous attempt might still be waiting on the Metastorage 
SafeTime, we should not send the command ourselves as this
+            // might be an indicator that Metastorage SafeTime has stuck for 
some time; if we send the command, it will have to add its
+            // future, increasing (most probably, uselessly) heap pressure.
+            return;
         }
+
+        getOrCreateContext(replica).lastIdleSafeTimeProposal = 
proposedSafeTime;
+
+        ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+                .groupId(toReplicationGroupIdMessage(replica.groupId()))
+                .proposedSafeTime(proposedSafeTime)
+                .build();
+
+        replica.processRequest(req, localNodeId).whenComplete((res, ex) -> {
+            if (ex != null) {
+                LOG.error("Could not advance safe time for {} to {}", 
replica.groupId(), proposedSafeTime);
+            }
+        });
+    }
+
+    private boolean shouldAdvanceIdleSafeTime(Replica replica, HybridTimestamp 
proposedSafeTime) {
+        if (placementDriver.getCurrentPrimaryReplica(replica.groupId(), 
proposedSafeTime) != null) {

Review Comment:
   I believe that we should move this check to the bottom and calculate 
proposed safeTime in front of it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to