timoninmaxim commented on code in PR #11954:
URL: https://github.com/apache/ignite/pull/11954#discussion_r2109679938


##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java:
##########
@@ -243,63 +243,120 @@ private <T> void handleServiceAsync(
         List<ClientConnectionException> failures
     ) {
         try {
-            applyOnDefaultChannel(
-                channel -> applyOnClientChannelAsync(fut, channel, op, 
payloadWriter, payloadReader, failures),
-                null,
-                failures
-            );
+            ClientChannel ch = applyOnDefaultChannel(Function.identity(), 
null, failures);
+
+            applyOnClientChannelAsync(fut, ch, op, payloadWriter, 
payloadReader, failures);
         }
         catch (Throwable ex) {
             fut.completeExceptionally(ex);
         }
     }
 
-    /** */
-    private <T> Object applyOnClientChannelAsync(
+    /**
+     * Retries an async operation on the same channel if it fails with a 
connection exception
+     * then falls back to other channels if retry fails. Aggregates failures 
and completes the original future.
+     */
+    private <T> void applyOnClientChannelAsync(
         final CompletableFuture<T> fut,
         ClientChannel ch,
         ClientOperation op,
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader,
         List<ClientConnectionException> failures
     ) {
-        return ch
-            .serviceAsync(op, payloadWriter, payloadReader)
-            .handle((res, err) -> {
-                if (err == null) {
-                    fut.complete(res);
+        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) -> 
{
+            if (err == null) {
+                fut.complete(res);
 
-                    return null;
-                }
+                return null;
+            }
 
-                if (err instanceof ClientConnectionException) {
-                    ClientConnectionException failure0 = 
(ClientConnectionException)err;
+            // Retry use same channel in case of connection exception.
+            if (err instanceof ClientConnectionException) {
+                ClientConnectionException failure0 = 
(ClientConnectionException)err;
 
-                    failures.add(failure0);
+                UUID nodeId = ch.serverNodeId();
 
-                    try {
-                        // Will try to reinit channels if topology changed.
-                        onChannelFailure(ch, err, failures);
-                    }
-                    catch (Throwable ex) {
-                        fut.completeExceptionally(ex);
+                ClientChannelHolder hld = (nodeId != null) ? 
nodeChannels.get(nodeId) : null;
+
+                try {
+                    // Will try to reinit channels if topology changed.
+                    onChannelFailure(ch, err, failures);
+
+                    if (hld == null) {
+                        failures.add(failure0);
 
-                        return null;
+                        throw failure0;
                     }
 
-                    if (failures.size() < srvcChannelsLimit && shouldRetry(op, 
failures.size() - 1, failure0)) {
-                        handleServiceAsync(fut, op, payloadWriter, 
payloadReader, failures);
+                    if (shouldRetry(op, failures.size() - 1, failure0)) {
+                        try {
+                            hld.getOrCreateChannel().serviceAsync(op, 
payloadWriter, payloadReader)
+                                .handle((retryRes, retryErr) -> {
+                                    if (retryErr == null)
+                                        fut.complete(retryRes);
+                                    else
+                                        fallbackToOtherChannels(fut, op, 
payloadWriter, payloadReader, failures);
+
+                                    return null;
+                                });
+                        }
+                        catch (ClientConnectionException retryEx) {
+                            failures.add(retryEx);
 
-                        return null;
+                            throw retryEx;
+                        }
                     }
+                    else {
+                        failures.add(failure0);
 
-                    fut.completeExceptionally(composeException(failures));
+                        fut.completeExceptionally(composeException(failures));
+                    }
                 }
-                else
-                    fut.completeExceptionally(err instanceof ClientException ? 
err : new ClientException(err));
+                catch (ClientConnectionException reconnectEx) {
+                    onChannelFailure(hld, ch, reconnectEx, failures);
 
-                return null;
-            });
+                    fallbackToOtherChannels(fut, op, payloadWriter, 
payloadReader, failures);
+                }
+                catch (Throwable ex) {
+                    fut.completeExceptionally(ex);
+                }
+            }
+            else
+                fut.completeExceptionally(err instanceof ClientException ? err 
: new ClientException(err));
+
+            return null;
+        });
+    }
+
+    /**
+     * Handle reconnection attempt to another channels after failure on client 
channel twice.
+     */
+    private <T> void fallbackToOtherChannels(
+        CompletableFuture<T> fut,
+        ClientOperation op,
+        Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader,
+        List<ClientConnectionException> failures
+    ) {
+        ClientConnectionException err = failures.get(failures.size() - 1);

Review Comment:
   not fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to