This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8db9b017c8f80dc61511d90a499c243e94bbcf8f Author: Yunze Xu <[email protected]> AuthorDate: Tue Aug 5 11:16:49 2025 +0800 [fix][client] Retry for unknown exceptions when creating a producer or consumer (#24599) (cherry picked from commit 6f992bdbdba5df36f75d0b13c0bbffb3e80aca33) --- .../client/impl/SimpleProduceConsumeIoTest.java | 232 ++++++++++++++------- .../apache/pulsar/client/impl/ConnectionPool.java | 2 +- .../apache/pulsar/client/impl/ConsumerImpl.java | 10 +- .../apache/pulsar/client/impl/ProducerImpl.java | 3 +- .../pulsar/common/protocol/PulsarHandler.java | 4 + 5 files changed, 168 insertions(+), 83 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java index 4da3ce25733..faaef7336c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java @@ -18,30 +18,42 @@ */ package org.apache.pulsar.client.impl; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.GenericFutureListener; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; 
+import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.awaitility.Awaitility; -import org.awaitility.reflect.WhiteboxImpl; +import org.apache.pulsar.client.util.ExecutorProvider; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Slf4j public class SimpleProduceConsumeIoTest extends ProducerConsumerBase { + private ExecutorService executor; + private String topic; private PulsarClientImpl singleConnectionPerBrokerClient; @BeforeClass(alwaysRun = true) @@ -49,86 +61,154 @@ public class SimpleProduceConsumeIoTest extends ProducerConsumerBase { protected void setup() throws Exception { super.internalSetup(); super.producerBaseSetup(); - singleConnectionPerBrokerClient = (PulsarClientImpl) PulsarClient.builder().connectionsPerBroker(1) - .serviceUrl(lookupUrl.toString()).build(); } @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { + executor.shutdown(); + super.internalCleanup(); + } + + @BeforeMethod + public void setupTopic() throws Exception { + executor = Executors.newSingleThreadExecutor(); + singleConnectionPerBrokerClient = (PulsarClientImpl) PulsarClient.builder().connectionsPerBroker(1) + .serviceUrl(lookupUrl.toString()).build(); + topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(topic); + } + + @AfterMethod(alwaysRun = true) + public void afterMethod() throws 
Exception { if (singleConnectionPerBrokerClient != null) { singleConnectionPerBrokerClient.close(); } - super.internalCleanup(); + executor.shutdown(); } - /** - * 1. Create a producer with a pooled connection. - * 2. When executing "producer.connectionOpened", the pooled connection has been closed due to a network issue. - * 3. Verify: the producer can be created successfully. - */ @Test - public void testUnstableNetWorkWhenCreatingProducer() throws Exception { - final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - admin.topics().createNonPartitionedTopic(topic); - // Trigger a pooled connection creation. - ProducerImpl p = (ProducerImpl) singleConnectionPerBrokerClient.newProducer().topic(topic).create(); - ClientCnx cnx = p.getClientCnx(); - p.close(); - - // 1. Create a new producer with the pooled connection(since there is a pooled connection, the new producer - // will reuse it). - // 2. Trigger a network issue. - CountDownLatch countDownLatch = new CountDownLatch(1); - // A task for trigger network issue. 
- new Thread(() -> { - try { - countDownLatch.await(); - cnx.ctx().close(); - } catch (Exception ex) { + public void testUnstableNetWorkForProducer() throws Exception { + final var producer = (ProducerImpl<byte[]>) singleConnectionPerBrokerClient.newProducer().topic(topic).create(); + final var producerNetwork = new Network(producer.getClientCnx()); + producer.close(); + producerNetwork.close(); + + final var newProducer = executor.submit(() -> createProducer(__ -> producerNetwork.waitForClose())).get().get(); + assertEquals(newProducer.getState().toString(), "Ready"); + } + + @Test + public void testUnstableNetWorkForConsumer() throws Exception { + final var consumer = (ConsumerImpl<byte[]>) singleConnectionPerBrokerClient.newConsumer().topic(topic) + .subscriptionName("sub").subscribe(); + final var consumerNetwork = new Network(consumer.getClientCnx()); + consumer.close(); + consumerNetwork.close(); + + final var newConsumer = executor.submit(() -> subscribe(cnx -> consumerNetwork.waitForClose())).get().get(); + assertEquals(newConsumer.getState().toString(), "Ready"); + } + + @Test + public void testUnknownRpcExceptionForProducer() throws Exception { + final var firstTime = new AtomicBoolean(true); + final var producer = createProducer(cnx -> { + if (firstTime.compareAndSet(true, false)) { + setFailedContext(cnx); + } + }).get(3, TimeUnit.SECONDS); + assertEquals(producer.getState().toString(), "Ready"); + } + + @Test + public void testUnknownRpcExceptionForConsumer() throws Exception { + final var firstTime = new AtomicBoolean(true); + final var consumer = subscribe(cnx -> { + if (firstTime.compareAndSet(true, false)) { + setFailedContext(cnx); + } + }).get(3, TimeUnit.SECONDS); + assertEquals(consumer.getState().toString(), "Ready"); + } + + private CompletableFuture<ProducerImpl<byte[]>> createProducer( + java.util.function.Consumer<ClientCnx> cnxInterceptor) { + final var producerConf = ((ProducerBuilderImpl<byte[]>) 
singleConnectionPerBrokerClient.newProducer() + .topic(topic)).getConf(); + final var future = new CompletableFuture<Producer<byte[]>>(); + new ProducerImpl<>(singleConnectionPerBrokerClient, topic, producerConf, future, + -1, Schema.BYTES, null, Optional.empty()) { + @Override + public CompletableFuture<Void> connectionOpened(ClientCnx cnx) { + cnxInterceptor.accept(cnx); + return super.connectionOpened(cnx); } - }).start(); - // Create a new producer with the pooled connection. - AtomicReference<CompletableFuture<Producer<byte[]>>> p2FutureWrap = new AtomicReference<>(); - new Thread(() -> { - ProducerBuilder producerBuilder = singleConnectionPerBrokerClient.newProducer().topic(topic); - ProducerConfigurationData producerConf = WhiteboxImpl.getInternalState(producerBuilder, "conf"); - CompletableFuture<Producer<byte[]>> p2Future = new CompletableFuture(); - p2FutureWrap.set(p2Future); - new ProducerImpl<>(singleConnectionPerBrokerClient, "public/default/tp1", producerConf, p2Future, - -1, Schema.BYTES, null, Optional.empty()) { - @Override - public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) { - // Mock a network issue, and wait for the issue occurred. - countDownLatch.countDown(); - try { - Thread.sleep(1500); - } catch (InterruptedException e) { - } - // Call the real implementation. 
- return super.connectionOpened(cnx); + }; + return future.thenApply(__ -> (ProducerImpl<byte[]>) __); + } + + private CompletableFuture<ConsumerImpl<byte[]>> subscribe(java.util.function.Consumer<ClientCnx> cnxInterceptor) { + final var consumerConf = ((ConsumerBuilderImpl<byte[]>) singleConnectionPerBrokerClient.newConsumer() + .topic(topic).subscriptionName("sub")).getConf(); + final var future = new CompletableFuture<Consumer<byte[]>>(); + new ConsumerImpl<>(singleConnectionPerBrokerClient, topic, consumerConf, + new ExecutorProvider(1, "internal"), -1, true, false, future, null, 3600, Schema.BYTES, + new ConsumerInterceptors<>(List.of()), true) { + + @Override + public CompletableFuture<Void> connectionOpened(ClientCnx cnx) { + cnxInterceptor.accept(cnx); + return super.connectionOpened(cnx); + } + }; + return future.thenApply(__ -> (ConsumerImpl<byte[]>) __); + } + + private void setFailedContext(ClientCnx cnx) { + final var oldCtx = cnx.ctx(); + final var newCtx = spy(oldCtx); + doAnswer(invocationOnMock -> { + final var failedFuture = mock(ChannelFuture.class); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + final var listener = (GenericFutureListener<ChannelFuture>) invocation.getArgument(0); + final var future = mock(ChannelFuture.class); + when(future.isSuccess()).thenReturn(false); + when(future.cause()).thenReturn(new RuntimeException("network exception")); + listener.operationComplete(future); + return future; + }).when(failedFuture).addListener(any()); + // Set back the original context because reconnection will still get the same `ClientCnx` from the pool + cnx.setCtx(oldCtx); + return failedFuture; + }).when(newCtx).writeAndFlush(any()); + + cnx.setCtx(newCtx); + } + + @RequiredArgsConstructor + private static class Network { + + private final ClientCnx cnx; + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + public void close() { + new Thread(() -> { + try { + countDownLatch.await(); + cnx.ctx().close(); + } 
catch (Exception ignored) { } - }; - }).start(); - - // Verify: the producer can be created successfully. - Awaitility.await().untilAsserted(() -> { - assertNotNull(p2FutureWrap.get()); - assertTrue(p2FutureWrap.get().isDone()); - }); - // Print log. - p2FutureWrap.get().exceptionally(ex -> { - log.error("Failed to create producer", ex); - return null; - }); - Awaitility.await().untilAsserted(() -> { - assertFalse(p2FutureWrap.get().isCompletedExceptionally()); - assertTrue("Ready".equals( - WhiteboxImpl.getInternalState(p2FutureWrap.get().join(), "state").toString())); - }); - - // Cleanup. - p2FutureWrap.get().join().close(); - admin.topics().delete(topic); + }).start(); + } + + public void waitForClose() { + countDownLatch.countDown(); + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + } + } } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index a6c574e8fa1..d384a408452 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -270,7 +270,7 @@ public class ConnectionPool implements AutoCloseable { } // Try use exists connection. if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) { - return CompletableFuture.supplyAsync(() -> clientCnx, clientCnx.ctx().executor()); + return CompletableFuture.completedFuture(clientCnx); } else { // If connection already release, create a new one. 
pool.remove(key, completableFuture); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9d0b1ef9ded..6b154d7a6b6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -967,12 +967,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle cnx.sendRequestWithId(cmd, closeRequestId); } + final boolean retriable = PulsarClientException.isRetriableError(e.getCause()); + final boolean unrecoverable = isUnrecoverableError(e.getCause()); if (e.getCause() instanceof PulsarClientException - && PulsarClientException.isRetriableError(e.getCause()) - && !isUnrecoverableError(e.getCause()) + && retriable + && !unrecoverable && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) { future.completeExceptionally(e.getCause()); - } else if (!subscribeFuture.isDone()) { + } else if (!subscribeFuture.isDone() && !retriable) { // unable to create new consumer, fail operation setState(State.Failed); final Throwable throwable = PulsarClientException.wrap(e, String.format("Failed to subscribe the " @@ -983,7 +985,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle closeConsumerTasks(); subscribeFuture.completeExceptionally(throwable); client.cleanupConsumer(this); - } else if (isUnrecoverableError(e.getCause())) { + } else if (unrecoverable) { closeWhenReceivedUnrecoverableError(e.getCause(), cnx); } else { // consumer was subscribed and connected but we got some error, keep trying diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index eef10c21d2c..cf35f8a7b58 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2020,8 +2020,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne closeProducerTasks(); client.cleanupProducer(this); } else if (producerCreatedFuture.isDone() || ( - cause instanceof PulsarClientException - && PulsarClientException.isRetriableError(cause) + PulsarClientException.isRetriableError(cause) && System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this) )) { // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java index d5c741be01e..020b753086f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java @@ -19,11 +19,13 @@ package org.apache.pulsar.common.protocol; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.annotations.VisibleForTesting; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.ScheduledFuture; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; +import lombok.Setter; import org.apache.pulsar.common.api.proto.CommandPing; import org.apache.pulsar.common.api.proto.CommandPong; import org.apache.pulsar.common.api.proto.ProtocolVersion; @@ -37,6 +39,8 @@ import org.slf4j.LoggerFactory; * parameter instance lifecycle. */ public abstract class PulsarHandler extends PulsarDecoder { + @VisibleForTesting + @Setter protected ChannelHandlerContext ctx; protected SocketAddress remoteAddress; private int remoteEndpointProtocolVersion = ProtocolVersion.v0.getValue();
