This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6f992bdbdba [fix][client] Retry for unknown exceptions when creating a producer or consumer (#24599)
6f992bdbdba is described below
commit 6f992bdbdba5df36f75d0b13c0bbffb3e80aca33
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Aug 5 11:16:49 2025 +0800
[fix][client] Retry for unknown exceptions when creating a producer or consumer (#24599)
---
.../client/impl/SimpleProduceConsumeIoTest.java | 232 ++++++++++++++-------
.../apache/pulsar/client/impl/ConnectionPool.java | 2 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 10 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 3 +-
.../pulsar/common/protocol/PulsarHandler.java | 4 +
5 files changed, 168 insertions(+), 83 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
index 4da3ce25733..faaef7336c7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
@@ -18,30 +18,42 @@
*/
package org.apache.pulsar.client.impl;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.awaitility.Awaitility;
-import org.awaitility.reflect.WhiteboxImpl;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Slf4j
public class SimpleProduceConsumeIoTest extends ProducerConsumerBase {
+ private ExecutorService executor;
+ private String topic;
private PulsarClientImpl singleConnectionPerBrokerClient;
@BeforeClass(alwaysRun = true)
@@ -49,86 +61,154 @@ public class SimpleProduceConsumeIoTest extends
ProducerConsumerBase {
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
- singleConnectionPerBrokerClient = (PulsarClientImpl)
PulsarClient.builder().connectionsPerBroker(1)
- .serviceUrl(lookupUrl.toString()).build();
}
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
+ executor.shutdown();
+ super.internalCleanup();
+ }
+
+ @BeforeMethod
+ public void setupTopic() throws Exception {
+ executor = Executors.newSingleThreadExecutor();
+ singleConnectionPerBrokerClient = (PulsarClientImpl)
PulsarClient.builder().connectionsPerBroker(1)
+ .serviceUrl(lookupUrl.toString()).build();
+ topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(topic);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void afterMethod() throws Exception {
if (singleConnectionPerBrokerClient != null) {
singleConnectionPerBrokerClient.close();
}
- super.internalCleanup();
+ executor.shutdown();
}
- /**
- * 1. Create a producer with a pooled connection.
- * 2. When executing "producer.connectionOpened", the pooled connection has been closed due to a network issue.
- * 3. Verify: the producer can be created successfully.
- */
@Test
- public void testUnstableNetWorkWhenCreatingProducer() throws Exception {
- final String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
- admin.topics().createNonPartitionedTopic(topic);
- // Trigger a pooled connection creation.
- ProducerImpl p = (ProducerImpl)
singleConnectionPerBrokerClient.newProducer().topic(topic).create();
- ClientCnx cnx = p.getClientCnx();
- p.close();
-
- // 1. Create a new producer with the pooled connection(since there is a pooled connection, the new producer
- // will reuse it).
- // 2. Trigger a network issue.
- CountDownLatch countDownLatch = new CountDownLatch(1);
- // A task for trigger network issue.
- new Thread(() -> {
- try {
- countDownLatch.await();
- cnx.ctx().close();
- } catch (Exception ex) {
+ public void testUnstableNetWorkForProducer() throws Exception {
+ final var producer = (ProducerImpl<byte[]>)
singleConnectionPerBrokerClient.newProducer().topic(topic).create();
+ final var producerNetwork = new Network(producer.getClientCnx());
+ producer.close();
+ producerNetwork.close();
+
+ final var newProducer = executor.submit(() -> createProducer(__ ->
producerNetwork.waitForClose())).get().get();
+ assertEquals(newProducer.getState().toString(), "Ready");
+ }
+
+ @Test
+ public void testUnstableNetWorkForConsumer() throws Exception {
+ final var consumer = (ConsumerImpl<byte[]>)
singleConnectionPerBrokerClient.newConsumer().topic(topic)
+ .subscriptionName("sub").subscribe();
+ final var consumerNetwork = new Network(consumer.getClientCnx());
+ consumer.close();
+ consumerNetwork.close();
+
+ final var newConsumer = executor.submit(() -> subscribe(cnx ->
consumerNetwork.waitForClose())).get().get();
+ assertEquals(newConsumer.getState().toString(), "Ready");
+ }
+
+ @Test
+ public void testUnknownRpcExceptionForProducer() throws Exception {
+ final var firstTime = new AtomicBoolean(true);
+ final var producer = createProducer(cnx -> {
+ if (firstTime.compareAndSet(true, false)) {
+ setFailedContext(cnx);
+ }
+ }).get(3, TimeUnit.SECONDS);
+ assertEquals(producer.getState().toString(), "Ready");
+ }
+
+ @Test
+ public void testUnknownRpcExceptionForConsumer() throws Exception {
+ final var firstTime = new AtomicBoolean(true);
+ final var consumer = subscribe(cnx -> {
+ if (firstTime.compareAndSet(true, false)) {
+ setFailedContext(cnx);
+ }
+ }).get(3, TimeUnit.SECONDS);
+ assertEquals(consumer.getState().toString(), "Ready");
+ }
+
+ private CompletableFuture<ProducerImpl<byte[]>> createProducer(
+ java.util.function.Consumer<ClientCnx> cnxInterceptor) {
+ final var producerConf = ((ProducerBuilderImpl<byte[]>)
singleConnectionPerBrokerClient.newProducer()
+ .topic(topic)).getConf();
+ final var future = new CompletableFuture<Producer<byte[]>>();
+ new ProducerImpl<>(singleConnectionPerBrokerClient, topic,
producerConf, future,
+ -1, Schema.BYTES, null, Optional.empty()) {
+ @Override
+ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
+ cnxInterceptor.accept(cnx);
+ return super.connectionOpened(cnx);
}
- }).start();
- // Create a new producer with the pooled connection.
- AtomicReference<CompletableFuture<Producer<byte[]>>> p2FutureWrap =
new AtomicReference<>();
- new Thread(() -> {
- ProducerBuilder producerBuilder =
singleConnectionPerBrokerClient.newProducer().topic(topic);
- ProducerConfigurationData producerConf =
WhiteboxImpl.getInternalState(producerBuilder, "conf");
- CompletableFuture<Producer<byte[]>> p2Future = new
CompletableFuture();
- p2FutureWrap.set(p2Future);
- new ProducerImpl<>(singleConnectionPerBrokerClient,
"public/default/tp1", producerConf, p2Future,
- -1, Schema.BYTES, null, Optional.empty()) {
- @Override
- public CompletableFuture<Void> connectionOpened(final
ClientCnx cnx) {
- // Mock a network issue, and wait for the issue occurred.
- countDownLatch.countDown();
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- }
- // Call the real implementation.
- return super.connectionOpened(cnx);
+ };
+ return future.thenApply(__ -> (ProducerImpl<byte[]>) __);
+ }
+
+ private CompletableFuture<ConsumerImpl<byte[]>>
subscribe(java.util.function.Consumer<ClientCnx> cnxInterceptor) {
+ final var consumerConf = ((ConsumerBuilderImpl<byte[]>)
singleConnectionPerBrokerClient.newConsumer()
+ .topic(topic).subscriptionName("sub")).getConf();
+ final var future = new CompletableFuture<Consumer<byte[]>>();
+ new ConsumerImpl<>(singleConnectionPerBrokerClient, topic,
consumerConf,
+ new ExecutorProvider(1, "internal"), -1, true, false, future,
null, 3600, Schema.BYTES,
+ new ConsumerInterceptors<>(List.of()), true) {
+
+ @Override
+ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
+ cnxInterceptor.accept(cnx);
+ return super.connectionOpened(cnx);
+ }
+ };
+ return future.thenApply(__ -> (ConsumerImpl<byte[]>) __);
+ }
+
+ private void setFailedContext(ClientCnx cnx) {
+ final var oldCtx = cnx.ctx();
+ final var newCtx = spy(oldCtx);
+ doAnswer(invocationOnMock -> {
+ final var failedFuture = mock(ChannelFuture.class);
+ doAnswer(invocation -> {
+ @SuppressWarnings("unchecked")
+ final var listener = (GenericFutureListener<ChannelFuture>)
invocation.getArgument(0);
+ final var future = mock(ChannelFuture.class);
+ when(future.isSuccess()).thenReturn(false);
+ when(future.cause()).thenReturn(new RuntimeException("network
exception"));
+ listener.operationComplete(future);
+ return future;
+ }).when(failedFuture).addListener(any());
+ // Set back the original context because reconnection will still get the same `ClientCnx` from the pool
+ cnx.setCtx(oldCtx);
+ return failedFuture;
+ }).when(newCtx).writeAndFlush(any());
+
+ cnx.setCtx(newCtx);
+ }
+
+ @RequiredArgsConstructor
+ private static class Network {
+
+ private final ClientCnx cnx;
+ private final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ public void close() {
+ new Thread(() -> {
+ try {
+ countDownLatch.await();
+ cnx.ctx().close();
+ } catch (Exception ignored) {
}
- };
- }).start();
-
- // Verify: the producer can be created successfully.
- Awaitility.await().untilAsserted(() -> {
- assertNotNull(p2FutureWrap.get());
- assertTrue(p2FutureWrap.get().isDone());
- });
- // Print log.
- p2FutureWrap.get().exceptionally(ex -> {
- log.error("Failed to create producer", ex);
- return null;
- });
- Awaitility.await().untilAsserted(() -> {
- assertFalse(p2FutureWrap.get().isCompletedExceptionally());
- assertTrue("Ready".equals(
- WhiteboxImpl.getInternalState(p2FutureWrap.get().join(),
"state").toString()));
- });
-
- // Cleanup.
- p2FutureWrap.get().join().close();
- admin.topics().delete(topic);
+ }).start();
+ }
+
+ public void waitForClose() {
+ countDownLatch.countDown();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ }
+ }
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index d86f9ef1ea0..d002fba6e6d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -289,7 +289,7 @@ public class ConnectionPool implements AutoCloseable {
}
// Try use exists connection.
if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) {
- return CompletableFuture.supplyAsync(() -> clientCnx,
clientCnx.ctx().executor());
+ return CompletableFuture.completedFuture(clientCnx);
} else {
// If connection already release, create a new one.
pool.remove(key, completableFuture);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 85d4dabf8e0..b9fe9fba810 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -974,12 +974,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
cnx.sendRequestWithId(cmd, closeRequestId);
}
+ final boolean retriable =
PulsarClientException.isRetriableError(e.getCause());
+ final boolean unrecoverable =
isUnrecoverableError(e.getCause());
if (e.getCause() instanceof PulsarClientException
- && PulsarClientException.isRetriableError(e.getCause())
- && !isUnrecoverableError(e.getCause())
+ && retriable
+ && !unrecoverable
&& System.currentTimeMillis() <
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
future.completeExceptionally(e.getCause());
- } else if (!subscribeFuture.isDone()) {
+ } else if (!subscribeFuture.isDone() && !retriable) {
// unable to create new consumer, fail operation
setState(State.Failed);
final Throwable throwable = PulsarClientException.wrap(e,
String.format("Failed to subscribe the "
@@ -990,7 +992,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
closeConsumerTasks();
subscribeFuture.completeExceptionally(throwable);
client.cleanupConsumer(this);
- } else if (isUnrecoverableError(e.getCause())) {
+ } else if (unrecoverable) {
closeWhenReceivedUnrecoverableError(e.getCause(), cnx);
} else {
// consumer was subscribed and connected but we got some error, keep trying
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e173dfb8668..a6987ca11f8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -2037,8 +2037,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
closeProducerTasks();
client.cleanupProducer(this);
} else if (producerCreatedFuture.isDone() || (
- cause instanceof PulsarClientException
- && PulsarClientException.isRetriableError(cause)
+ PulsarClientException.isRetriableError(cause)
&& System.currentTimeMillis() <
PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this)
)) {
// Either we had already created the producer once (producerCreatedFuture.isDone()) or we are
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index d5c741be01e..020b753086f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -19,11 +19,13 @@
package org.apache.pulsar.common.protocol;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
+import lombok.Setter;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
@@ -37,6 +39,8 @@ import org.slf4j.LoggerFactory;
* parameter instance lifecycle.
*/
public abstract class PulsarHandler extends PulsarDecoder {
+ @VisibleForTesting
+ @Setter
protected ChannelHandlerContext ctx;
protected SocketAddress remoteAddress;
private int remoteEndpointProtocolVersion = ProtocolVersion.v0.getValue();