This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b5c818a538 [fix][client] Fail messages immediately in ProducerImpl when in terminal state (#25317)
1b5c818a538 is described below

commit 1b5c818a538ee2cbcc88a4d3b4955eeb07a7d586
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 16 00:23:17 2026 -0700

    [fix][client] Fail messages immediately in ProducerImpl when in terminal state (#25317)
---
 .../broker/service/TopicTerminationTest.java       |  3 ++-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 30 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index e4e361eb0b1..40379420637 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -142,6 +142,7 @@ public class TopicTerminationTest extends BrokerTestBase {
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .sendTimeout(5, TimeUnit.SECONDS)
             .create();
 
         CyclicBarrier barrier = new CyclicBarrier(2);
@@ -170,7 +171,7 @@ public class TopicTerminationTest extends BrokerTestBase {
         boolean alreadyFailed = false;
 
         try {
-            FutureUtil.waitForAll(futures).get();
+            FutureUtil.waitForAll(futures).get(10, TimeUnit.SECONDS);
         } catch (Exception e) {
             // Ignore for now, check is below
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 3d601465aac..62e8373e2d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1051,6 +1051,23 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
+    private PulsarClientException getTerminalException(State state) {
+        switch (state) {
+            case Terminated:
+                return new PulsarClientException.TopicTerminatedException(
+                        format("The topic %s that the producer %s produces to has been terminated", topic,
+                                producerName));
+            case Closed:
+                return new PulsarClientException.AlreadyClosedException(
+                        format("The producer %s of the topic %s was already closed", producerName, topic));
+            case ProducerFenced:
+                return new PulsarClientException.ProducerFencedException(
+                        format("The producer %s of the topic %s was fenced", producerName, topic));
+            default:
+                return new PulsarClientException.NotConnectedException();
+        }
+    }
+
     private boolean isValidProducerState(SendCallback callback, long sequenceId) {
         switch (getState()) {
             case Ready:
@@ -2472,9 +2489,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 op.cmd.release();
                 return;
             }
+            final State state = getState();
+            if (state == State.Terminated || state == State.Closed || state == State.ProducerFenced) {
+                // The producer is in a terminal state and will never reconnect. Fail the message immediately
+                // rather than leaving it stuck in pendingMessages until sendTimeout.
+                releaseSemaphoreForSendOp(op);
+                client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
+                op.sendComplete(getTerminalException(state));
+                ReferenceCountUtil.safeRelease(op.cmd);
+                op.recycle();
+                return;
+            }
             pendingMessages.add(op);
             updateLastSeqPushed(op);
-            if (State.RegisteringSchema.equals(getState())) {
+            if (State.RegisteringSchema.equals(state)) {
                 // Since there is a in-progress schema registration, do not continuously publish to avoid break publish
                 // ordering. After the schema registration finished, it will trigger a "recoverProcessOpSendMsgFrom" to
                 // publish all messages in "pendingMessages".

Reply via email to