sanpwc commented on code in PR #4543:
URL: https://github.com/apache/ignite-3/pull/4543#discussion_r1799835792
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java:
##########
@@ -556,167 +557,172 @@ private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(
     ) {
         var future = new CompletableFuture<R>();

-        sendWithRetry(peer, requestFactory, currentTimeMillis() + configuration.retryTimeout().value(), future, 1);
+        var context = new RetryContext(peer, requestFactory, currentTimeMillis() + configuration.retryTimeout().value(), 0);
+
+        sendWithRetry(future, context);

         return future;
     }

     /**
      * Retries a request until success or timeout.
      *
-     * @param peer Initial target peer, request can be sent to a random peer if the target peer is unavailable.
-     * @param requestFactory Factory for creating requests to the target peer.
-     * @param stopTime Stop time.
-     * @param fut The future.
+     * @param fut Result future.
+     * @param retryContext Context.
      * @param <R> Response type.
-     * @param retryCount Number of retries made. sendWithRetry method has a recursion nature, in case of recoverable exceptions or peer
-     *         unavailability it'll be scheduled for a next attempt. Generally a request will be retried until success or timeout.
      */
-    private <R extends NetworkMessage> void sendWithRetry(
-            Peer peer,
-            Function<Peer, ? extends NetworkMessage> requestFactory,
-            long stopTime,
-            CompletableFuture<R> fut,
-            int retryCount
-
-    ) {
+    private <R extends NetworkMessage> void sendWithRetry(CompletableFuture<R> fut, RetryContext retryContext) {
         if (!busyLock.enterBusy()) {
             fut.cancel(true);

             return;
         }

         try {
-            if (currentTimeMillis() >= stopTime) {
-                fut.completeExceptionally(
-                        new TimeoutException(format("Send with retry timed out [retryCount = {}, groupId = {}].", retryCount, groupId)));
+            if (currentTimeMillis() >= retryContext.stopTime()) {
+                fut.completeExceptionally(new TimeoutException(format(
+                        "Send with retry timed out [retryCount = {}, groupId = {}].",
+                        retryContext.retryCount(),
+                        groupId
+                )));

                 return;
             }

-            NetworkMessage request = requestFactory.apply(peer);
+            NetworkMessage request = retryContext.request();

-            resolvePeer(peer)
+            resolvePeer(retryContext.targetPeer())
                     .thenCompose(node -> cluster.messagingService().invoke(node, request, configuration.responseTimeout().value()))
                     .whenComplete((resp, err) -> {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("sendWithRetry req={} resp={} from={} to={} err={}",
                                     request,
                                     resp,
                                     cluster.topologyService().localMember().address(),
-                                    peer.consistentId(),
+                                    retryContext.targetPeer().consistentId(),
                                     err == null ? null : err.getMessage());
                         }

-                        if (err != null) {
-                            handleThrowable(err, peer, request, requestFactory, stopTime, fut, retryCount);
-                        } else if (resp instanceof ErrorResponse) {
-                            handleErrorResponse((ErrorResponse) resp, peer, request, requestFactory, stopTime, fut, retryCount);
-                        } else if (resp instanceof SMErrorResponse) {
-                            handleSmErrorResponse((SMErrorResponse) resp, fut);
-                        } else {
-                            leader = peer; // The OK response was received from a leader.
-
-                            fut.complete((R) resp);
+                        try {
+                            if (err != null) {
+                                handleThrowable(fut, err, retryContext);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponse(fut, (ErrorResponse) resp, retryContext);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) resp);
+                            } else {
+                                leader = retryContext.targetPeer(); // The OK response was received from a leader.
+
+                                fut.complete((R) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);

Review Comment:
   Well, the following contradicts what I previously wrote. Sorry for that. For now I believe that we should:
   - Retry the operation until timeout.
   - Exclude dead nodes.
   - If all nodes have been touched, clean up the excluded set and retry.
   - On success, clean up the excluded set.

   Example 1:
   0. peers(A,B,C)
   1. touch A -> ENODESHUTDOWN -> excluded(A)
   2. touch random(B,C) -> B -> ENODESHUTDOWN -> excluded(B)
   3. touch random(C) -> C -> ENODESHUTDOWN -> excluded(C) -> excluded.cleanup()
   4. touch random(A,B,C)
   5. ...

   Example 2:
   0. peers(A,B,C)
   1. touch A -> ENODESHUTDOWN -> excluded(A)
   2. touch random(B,C) -> B -> leader -> excluded.cleanup()

   Why is this important? It is quite common to update peers on Raft rebalance. E.g. a Raft group was moved from (A,B,C) to (D,E,F), but the Raft client on A has not been updated yet and thus knows only about peer A. Instead of failing fast on the initial touch of A, we should keep retrying until timeout, waiting for the Raft client to be updated. Once it is updated we will see the new peers (D,E,F) and will try to send the request there.

   An open question is how to propagate the failure context into the TimeoutException. Probably we may extend the TimeoutException with a peers -> response mapping. Not sure.


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
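The peer-exclusion policy proposed in the review comment (exclude a peer on failure, reset the exclusion set once every peer has been tried, and clear it on success) can be sketched in isolation. This is a hypothetical illustration, assuming a standalone `PeerSelector` class with string peer IDs; it is not the PR's `RetryContext` or any actual Ignite API:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Illustrative sketch (hypothetical class, not the PR's code) of the proposed
 * retry policy: peers that fail are excluded from selection; once every peer
 * has been excluded, the set is cleared so the whole peer list is retried
 * again until the overall timeout; a successful response also clears the set.
 */
class PeerSelector {
    private final Set<String> excluded = new HashSet<>();

    /** Returns a random peer that has not been excluded yet. */
    String nextPeer(List<String> peers) {
        List<String> candidates = new ArrayList<>();

        for (String peer : peers) {
            if (!excluded.contains(peer)) {
                candidates.add(peer);
            }
        }

        if (candidates.isEmpty()) {
            // All peers were touched and failed: clean up and start over,
            // so the client keeps retrying the full list until timeout.
            excluded.clear();
            candidates.addAll(peers);
        }

        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }

    /** Marks a peer as dead, e.g. after an ENODESHUTDOWN-style error. */
    void exclude(String peer) {
        excluded.add(peer);
    }

    /** On a successful response, the exclusion history is discarded. */
    void onSuccess() {
        excluded.clear();
    }
}
```

This reproduces Example 1 from the comment: after A, B, and C have all been excluded, the next `nextPeer` call resets the set and again picks randomly among all three, which matches step 4 (`touch random(A,B,C)`).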