This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 84703ace1b4ba7998a5251f22fd0c1cef9c4f294
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Aug 5 11:29:04 2025 +0800

    [improve][doc] Improve the JavaDocs of sendAsync to avoid improper use (#24601)
    
    (cherry picked from commit dedba1a9bf0becb1b3f80f1d89a779451734fbe6)
---
 .../org/apache/pulsar/client/api/Producer.java     | 59 ++++++++++++++++++++++
 1 file changed, 59 insertions(+)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
index 4cf3d6b8164..3c5102eed2f 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
@@ -70,6 +72,63 @@ public interface Producer<T> extends Closeable {
      * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
      *
      * <p>Use {@link #newMessage()} to specify more properties than just the value on the message to be sent.
+     * <p>
+     * Note: The returned future is completed in the internal network I/O thread. If a callback that takes a long time
+     * to complete is registered on the future, it can negatively impact the internal network processing.
+     * </p>
+     *
+     * For example, consider the following code snippet:
+     * <pre>{@code
+     * Producer<byte[]> producer1 = client.newProducer().topic("topic1").create();
+     * CompletableFuture<MessageId> future = producer1.sendAsync("hello".getBytes());
+     * future.thenAccept(__ -> {
+     *     try {
+     *         Thread.sleep(1000 * 3600L); // Simulates a long-running task (1 hour)
+     *     } catch (InterruptedException ignored) {
+     *     }
+     * });
+     * future.get();
+     * Producer<byte[]> producer2 = client.newProducer().topic("topic2").create();
+     * }</pre>
+     *
+     * <p>
+     * In this example, the creation of `producer2` could be blocked for 1 hour. This behavior might seem
+     * counter-intuitive, but it occurs because the callback registered on the `future` is executed immediately in the
+     * single network I/O thread after the `future` is completed. While the callback is running (e.g., sleeping for 1
+     * hour), the I/O thread is unable to process any other network responses from the broker, causing a bottleneck.
+     * </p>
+     *
+     * <p>
+     * In addition, invoking any synchronous APIs within the callback of an asynchronous operation will lead to a
+     * deadlock. For example:
+     * </p>
+     *
+     * <pre>{@code
+     * producer.sendAsync("msg-1".getBytes()).thenAccept(__ -> producer.send("msg-2".getBytes()));
+     * }</pre>
+     *
+     * <p>
+     * In the example above, the synchronous `send` method is called within the callback of the asynchronous `sendAsync`
+     * method. This will cause a deadlock because the I/O thread responsible for completing the `sendAsync` operation is
+     * blocked waiting for the synchronous `send` method to complete. As a result, "msg-2" will never be sent, and the
+     * I/O thread will remain blocked indefinitely. This can have a cascading effect, impacting all producers and
+     * consumers created by the same {@link PulsarClient}.
+     * </p>
+     *
+     * <p>
+     * To avoid the issues above, ensure that callbacks are executed in a separate thread or executor. This can
+     * be achieved by using the `xxxAsync` APIs, such as:
+     * </p>
+     *
+     * <ul>
+     *   <li>{@link CompletableFuture#thenAcceptAsync(Consumer, Executor)}</li>
+     *   <li>{@link CompletableFuture#thenAcceptAsync(Consumer)}</li>
+     * </ul>
+     *
+     * <p>
+     * These methods run the callback on a separate executor (the default one if none is given), keeping the network
+     * I/O thread unblocked. Alternatively, ensure that the callback logic is lightweight and completes quickly.
+     * </p>
      *
      * @param message
      *            a byte array with the payload of the message
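
The threading behavior described in the Javadoc above can be reproduced without a broker. The following is a minimal sketch, not Pulsar code: it assumes a plain single-thread executor named `fake-io` (a hypothetical stand-in for the client's network I/O thread) and a second executor named `callbacks` (also hypothetical) for user code. It reports which thread runs a callback registered with `thenAccept` versus `thenAcceptAsync(Consumer, Executor)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CallbackThreadDemo {

    /** Returns the name of the thread that executed the callback. */
    static String callbackThreadName(boolean offload) {
        // Hypothetical stand-in for the single network I/O thread.
        ExecutorService ioThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io"));
        // Separate executor reserved for user callbacks.
        ExecutorService callbackExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callbacks"));
        try {
            CompletableFuture<String> sendFuture = new CompletableFuture<>();
            CompletableFuture<String> threadName = new CompletableFuture<>();
            // Register the callback before the future completes, as in the
            // Javadoc example, so the completing thread decides where it runs.
            if (offload) {
                sendFuture.thenAcceptAsync(
                        id -> threadName.complete(Thread.currentThread().getName()),
                        callbackExecutor);
            } else {
                sendFuture.thenAccept(
                        id -> threadName.complete(Thread.currentThread().getName()));
            }
            // The simulated I/O thread completes the future.
            ioThread.execute(() -> sendFuture.complete("message-id"));
            return threadName.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
            callbackExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenAccept ran on:      " + callbackThreadName(false));
        System.out.println("thenAcceptAsync ran on: " + callbackThreadName(true));
    }
}
```

With `thenAccept`, the callback runs on `fake-io`, so a slow callback would stall that thread. With `thenAcceptAsync` and an explicit executor, it runs on `callbacks` and the simulated I/O thread stays free.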

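A related option, not mentioned in the commit itself, is to avoid the synchronous call altogether: when a second message must follow the first, the two asynchronous sends can be chained with `CompletableFuture#thenCompose` so that no callback ever blocks. The sketch below assumes a hypothetical `FakeProducer` whose futures complete on a single executor thread. It only imitates the shape of `sendAsync` and is not the Pulsar API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ChainedSendDemo {

    /** Hypothetical producer whose futures complete on one "I/O" thread. */
    static final class FakeProducer implements AutoCloseable {
        private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
        private final AtomicInteger nextId = new AtomicInteger();

        CompletableFuture<Integer> sendAsync(String payload) {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            // The simulated acknowledgement is handled on the single I/O thread;
            // the payload itself is ignored in this sketch.
            ioThread.execute(() -> future.complete(nextId.incrementAndGet()));
            return future;
        }

        @Override
        public void close() {
            ioThread.shutdown();
        }
    }

    /** Sends two messages in order without blocking inside a callback. */
    static int sendTwoInOrder() {
        try (FakeProducer producer = new FakeProducer()) {
            return producer.sendAsync("msg-1")
                    // The callback only starts the next send and returns at once,
                    // so the I/O thread is free to complete the second future.
                    .thenCompose(firstId -> producer.sendAsync("msg-2"))
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second message id: " + sendTwoInOrder()); // prints 2
    }
}
```

The callback passed to `thenCompose` returns immediately after queuing the second send, so it stays lightweight in the sense the Javadoc recommends.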