sanpwc commented on code in PR #4543:
URL: https://github.com/apache/ignite-3/pull/4543#discussion_r1799835792


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java:
##########
@@ -556,167 +557,172 @@ private <R extends NetworkMessage> CompletableFuture<R> 
sendWithRetry(
     ) {
         var future = new CompletableFuture<R>();
 
-        sendWithRetry(peer, requestFactory, currentTimeMillis() + 
configuration.retryTimeout().value(), future, 1);
+        var context = new RetryContext(peer, requestFactory, 
currentTimeMillis() + configuration.retryTimeout().value(), 0);
+
+        sendWithRetry(future, context);
 
         return future;
     }
 
     /**
      * Retries a request until success or timeout.
      *
-     * @param peer Initial target peer, request can be sent to a random peer 
if the target peer is unavailable.
-     * @param requestFactory Factory for creating requests to the target peer.
-     * @param stopTime Stop time.
-     * @param fut The future.
+     * @param fut Result future.
+     * @param retryContext Context.
      * @param <R> Response type.
-     * @param retryCount Number of retries made. sendWithRetry method has a 
recursion nature, in case of recoverable exceptions or peer
-     *     unavailability it'll be scheduled for a next attempt. Generally a 
request will be retried until success or timeout.
      */
-    private <R extends NetworkMessage> void sendWithRetry(
-            Peer peer,
-            Function<Peer, ? extends NetworkMessage> requestFactory,
-            long stopTime,
-            CompletableFuture<R> fut,
-            int retryCount
-
-    ) {
+    private <R extends NetworkMessage> void sendWithRetry(CompletableFuture<R> 
fut, RetryContext retryContext) {
         if (!busyLock.enterBusy()) {
             fut.cancel(true);
 
             return;
         }
 
         try {
-            if (currentTimeMillis() >= stopTime) {
-                fut.completeExceptionally(
-                        new TimeoutException(format("Send with retry timed out 
[retryCount = {}, groupId = {}].", retryCount, groupId)));
+            if (currentTimeMillis() >= retryContext.stopTime()) {
+                fut.completeExceptionally(new TimeoutException(format(
+                        "Send with retry timed out [retryCount = {}, groupId = 
{}].",
+                        retryContext.retryCount(),
+                        groupId
+                )));
 
                 return;
             }
 
-            NetworkMessage request = requestFactory.apply(peer);
+            NetworkMessage request = retryContext.request();
 
-            resolvePeer(peer)
+            resolvePeer(retryContext.targetPeer())
                     .thenCompose(node -> 
cluster.messagingService().invoke(node, request, 
configuration.responseTimeout().value()))
                     .whenComplete((resp, err) -> {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("sendWithRetry req={} resp={} from={} 
to={} err={}",
                                     request,
                                     resp,
                                     
cluster.topologyService().localMember().address(),
-                                    peer.consistentId(),
+                                    retryContext.targetPeer().consistentId(),
                                     err == null ? null : err.getMessage());
                         }
 
-                        if (err != null) {
-                            handleThrowable(err, peer, request, 
requestFactory, stopTime, fut, retryCount);
-                        } else if (resp instanceof ErrorResponse) {
-                            handleErrorResponse((ErrorResponse) resp, peer, 
request, requestFactory, stopTime, fut, retryCount);
-                        } else if (resp instanceof SMErrorResponse) {
-                            handleSmErrorResponse((SMErrorResponse) resp, fut);
-                        } else {
-                            leader = peer; // The OK response was received 
from a leader.
-
-                            fut.complete((R) resp);
+                        try {
+                            if (err != null) {
+                                handleThrowable(fut, err, retryContext);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponse(fut, (ErrorResponse) resp, 
retryContext);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp);
+                            } else {
+                                leader = retryContext.targetPeer(); // The OK 
response was received from a leader.
+
+                                fut.complete((R) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);

Review Comment:
   Well, following will contradicts with what I previously wrote. Sorry for 
that. For now I believe that we should:
   - Retry the operation until timeout.
   - Exclude dead nodes.
   - If all nodes are touched - clean up excluded nodes and retry.
   - On success - clean up excluded nodes.
   
   Example 1:
   0. peers(A,B,C)
   1. touch A -> ENODESHUTDOWN -> excluded(A)
   2. touch random(B,C) -> B -> ENODESHUTDOWN -> excluded(B)
   3. touch random(C) -> C -> ENODESHUTDOWN -> excluded(C) -> excluded.cleanup()
   4. touch random(A,B,C)
   5. ...
   
   Example 2: 
   0. peers(A,B,C)
   1. touch A -> ENODESHUTDOWN -> excluded(A)
   2. touch random(B,C) -> B -> leader -> excluded.cleanup()
   
   Why it's important?
   It's quite common to update peers on raft rebalance. E.g. raft was moved 
from (A,B,C) to (D,E,F)
   Raft client on A wasn't updated yet, and thus knows only about A peer. 
Instead of failing fast on A initial touch we should wait until timeout for 
raft client update. Within update we will see new peers (D,E,F) and will try to 
send request there.
   
   An open question is how to propagate failure context into TimeoutException. 
Probably we may extend TimeoutMessage with peers -> response mapping. Not sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to