This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 84703ace1b4ba7998a5251f22fd0c1cef9c4f294
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Aug 5 11:29:04 2025 +0800

    [improve][doc] Improve the JavaDocs of sendAsync to avoid improper use (#24601)

    (cherry picked from commit dedba1a9bf0becb1b3f80f1d89a779451734fbe6)
---
 .../org/apache/pulsar/client/api/Producer.java     | 59 ++++++++++++++++++++++
 1 file changed, 59 insertions(+)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
index 4cf3d6b8164..3c5102eed2f 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
@@ -70,6 +72,63 @@ public interface Producer<T> extends Closeable {
      * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
      *
      * <p>Use {@link #newMessage()} to specify more properties than just the value on the message to be sent.
+     * <p>
+     * Note: The returned future is completed in the internal network I/O thread. If a callback that takes a long time
+     * to complete is registered on the future, it can negatively impact the internal network processing.
+     * </p>
+     *
+     * For example, consider the following code snippet:
+     * <pre>{@code
+     * Producer<byte[]> producer1 = client.newProducer().topic("topic1").create();
+     * CompletableFuture<MessageId> future = producer.sendAsync("hello".getBytes());
+     * future.thenAccept(__ -> {
+     *     try {
+     *         Thread.sleep(1000 * 3600L); // Simulates a long-running task (1 hour)
+     *     } catch (InterruptedException ignored) {
+     *     }
+     * });
+     * future.get();
+     * Producer<byte[]> producer2 = client.newProducer().topic("topic2").create();
+     * }</pre>
+     *
+     * <p>
+     * In this example, the creation of `producer2` could be blocked for 1 hour. This behavior might seem
+     * counter-intuitive, but it occurs because the callback registered on the `future` is executed immediately in the
+     * single network I/O thread after the `future` is completed. While the callback is running (e.g., sleeping for 1
+     * hour), the I/O thread is unable to process any other network responses from the broker, causing a bottleneck.
+     * </p>
+     *
+     * <p>
+     * In addition, invoking any synchronous APIs within the callback of an asynchronous operation will lead to a
+     * deadlock. For example:
+     * </p>
+     *
+     * <pre>{@code
+     * producer.sendAsync("msg-1".getBytes()).thenAccept(__ -> producer.send("msg-2".getBytes()));
+     * }</pre>
+     *
+     * <p>
+     * In the example above, the synchronous `send` method is called within the callback of the asynchronous `sendAsync`
+     * method. This will cause a deadlock because the I/O thread responsible for completing the `sendAsync` operation is
+     * blocked waiting for the synchronous `send` method to complete. As a result, "msg-2" will never be sent, and the
+     * I/O thread will remain blocked indefinitely. This can have a cascading effect, impacting all producers and
+     * consumers created by the same {@link PulsarClient}.
+     * </p>
+     *
+     * <p>
+     * To avoid issues above, you should ensure that callbacks are executed in a separate thread or executor. This can
+     * be achieved by using the `xxxAsync` APIs, such as:
+     * </p>
+     *
+     * <ul>
+     *   <li>{@link CompletableFuture#thenAcceptAsync(Consumer, Executor)}</li>
+     *   <li>{@link CompletableFuture#thenAcceptAsync(Consumer)}</li>
+     * </ul>
+     *
+     * <p>
+     * These methods allow you to specify an executor for the callback, ensuring that the network I/O thread remains
+     * unblocked. Alternatively, you can ensure that the callback logic is lightweight and completes quickly.
+     * </p>
      *
      * @param message
      *            a byte array with the payload of the message
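
The threading behavior described in the new Javadoc can be observed without a broker. The following is a minimal sketch and not part of the commit: a plain `CompletableFuture` stands in for the future returned by `sendAsync`, and a single thread named `fake-io` stands in for the client's network I/O thread. The class name `CallbackOffloadSketch`, the method `runCallback`, the thread names, and the `callback-worker` executor are all hypothetical names chosen for illustration. It shows that a callback registered with `thenAccept` before completion runs on the completing thread, while `thenAcceptAsync(Consumer, Executor)` moves it to the supplied executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CallbackOffloadSketch {

    /**
     * Registers a callback on a pending future, completes the future from a
     * stand-in "I/O" thread, and returns the name of the thread that ran the callback.
     */
    static String runCallback(boolean offload) throws Exception {
        // Assumption: a single named thread models the client's network I/O thread.
        ExecutorService ioThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io"));
        // Hypothetical application-owned executor for callbacks.
        ExecutorService callbackPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            // Stand-in for the future that sendAsync would return.
            CompletableFuture<String> sendFuture = new CompletableFuture<>();
            CompletableFuture<String> threadName = new CompletableFuture<>();

            if (offload) {
                // The callback is handed to callbackPool, so the completing thread is freed.
                sendFuture.thenAcceptAsync(
                        id -> threadName.complete(Thread.currentThread().getName()),
                        callbackPool);
            } else {
                // The callback runs on whichever thread completes the future.
                sendFuture.thenAccept(
                        id -> threadName.complete(Thread.currentThread().getName()));
            }

            // Complete the future from the stand-in I/O thread, after registration.
            ioThread.execute(() -> sendFuture.complete("msg-id-1"));
            return threadName.get(5, TimeUnit.SECONDS);
        } finally {
            ioThread.shutdown();
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("thenAccept ran on: " + runCallback(false));
        System.out.println("thenAcceptAsync ran on: " + runCallback(true));
    }
}
```

In client code the same pattern would apply to the future returned by `producer.sendAsync(...)`, with an application-owned executor passed as the second argument. The one-argument `thenAcceptAsync(Consumer)` takes no executor and runs the callback on the JDK's default asynchronous executor instead.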
