This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f992bdbdba [fix][client] Retry for unknown exceptions when creating a producer or consumer (#24599)
6f992bdbdba is described below

commit 6f992bdbdba5df36f75d0b13c0bbffb3e80aca33
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Aug 5 11:16:49 2025 +0800

    [fix][client] Retry for unknown exceptions when creating a producer or consumer (#24599)
---
 .../client/impl/SimpleProduceConsumeIoTest.java    | 232 ++++++++++++++-------
 .../apache/pulsar/client/impl/ConnectionPool.java  |   2 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  10 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |   3 +-
 .../pulsar/common/protocol/PulsarHandler.java      |   4 +
 5 files changed, 168 insertions(+), 83 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
index 4da3ce25733..faaef7336c7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
@@ -18,30 +18,42 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.awaitility.Awaitility;
-import org.awaitility.reflect.WhiteboxImpl;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Slf4j
 public class SimpleProduceConsumeIoTest extends ProducerConsumerBase {
 
+    private ExecutorService executor;
+    private String topic;
     private PulsarClientImpl singleConnectionPerBrokerClient;
 
     @BeforeClass(alwaysRun = true)
@@ -49,86 +61,154 @@ public class SimpleProduceConsumeIoTest extends ProducerConsumerBase {
     protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
-        singleConnectionPerBrokerClient = (PulsarClientImpl) 
PulsarClient.builder().connectionsPerBroker(1)
-                .serviceUrl(lookupUrl.toString()).build();
     }
 
     @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
+        executor.shutdown();
+        super.internalCleanup();
+    }
+
+    @BeforeMethod
+    public void setupTopic() throws Exception {
+        executor = Executors.newSingleThreadExecutor();
+        singleConnectionPerBrokerClient = (PulsarClientImpl) 
PulsarClient.builder().connectionsPerBroker(1)
+                .serviceUrl(lookupUrl.toString()).build();
+        topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void afterMethod() throws Exception {
         if (singleConnectionPerBrokerClient != null) {
             singleConnectionPerBrokerClient.close();
         }
-        super.internalCleanup();
+        executor.shutdown();
     }
 
-    /**
-     * 1. Create a producer with a pooled connection.
-     * 2. When executing "producer.connectionOpened", the pooled connection 
has been closed due to a network issue.
-     * 3. Verify: the producer can be created successfully.
-     */
     @Test
-    public void testUnstableNetWorkWhenCreatingProducer() throws Exception {
-        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
-        admin.topics().createNonPartitionedTopic(topic);
-        // Trigger a pooled connection creation.
-        ProducerImpl p = (ProducerImpl) 
singleConnectionPerBrokerClient.newProducer().topic(topic).create();
-        ClientCnx cnx = p.getClientCnx();
-        p.close();
-
-        // 1. Create a new producer with the pooled connection(since there is 
a pooled connection, the new producer
-        // will reuse it).
-        // 2. Trigger a network issue.
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        // A task for trigger network issue.
-        new Thread(() -> {
-            try {
-                countDownLatch.await();
-                cnx.ctx().close();
-            } catch (Exception ex) {
+    public void testUnstableNetWorkForProducer() throws Exception {
+        final var producer = (ProducerImpl<byte[]>) 
singleConnectionPerBrokerClient.newProducer().topic(topic).create();
+        final var producerNetwork = new Network(producer.getClientCnx());
+        producer.close();
+        producerNetwork.close();
+
+        final var newProducer = executor.submit(() -> createProducer(__ -> 
producerNetwork.waitForClose())).get().get();
+        assertEquals(newProducer.getState().toString(), "Ready");
+    }
+
+    @Test
+    public void testUnstableNetWorkForConsumer() throws Exception {
+        final var consumer = (ConsumerImpl<byte[]>) 
singleConnectionPerBrokerClient.newConsumer().topic(topic)
+                .subscriptionName("sub").subscribe();
+        final var consumerNetwork = new Network(consumer.getClientCnx());
+        consumer.close();
+        consumerNetwork.close();
+
+        final var newConsumer = executor.submit(() -> subscribe(cnx -> 
consumerNetwork.waitForClose())).get().get();
+        assertEquals(newConsumer.getState().toString(), "Ready");
+    }
+
+    @Test
+    public void testUnknownRpcExceptionForProducer() throws Exception {
+        final var firstTime = new AtomicBoolean(true);
+        final var producer = createProducer(cnx -> {
+            if (firstTime.compareAndSet(true, false)) {
+                setFailedContext(cnx);
+            }
+        }).get(3, TimeUnit.SECONDS);
+        assertEquals(producer.getState().toString(), "Ready");
+    }
+
+    @Test
+    public void testUnknownRpcExceptionForConsumer() throws Exception {
+        final var firstTime = new AtomicBoolean(true);
+        final var consumer = subscribe(cnx -> {
+            if (firstTime.compareAndSet(true, false)) {
+                setFailedContext(cnx);
+            }
+        }).get(3, TimeUnit.SECONDS);
+        assertEquals(consumer.getState().toString(), "Ready");
+    }
+
+    private CompletableFuture<ProducerImpl<byte[]>> createProducer(
+            java.util.function.Consumer<ClientCnx> cnxInterceptor) {
+        final var producerConf = ((ProducerBuilderImpl<byte[]>) 
singleConnectionPerBrokerClient.newProducer()
+                .topic(topic)).getConf();
+        final var future = new CompletableFuture<Producer<byte[]>>();
+        new ProducerImpl<>(singleConnectionPerBrokerClient, topic, 
producerConf, future,
+                -1, Schema.BYTES, null, Optional.empty()) {
+            @Override
+            public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
+                cnxInterceptor.accept(cnx);
+                return super.connectionOpened(cnx);
             }
-        }).start();
-        // Create a new producer with the pooled connection.
-        AtomicReference<CompletableFuture<Producer<byte[]>>> p2FutureWrap = 
new AtomicReference<>();
-        new Thread(() -> {
-            ProducerBuilder producerBuilder = 
singleConnectionPerBrokerClient.newProducer().topic(topic);
-            ProducerConfigurationData producerConf = 
WhiteboxImpl.getInternalState(producerBuilder, "conf");
-            CompletableFuture<Producer<byte[]>> p2Future = new 
CompletableFuture();
-            p2FutureWrap.set(p2Future);
-            new ProducerImpl<>(singleConnectionPerBrokerClient, 
"public/default/tp1", producerConf, p2Future,
-                    -1, Schema.BYTES, null, Optional.empty()) {
-                @Override
-                public CompletableFuture<Void> connectionOpened(final 
ClientCnx cnx) {
-                    // Mock a network issue, and wait for the issue occurred.
-                    countDownLatch.countDown();
-                    try {
-                        Thread.sleep(1500);
-                    } catch (InterruptedException e) {
-                    }
-                    // Call the real implementation.
-                    return super.connectionOpened(cnx);
+        };
+        return future.thenApply(__ -> (ProducerImpl<byte[]>) __);
+    }
+
+    private CompletableFuture<ConsumerImpl<byte[]>> 
subscribe(java.util.function.Consumer<ClientCnx> cnxInterceptor) {
+        final var consumerConf = ((ConsumerBuilderImpl<byte[]>) 
singleConnectionPerBrokerClient.newConsumer()
+                .topic(topic).subscriptionName("sub")).getConf();
+        final var future = new CompletableFuture<Consumer<byte[]>>();
+        new ConsumerImpl<>(singleConnectionPerBrokerClient, topic, 
consumerConf,
+                new ExecutorProvider(1, "internal"), -1, true, false, future, 
null, 3600, Schema.BYTES,
+                new ConsumerInterceptors<>(List.of()), true) {
+
+            @Override
+            public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
+                cnxInterceptor.accept(cnx);
+                return super.connectionOpened(cnx);
+            }
+        };
+        return future.thenApply(__ -> (ConsumerImpl<byte[]>) __);
+    }
+
+    private void setFailedContext(ClientCnx cnx) {
+        final var oldCtx = cnx.ctx();
+        final var newCtx = spy(oldCtx);
+        doAnswer(invocationOnMock -> {
+            final var failedFuture = mock(ChannelFuture.class);
+            doAnswer(invocation -> {
+                @SuppressWarnings("unchecked")
+                final var listener = (GenericFutureListener<ChannelFuture>) 
invocation.getArgument(0);
+                final var future = mock(ChannelFuture.class);
+                when(future.isSuccess()).thenReturn(false);
+                when(future.cause()).thenReturn(new RuntimeException("network 
exception"));
+                listener.operationComplete(future);
+                return future;
+            }).when(failedFuture).addListener(any());
+            // Set back the original context because reconnection will still 
get the same `ClientCnx` from the pool
+            cnx.setCtx(oldCtx);
+            return failedFuture;
+        }).when(newCtx).writeAndFlush(any());
+
+        cnx.setCtx(newCtx);
+    }
+
+    @RequiredArgsConstructor
+    private static class Network {
+
+        private final ClientCnx cnx;
+        private final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+        public void close() {
+            new Thread(() -> {
+                try {
+                    countDownLatch.await();
+                    cnx.ctx().close();
+                } catch (Exception ignored) {
                 }
-            };
-        }).start();
-
-        // Verify: the producer can be created successfully.
-        Awaitility.await().untilAsserted(() -> {
-            assertNotNull(p2FutureWrap.get());
-            assertTrue(p2FutureWrap.get().isDone());
-        });
-        // Print log.
-        p2FutureWrap.get().exceptionally(ex -> {
-            log.error("Failed to create producer", ex);
-            return null;
-        });
-        Awaitility.await().untilAsserted(() -> {
-            assertFalse(p2FutureWrap.get().isCompletedExceptionally());
-            assertTrue("Ready".equals(
-                    WhiteboxImpl.getInternalState(p2FutureWrap.get().join(), 
"state").toString()));
-        });
-
-        // Cleanup.
-        p2FutureWrap.get().join().close();
-        admin.topics().delete(topic);
+            }).start();
+        }
+
+        public void waitForClose() {
+            countDownLatch.countDown();
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignored) {
+            }
+        }
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index d86f9ef1ea0..d002fba6e6d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -289,7 +289,7 @@ public class ConnectionPool implements AutoCloseable {
             }
             // Try use exists connection.
             if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) {
-                return CompletableFuture.supplyAsync(() -> clientCnx, 
clientCnx.ctx().executor());
+                return CompletableFuture.completedFuture(clientCnx);
             } else {
                 // If connection already release, create a new one.
                 pool.remove(key, completableFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 85d4dabf8e0..b9fe9fba810 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -974,12 +974,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     cnx.sendRequestWithId(cmd, closeRequestId);
                 }
 
+                final boolean retriable = 
PulsarClientException.isRetriableError(e.getCause());
+                final boolean unrecoverable = 
isUnrecoverableError(e.getCause());
                 if (e.getCause() instanceof PulsarClientException
-                        && PulsarClientException.isRetriableError(e.getCause())
-                        && !isUnrecoverableError(e.getCause())
+                        && retriable
+                        && !unrecoverable
                         && System.currentTimeMillis() < 
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
                     future.completeExceptionally(e.getCause());
-                } else if (!subscribeFuture.isDone()) {
+                } else if (!subscribeFuture.isDone() && !retriable) {
                     // unable to create new consumer, fail operation
                     setState(State.Failed);
                     final Throwable throwable = PulsarClientException.wrap(e, 
String.format("Failed to subscribe the "
@@ -990,7 +992,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     closeConsumerTasks();
                     subscribeFuture.completeExceptionally(throwable);
                     client.cleanupConsumer(this);
-                } else if (isUnrecoverableError(e.getCause())) {
+                } else if (unrecoverable) {
                     closeWhenReceivedUnrecoverableError(e.getCause(), cnx);
                 } else {
                     // consumer was subscribed and connected but we got some 
error, keep trying
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e173dfb8668..a6987ca11f8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -2037,8 +2037,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 closeProducerTasks();
                 client.cleanupProducer(this);
             } else if (producerCreatedFuture.isDone() || (
-                    cause instanceof PulsarClientException
-                            && PulsarClientException.isRetriableError(cause)
+                            PulsarClientException.isRetriableError(cause)
                             && System.currentTimeMillis() < 
PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this)
             )) {
                 // Either we had already created the producer once 
(producerCreatedFuture.isDone()) or we are
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index d5c741be01e..020b753086f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -19,11 +19,13 @@
 package org.apache.pulsar.common.protocol;
 
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.ScheduledFuture;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
+import lombok.Setter;
 import org.apache.pulsar.common.api.proto.CommandPing;
 import org.apache.pulsar.common.api.proto.CommandPong;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
@@ -37,6 +39,8 @@ import org.slf4j.LoggerFactory;
  * parameter instance lifecycle.
  */
 public abstract class PulsarHandler extends PulsarDecoder {
+    @VisibleForTesting
+    @Setter
     protected ChannelHandlerContext ctx;
     protected SocketAddress remoteAddress;
     private int remoteEndpointProtocolVersion = ProtocolVersion.v0.getValue();

Reply via email to