This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1b5c818a538 [fix][client] Fail messages immediately in ProducerImpl when in terminal state (#25317)
1b5c818a538 is described below
commit 1b5c818a538ee2cbcc88a4d3b4955eeb07a7d586
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 16 00:23:17 2026 -0700
[fix][client] Fail messages immediately in ProducerImpl when in terminal state (#25317)
---
.../broker/service/TopicTerminationTest.java | 3 ++-
.../apache/pulsar/client/impl/ProducerImpl.java | 30 +++++++++++++++++++++-
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index e4e361eb0b1..40379420637 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -142,6 +142,7 @@ public class TopicTerminationTest extends BrokerTestBase {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .sendTimeout(5, TimeUnit.SECONDS)
.create();
CyclicBarrier barrier = new CyclicBarrier(2);
@@ -170,7 +171,7 @@ public class TopicTerminationTest extends BrokerTestBase {
boolean alreadyFailed = false;
try {
- FutureUtil.waitForAll(futures).get();
+ FutureUtil.waitForAll(futures).get(10, TimeUnit.SECONDS);
} catch (Exception e) {
// Ignore for now, check is below
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 3d601465aac..62e8373e2d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1051,6 +1051,23 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}
+ private PulsarClientException getTerminalException(State state) {
+ switch (state) {
+ case Terminated:
+ return new PulsarClientException.TopicTerminatedException(
+ format("The topic %s that the producer %s produces to has been terminated", topic,
+ producerName));
+ case Closed:
+ return new PulsarClientException.AlreadyClosedException(
+ format("The producer %s of the topic %s was already closed", producerName, topic));
+ case ProducerFenced:
+ return new PulsarClientException.ProducerFencedException(
+ format("The producer %s of the topic %s was fenced", producerName, topic));
+ default:
+ return new PulsarClientException.NotConnectedException();
+ }
+ }
+
private boolean isValidProducerState(SendCallback callback, long sequenceId) {
switch (getState()) {
case Ready:
@@ -2472,9 +2489,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
op.cmd.release();
return;
}
+ final State state = getState();
+ if (state == State.Terminated || state == State.Closed || state == State.ProducerFenced) {
+ // The producer is in a terminal state and will never reconnect. Fail the message immediately
+ // rather than leaving it stuck in pendingMessages until sendTimeout.
+ releaseSemaphoreForSendOp(op);
+ client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
+ op.sendComplete(getTerminalException(state));
+ ReferenceCountUtil.safeRelease(op.cmd);
+ op.recycle();
+ return;
+ }
pendingMessages.add(op);
updateLastSeqPushed(op);
- if (State.RegisteringSchema.equals(getState())) {
+ if (State.RegisteringSchema.equals(state)) {
// Since there is a in-progress schema registration, do not continuously publish to avoid break publish
// ordering. After the schema registration finished, it will trigger a "recoverProcessOpSendMsgFrom" to
// publish all messages in "pendingMessages".