timoninmaxim commented on code in PR #11954:
URL: https://github.com/apache/ignite/pull/11954#discussion_r2084677980


##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java:
##########
@@ -243,65 +243,105 @@ private <T> void handleServiceAsync(
         List<ClientConnectionException> failures
     ) {
         try {
-            applyOnDefaultChannel(
-                channel -> applyOnClientChannelAsync(fut, channel, op, 
payloadWriter, payloadReader, failures),
-                null,
-                failures
-            );
+            ClientChannel ch = applyOnDefaultChannel(Function.identity(), 
null, failures);
+
+            applyOnClientChannelAsync(fut, ch, op, payloadWriter, 
payloadReader, failures);
         }
         catch (Throwable ex) {
             fut.completeExceptionally(ex);
         }
     }
 
-    /** */
-    private <T> Object applyOnClientChannelAsync(
+    /**
+     * Retries an async operation on the same channel if it fails with a 
connection exception
+     * then falls back to other channels if retry fails. Aggregates failures 
and completes the original future.
+     */
+    private <T> void applyOnClientChannelAsync(
         final CompletableFuture<T> fut,
         ClientChannel ch,
         ClientOperation op,
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader,
         List<ClientConnectionException> failures
     ) {
-        return ch
-            .serviceAsync(op, payloadWriter, payloadReader)
-            .handle((res, err) -> {
+        ch.serviceAsync(op, payloadWriter, payloadReader)
+            .whenComplete((res, err) -> {
+                if (fut.isDone())

Review Comment:
   In which case the future is done here?



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java:
##########
@@ -243,65 +243,105 @@ private <T> void handleServiceAsync(
         List<ClientConnectionException> failures
     ) {
         try {
-            applyOnDefaultChannel(
-                channel -> applyOnClientChannelAsync(fut, channel, op, 
payloadWriter, payloadReader, failures),
-                null,
-                failures
-            );
+            ClientChannel ch = applyOnDefaultChannel(Function.identity(), 
null, failures);
+
+            applyOnClientChannelAsync(fut, ch, op, payloadWriter, 
payloadReader, failures);
         }
         catch (Throwable ex) {
             fut.completeExceptionally(ex);
         }
     }
 
-    /** */
-    private <T> Object applyOnClientChannelAsync(
+    /**
+     * Retries an async operation on the same channel if it fails with a 
connection exception
+     * then falls back to other channels if retry fails. Aggregates failures 
and completes the original future.
+     */
+    private <T> void applyOnClientChannelAsync(
         final CompletableFuture<T> fut,
         ClientChannel ch,
         ClientOperation op,
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader,
         List<ClientConnectionException> failures
     ) {
-        return ch
-            .serviceAsync(op, payloadWriter, payloadReader)

Review Comment:
   useless change



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java:
##########
@@ -243,65 +243,105 @@ private <T> void handleServiceAsync(
         List<ClientConnectionException> failures
     ) {
         try {
-            applyOnDefaultChannel(
-                channel -> applyOnClientChannelAsync(fut, channel, op, 
payloadWriter, payloadReader, failures),
-                null,
-                failures
-            );
+            ClientChannel ch = applyOnDefaultChannel(Function.identity(), 
null, failures);
+
+            applyOnClientChannelAsync(fut, ch, op, payloadWriter, 
payloadReader, failures);
         }
         catch (Throwable ex) {
             fut.completeExceptionally(ex);
         }
     }
 
-    /** */
-    private <T> Object applyOnClientChannelAsync(
+    /**
+     * Retries an async operation on the same channel if it fails with a 
connection exception
+     * then falls back to other channels if retry fails. Aggregates failures 
and completes the original future.
+     */
+    private <T> void applyOnClientChannelAsync(
         final CompletableFuture<T> fut,
         ClientChannel ch,
         ClientOperation op,
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader,
         List<ClientConnectionException> failures
     ) {
-        return ch
-            .serviceAsync(op, payloadWriter, payloadReader)
-            .handle((res, err) -> {
+        ch.serviceAsync(op, payloadWriter, payloadReader)
+            .whenComplete((res, err) -> {
+                if (fut.isDone())
+                    return;
+
                 if (err == null) {
                     fut.complete(res);
 
-                    return null;
+                    return;
                 }
 
-                if (err instanceof ClientConnectionException) {
-                    ClientConnectionException failure0 = 
(ClientConnectionException)err;
+                if (!(err instanceof ClientConnectionException)) {
+                    fut.completeExceptionally(err instanceof ClientException ? 
err : new ClientException(err));
 
-                    failures.add(failure0);
+                    return;
+                }
 
-                    try {
-                        // Will try to reinit channels if topology changed.
-                        onChannelFailure(ch, err, failures);
-                    }
-                    catch (Throwable ex) {
-                        fut.completeExceptionally(ex);
+                // Retry use same channel in case of connection exception.
+                ClientConnectionException failure0 = 
(ClientConnectionException) err;
+
+                UUID nodeId = ch.serverNodeId();
+
+                ClientChannelHolder hld = (nodeId != null) ? 
nodeChannels.get(nodeId) : null;
+
+                try {
+                    onChannelFailure(ch, err, failures);
 
-                        return null;
+                    if (hld == null) {
+                        failures.add(failure0);
+
+                        tryOtherChannels(fut, op, payloadWriter, 
payloadReader, failures, failure0);
+
+                        return;
                     }
 
-                    if (failures.size() < srvcChannelsLimit && shouldRetry(op, 
failures.size() - 1, failure0)) {
-                        handleServiceAsync(fut, op, payloadWriter, 
payloadReader, failures);
+                    if (shouldRetry(op, failures.size() - 1, failure0)) {
+                        ClientChannel newCh = hld.getOrCreateChannel();
 
-                        return null;
+                        applyOnClientChannelAsync(fut, newCh, op, 
payloadWriter, payloadReader, failures);

Review Comment:
   It looks like you will fall in a recursion here.



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java:
##########
@@ -243,65 +243,105 @@ private <T> void handleServiceAsync(
         List<ClientConnectionException> failures
     ) {
         try {
-            applyOnDefaultChannel(
-                channel -> applyOnClientChannelAsync(fut, channel, op, 
payloadWriter, payloadReader, failures),
-                null,
-                failures
-            );
+            ClientChannel ch = applyOnDefaultChannel(Function.identity(), 
null, failures);
+
+            applyOnClientChannelAsync(fut, ch, op, payloadWriter, 
payloadReader, failures);
         }
         catch (Throwable ex) {
             fut.completeExceptionally(ex);
         }
     }
 
-    /** */
-    private <T> Object applyOnClientChannelAsync(
+    /**
+     * Retries an async operation on the same channel if it fails with a 
connection exception
+     * then falls back to other channels if retry fails. Aggregates failures 
and completes the original future.
+     */
+    private <T> void applyOnClientChannelAsync(
         final CompletableFuture<T> fut,
         ClientChannel ch,
         ClientOperation op,
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader,
         List<ClientConnectionException> failures
     ) {
-        return ch
-            .serviceAsync(op, payloadWriter, payloadReader)
-            .handle((res, err) -> {
+        ch.serviceAsync(op, payloadWriter, payloadReader)
+            .whenComplete((res, err) -> {
+                if (fut.isDone())
+                    return;
+
                 if (err == null) {
                     fut.complete(res);
 
-                    return null;
+                    return;
                 }
 
-                if (err instanceof ClientConnectionException) {
-                    ClientConnectionException failure0 = 
(ClientConnectionException)err;
+                if (!(err instanceof ClientConnectionException)) {
+                    fut.completeExceptionally(err instanceof ClientException ? 
err : new ClientException(err));
 
-                    failures.add(failure0);
+                    return;
+                }
 
-                    try {
-                        // Will try to reinit channels if topology changed.
-                        onChannelFailure(ch, err, failures);
-                    }
-                    catch (Throwable ex) {
-                        fut.completeExceptionally(ex);
+                // Retry use same channel in case of connection exception.
+                ClientConnectionException failure0 = 
(ClientConnectionException) err;
+
+                UUID nodeId = ch.serverNodeId();
+
+                ClientChannelHolder hld = (nodeId != null) ? 
nodeChannels.get(nodeId) : null;
+
+                try {
+                    onChannelFailure(ch, err, failures);
 
-                        return null;
+                    if (hld == null) {
+                        failures.add(failure0);
+
+                        tryOtherChannels(fut, op, payloadWriter, 
payloadReader, failures, failure0);
+
+                        return;
                     }
 
-                    if (failures.size() < srvcChannelsLimit && shouldRetry(op, 
failures.size() - 1, failure0)) {
-                        handleServiceAsync(fut, op, payloadWriter, 
payloadReader, failures);
+                    if (shouldRetry(op, failures.size() - 1, failure0)) {
+                        ClientChannel newCh = hld.getOrCreateChannel();
 
-                        return null;
+                        applyOnClientChannelAsync(fut, newCh, op, 
payloadWriter, payloadReader, failures);
+
+                        return;
                     }
 
-                    fut.completeExceptionally(composeException(failures));
+                    failures.add(failure0);
+
+                    tryOtherChannels(fut, op, payloadWriter, payloadReader, 
failures, failure0);
                 }
-                else
-                    fut.completeExceptionally(err instanceof ClientException ? 
err : new ClientException(err));
+                catch (ClientConnectionException reconnectEx) {
+                    onChannelFailure(hld, null, reconnectEx, failures);
 
-                return null;
+                    tryOtherChannels(fut, op, payloadWriter, payloadReader, 
failures, reconnectEx);
+                }
+                catch (Throwable ex) {
+                    fut.completeExceptionally(ex);
+                }
             });
     }
 
+    /**
+     * Attempts on other channels if the limit is not reached.
+     * */
+    private <T> void tryOtherChannels(
+        final CompletableFuture<T> fut,
+        ClientOperation op,
+        Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader,
+        List<ClientConnectionException> failures,
+        ClientConnectionException failure0
+    ) {
+        if (fut.isDone() || failure0 == null)
+            return;

Review Comment:
   How the fut will be completed in this case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to