sanpwc commented on code in PR #4543: URL: https://github.com/apache/ignite-3/pull/4543#discussion_r1799775627
########## modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java: ########## @@ -556,167 +557,172 @@ private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry( ) { var future = new CompletableFuture<R>(); - sendWithRetry(peer, requestFactory, currentTimeMillis() + configuration.retryTimeout().value(), future, 1); + var context = new RetryContext(peer, requestFactory, currentTimeMillis() + configuration.retryTimeout().value(), 0); + + sendWithRetry(future, context); return future; } /** * Retries a request until success or timeout. * - * @param peer Initial target peer, request can be sent to a random peer if the target peer is unavailable. - * @param requestFactory Factory for creating requests to the target peer. - * @param stopTime Stop time. - * @param fut The future. + * @param fut Result future. + * @param retryContext Context. * @param <R> Response type. - * @param retryCount Number of retries made. sendWithRetry method has a recursion nature, in case of recoverable exceptions or peer - * unavailability it'll be scheduled for a next attempt. Generally a request will be retried until success or timeout. */ - private <R extends NetworkMessage> void sendWithRetry( - Peer peer, - Function<Peer, ? 
extends NetworkMessage> requestFactory, - long stopTime, - CompletableFuture<R> fut, - int retryCount - - ) { + private <R extends NetworkMessage> void sendWithRetry(CompletableFuture<R> fut, RetryContext retryContext) { if (!busyLock.enterBusy()) { fut.cancel(true); return; } try { - if (currentTimeMillis() >= stopTime) { - fut.completeExceptionally( - new TimeoutException(format("Send with retry timed out [retryCount = {}, groupId = {}].", retryCount, groupId))); + if (currentTimeMillis() >= retryContext.stopTime()) { + fut.completeExceptionally(new TimeoutException(format( + "Send with retry timed out [retryCount = {}, groupId = {}].", + retryContext.retryCount(), + groupId + ))); return; } - NetworkMessage request = requestFactory.apply(peer); + NetworkMessage request = retryContext.request(); - resolvePeer(peer) + resolvePeer(retryContext.targetPeer()) .thenCompose(node -> cluster.messagingService().invoke(node, request, configuration.responseTimeout().value())) .whenComplete((resp, err) -> { if (LOG.isTraceEnabled()) { LOG.trace("sendWithRetry req={} resp={} from={} to={} err={}", request, resp, cluster.topologyService().localMember().address(), - peer.consistentId(), + retryContext.targetPeer().consistentId(), err == null ? null : err.getMessage()); } - if (err != null) { - handleThrowable(err, peer, request, requestFactory, stopTime, fut, retryCount); - } else if (resp instanceof ErrorResponse) { - handleErrorResponse((ErrorResponse) resp, peer, request, requestFactory, stopTime, fut, retryCount); - } else if (resp instanceof SMErrorResponse) { - handleSmErrorResponse((SMErrorResponse) resp, fut); - } else { - leader = peer; // The OK response was received from a leader. 
- - fut.complete((R) resp); + try { + if (err != null) { + handleThrowable(fut, err, retryContext); + } else if (resp instanceof ErrorResponse) { + handleErrorResponse(fut, (ErrorResponse) resp, retryContext); + } else if (resp instanceof SMErrorResponse) { + handleSmErrorResponse(fut, (SMErrorResponse) resp); + } else { + leader = retryContext.targetPeer(); // The OK response was received from a leader. + + fut.complete((R) resp); + } + } catch (Throwable e) { + fut.completeExceptionally(e); Review Comment: What will happen if there's only one peer in the list and it's shutting down (returns ENODESHUTDOWN)? Are we going to retry the request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org