Thanks for bringing this thread to the mailing list, Yunze. I think the right change is to update the `closeAsync` method to first flush `batchMessageContainer` and to then asynchronously wait for the `pendingMessages` queue to drain. We could add a new timeout or rely on the already implemented `sendTimeout` config to put an upper time limit on `closeAsync`. My reasoning as well as responses to Joe and Yunze follow:
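
To make the proposal concrete before getting to the reasoning, here is a rough, self-contained sketch of the ordering I have in mind. It is a toy model and not the actual `ProducerImpl` code: `SketchProducer`, `flushBatch`, and `ackAll` are hypothetical names invented for illustration, and `sendTimeoutMs` only stands in for the existing `sendTimeout` config. It uses plain `CompletableFuture` (`orTimeout` requires Java 9+).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy model; NOT the Pulsar ProducerImpl.
class SketchProducer {
    // Callbacks for messages buffered for batching, not yet on the network.
    private final List<CompletableFuture<String>> batchMessageContainer = new ArrayList<>();
    // Callbacks for messages already in flight, waiting for an ack.
    private final List<CompletableFuture<String>> pendingMessages = new ArrayList<>();
    private final long sendTimeoutMs; // stands in for the sendTimeout config
    private boolean closing = false;

    SketchProducer(long sendTimeoutMs) {
        this.sendTimeoutMs = sendTimeoutMs;
    }

    synchronized CompletableFuture<String> sendAsync(String payload) {
        CompletableFuture<String> callback = new CompletableFuture<>();
        if (closing) {
            // No more writes are accepted once close has started.
            callback.completeExceptionally(new IllegalStateException("producer is closed"));
            return callback;
        }
        batchMessageContainer.add(callback);
        return callback;
    }

    // Move the buffered batch into the in-flight queue.
    synchronized void flushBatch() {
        pendingMessages.addAll(batchMessageContainer);
        batchMessageContainer.clear();
    }

    // Test helper that simulates acks arriving for everything in flight.
    synchronized void ackAll() {
        for (CompletableFuture<String> callback : pendingMessages) {
            callback.complete("acked");
        }
    }

    synchronized CompletableFuture<Void> closeAsync() {
        closing = true;
        flushBatch(); // step 1: flush instead of failing the unsent batch
        List<CompletableFuture<String>> inFlight = new ArrayList<>(pendingMessages);
        // Step 2: wait asynchronously for the queue to drain, bounded by the timeout.
        CompletableFuture<Void> drained = CompletableFuture
                .allOf(inFlight.toArray(new CompletableFuture[0]))
                .orTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS);
        return drained.handle((ignored, error) -> {
            // Anything still incomplete here fails with a timeout;
            // this is a no-op for callbacks that already completed.
            for (CompletableFuture<String> callback : inFlight) {
                callback.completeExceptionally(new TimeoutException("close timed out"));
            }
            return null;
        });
    }
}
```

In this sketch a callback only fails because of a timeout (a real implementation would also surface real send exceptions), never merely because `closeAsync` was called. That is the property I am arguing for below.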

> we need to define the behavior for how to process `pendingMessages`
> and `batchMessageContainer` when producer call `closeAsync`.

Yes, this is exactly the clarification required, and I agree that the Javadoc is ambiguous and that the implementation doesn't align with the Javadoc.

If we view the Javadoc as binding, then the fundamental question is what messages are "pending". The `pendingMessages` seem pretty easy to classify as "pending" given that they are already in flight on the network.

I also consider `batchMessageContainer` to be "pending" because a client application already has callbacks for the messages in this container. These callbacks are expected to complete when the batch message delivery completes. Since the client application already has a reference to a callback, it isn't a problem that the producer implementation initiates the flush logic. (Note that the current design fails the `pendingMessages` but does not fail the `batchMessageContainer` when `closeAsync` is called, so the callbacks for that container are currently left incomplete forever if the client is closed with an unsent batch. We will need to address this design in the work that comes from this discussion.) Further, the `ProducerImpl#failPendingMessages` method includes logic to call `ProducerImpl#failPendingBatchMessages`, which implies that these batched, but not sent, messages have been historically considered "pending".

If we view the Javadoc as non-binding, I think my guiding influence for the new design would be that the `closeAsync` method should result in a "graceful" shutdown of the client.

> What exactly does "graceful" convey here?

This is a great question, and will likely drive the design here. I view graceful to mean that the producer attempts to avoid artificial failures. That means trying to drain the queue instead of automatically failing all of the queue's callbacks. The tradeoff is that closing the producer takes longer.
This reasoning would justify my claim that we should first flush the `batchMessageContainer` instead of failing the batch without any effort at delivery, as that would be artificial.

> There is no guarantee that either case will ensure the message
> is published.

I don't think that implementing `closeAsync` with graceful shutdown logic implies a guarantee of message publishing. Rather, it guarantees that failures will be the result of a real exception or a timeout. Since calling `closeAsync` prevents additional messages from delivering, users leveraging this functionality might be operating with "at most once" delivery semantics where they'd prefer to deliver the messages if possible, but they aren't going to delay application shutdown indefinitely to deliver its last messages. If users need stronger guarantees about whether their messages are delivered, they are probably already using the flush methods to ensure that the producer's queues are empty before calling `closeAsync`.

I also agree that in all of these cases, we're assuming that users are capturing references to the async callbacks and then making business logic decisions based on the results of those callbacks.

Thanks,
Michael

On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <y...@streamnative.io.invalid> wrote:
>
> I can’t agree more, just like what I’ve said in PR 12195:
>
> > At any case, when you choose `sendAsync`, you should always make use of the
> > returned future to confirm the result of all messages. In Kafka, it's the
> > send callback.
>
> But I found many users are confused about the current behavior, especially
> those are used to Kafka’s close semantics. They might expect a simple try
> to flush existing messages, which works at a simple test environment, even
> there's no guarantee for exception cases.
>
> On Sep 28, 2021, at 4:37 PM, Joe F <joefranc...@gmail.com> wrote:
> >
> > Clients should not depend on any of this behaviour, since the broker is at
> > the other end of an unreliable network connection. The
> > semantic differences are kind of meaningless from a usability point, since
> > flushing on close =/= published. What exactly does "graceful" convey
> > here? Flush the buffer on the client end and hope it makes it to the
> > server.
> >
> > Is there a difference whether you flush(or process) pending messages or
> > not? There is no guarantee that either case will ensure the message is
> > published.
> >
> > The only way to ensure that messages are published is to wait for the ack.
> > The correct model should be to wait for return on the blocking API, or wait
> > for future completion of the async API, then handle any publish errors and
> > then only close the producer.
> >
> >
> > On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <y...@streamnative.io.invalid>
> > wrote:
> >
> >> Hi all,
> >>
> >> Recently I found a PR (https://github.com/apache/pulsar/pull/12195) that
> >> modifies the existing semantics of producer close. There're already some
> >> communications in this PR, but I think it's better to start a discussion
> >> here
> >> to let more know.
> >>
> >> The existing implementation of producer close is:
> >> 1. Cancel all timers, including send and batch container
> >> (`batchMessageContainer`).
> >> 2. Complete all pending messages (`pendingMessages`) with
> >> `AlreadyCloseException`.
> >>
> >> See `ProducerImpl#closeAsync` for details.
> >>
> >> But the JavaDoc of `Producer#closeAsync` is:
> >>
> >>> No more writes will be accepted from this producer. Waits until all
> >> pending write request are persisted.
> >>
> >> Anyway, the document and implementation are inconsistent.
> >> But specifically,
> >> we need to define the behavior for how to process `pendingMessages` and
> >> `batchMessageContainer` when producer call `closeAsync`.
> >>
> >> 1. batchMessageContainer: contains the buffered single messages
> >> (`Message<T>`).
> >> 2. pendingMessages: all inflight messages (`OpSendMsg`) in network.
> >>
> >> IMO, from the JavaDoc, only `pendingMessages` should be processed and the
> >> messages in `batchMessageContainer` should be discarded.
> >>
> >> Since other clients might have already implemented the similar semantics of
> >> Java clients. If we changed the semantics now, the behaviors among
> >> different
> >> clients might be inconsistent.
> >>
> >> Should we add a configuration to support graceful close to follow the
> >> docs? Or
> >> just change the current behavior?
> >>
> >> Thanks,
> >> Yunze