It’s a good point that `ProducerImpl#failPendingBatchMessages` also treats the messages in the batch container as pending messages.
I agree with your definition of "graceful close". It’s more like "at most once"
semantics, as the original JavaDoc said:

> pending writes will not be retried

Thanks,
Yunze

> On Sep 29, 2021, at 5:24 AM, Michael Marshall <mikemars...@gmail.com> wrote:
>
> Thanks for bringing this thread to the mailing list, Yunze.
>
> I think the right change is to update the `closeAsync` method to first
> flush `batchMessageContainer` and to then asynchronously wait for the
> `pendingMessages` queue to drain. We could add a new timeout or rely
> on the already implemented `sendTimeout` config to put an upper time
> limit on `closeAsync`. My reasoning as well as responses to Joe and
> Yunze follow:
>
>> we need to define the behavior for how to process `pendingMessages`
>> and `batchMessageContainer` when the producer calls `closeAsync`.
>
> Yes, this is exactly the clarification required, and I agree that the
> Javadoc is ambiguous and that the implementation doesn't align with
> the Javadoc.
>
> If we view the Javadoc as binding, then the fundamental question is
> what messages are "pending". The `pendingMessages` seem pretty easy to
> classify as "pending" given that they are already in flight on the
> network.
>
> I also consider `batchMessageContainer` to be "pending" because a
> client application already has callbacks for the messages in this
> container. These callbacks are expected to complete when the batch
> message delivery completes. Since the client application already has a
> reference to a callback, it isn't a problem that the producer
> implementation initiates the flush logic. (Note that the current
> design fails the `pendingMessages` but does not fail the
> `batchMessageContainer` when `closeAsync` is called, so the callbacks
> for that container are currently left incomplete forever if the client
> is closed with an unsent batch. We will need to address this design in
> the work that comes from this discussion.)
>
> Further, the `ProducerImpl#failPendingMessages` method includes logic
> to call `ProducerImpl#failPendingBatchMessages`, which implies that
> these batched, but not sent, messages have been historically
> considered "pending".
>
> If we view the Javadoc as non-binding, I think my guiding influence
> for the new design would be that the `closeAsync` method should result
> in a "graceful" shutdown of the client.
>
>> What exactly does "graceful" convey here?
>
> This is a great question, and will likely drive the design here. I
> view graceful to mean that the producer attempts to avoid artificial
> failures. That means trying to drain the queue instead of
> automatically failing all of the queue's callbacks. The tradeoff is
> that closing the producer takes longer. This reasoning would justify
> my claim that we should first flush the `batchMessageContainer`
> instead of failing the batch without any effort at delivery, as that
> would be artificial.
>
>> There is no guarantee that either case will ensure the message
>> is published.
>
> I don't think that implementing `closeAsync` with graceful shutdown
> logic implies a guarantee of message publishing. Rather, it guarantees
> that failures will be the result of a real exception or a timeout.
> Since calling `closeAsync` prevents additional messages from
> delivering, users leveraging this functionality might be operating
> with "at most once" delivery semantics where they'd prefer to deliver
> the messages if possible, but they aren't going to delay application
> shutdown indefinitely to deliver its last messages. If users need
> stronger guarantees about whether their messages are delivered, they
> are probably already using the flush methods to ensure that the
> producer's queues are empty before calling `closeAsync`.
>
> I also agree that in all of these cases, we're assuming that users are
> capturing references to the async callbacks and then making business
> logic decisions based on the results of those callbacks.
>
> Thanks,
> Michael
>
> On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <y...@streamnative.io.invalid> wrote:
>>
>> I can’t agree more, just like what I’ve said in PR 12195:
>>
>>> In any case, when you choose `sendAsync`, you should always make use of the
>>> returned future to confirm the result of all messages. In Kafka, it's the
>>> send callback.
>>
>> But I found many users are confused about the current behavior, especially
>> those who are used to Kafka’s close semantics. They might expect a simple
>> attempt to flush existing messages, which works in a simple test
>> environment, even though there's no guarantee for exception cases.
>>
>>
>>> On Sep 28, 2021, at 4:37 PM, Joe F <joefranc...@gmail.com> wrote:
>>>
>>> Clients should not depend on any of this behaviour, since the broker is at
>>> the other end of an unreliable network connection. The
>>> semantic differences are kind of meaningless from a usability point, since
>>> flushing on close =/= published. What exactly does "graceful" convey
>>> here? Flush the buffer on the client end and hope it makes it to the
>>> server.
>>>
>>> Is there a difference whether you flush (or process) pending messages or
>>> not? There is no guarantee that either case will ensure the message is
>>> published.
>>>
>>> The only way to ensure that messages are published is to wait for the ack.
>>> The correct model should be to wait for return on the blocking API, or wait
>>> for future completion of the async API, then handle any publish errors and
>>> then only close the producer.
>>>
>>>
>>> On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <y...@streamnative.io.invalid>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Recently I found a PR (https://github.com/apache/pulsar/pull/12195) that
>>>> modifies the existing semantics of producer close. There has already been
>>>> some discussion in this PR, but I think it's better to start a discussion
>>>> here to let more people know.
>>>>
>>>> The existing implementation of producer close is:
>>>> 1. Cancel all timers, including send and batch container
>>>> (`batchMessageContainer`).
>>>> 2. Complete all pending messages (`pendingMessages`) with
>>>> `AlreadyClosedException`.
>>>>
>>>> See `ProducerImpl#closeAsync` for details.
>>>>
>>>> But the JavaDoc of `Producer#closeAsync` is:
>>>>
>>>>> No more writes will be accepted from this producer. Waits until all
>>>>> pending write request are persisted.
>>>>
>>>> Anyway, the document and implementation are inconsistent. But specifically,
>>>> we need to define the behavior for how to process `pendingMessages` and
>>>> `batchMessageContainer` when the producer calls `closeAsync`.
>>>>
>>>> 1. batchMessageContainer: contains the buffered single messages
>>>> (`Message<T>`).
>>>> 2. pendingMessages: all inflight messages (`OpSendMsg`) in network.
>>>>
>>>> IMO, from the JavaDoc, only `pendingMessages` should be processed and the
>>>> messages in `batchMessageContainer` should be discarded.
>>>>
>>>> Other clients might have already implemented semantics similar to the
>>>> Java client's, so if we change the semantics now, the behaviors among
>>>> different clients might become inconsistent.
>>>>
>>>> Should we add a configuration to support graceful close to follow the
>>>> docs? Or just change the current behavior?
>>>>
>>>> Thanks,
>>>> Yunze
>>
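
As a rough illustration of the proposal quoted above (stop accepting writes, flush `batchMessageContainer`, then wait for `pendingMessages` to drain with an upper time limit), here is a minimal, self-contained Java sketch. It does not use the Pulsar client at all: `GracefulCloseSketch`, `SketchProducer`, `Op`, and `ackAll` are hypothetical names made up for this example, and the simulated ack stands in for the broker. It only shows the ordering under discussion, not how `ProducerImpl` is or should be implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GracefulCloseSketch {

    /** One message plus the callback handed back to the application (hypothetical). */
    static final class Op {
        final String payload;
        final CompletableFuture<String> callback = new CompletableFuture<>();
        Op(String payload) { this.payload = payload; }
    }

    /** Hypothetical stand-in for a producer; NOT Pulsar's ProducerImpl. */
    static class SketchProducer {
        // Buffered, not yet sent (plays the role of batchMessageContainer).
        private final List<Op> batchContainer = new ArrayList<>();
        // Sent, waiting for an ack (plays the role of pendingMessages).
        private final List<Op> pendingMessages = new ArrayList<>();
        private final long sendTimeoutMs;
        private boolean closed;

        SketchProducer(long sendTimeoutMs) { this.sendTimeoutMs = sendTimeoutMs; }

        synchronized CompletableFuture<String> sendAsync(String payload) {
            Op op = new Op(payload);
            if (closed) {
                op.callback.completeExceptionally(new IllegalStateException("producer already closed"));
            } else {
                batchContainer.add(op);
            }
            return op.callback;
        }

        /** Moves every buffered message into the in-flight queue. */
        synchronized void flush() {
            pendingMessages.addAll(batchContainer);
            batchContainer.clear();
        }

        /** Simulates the broker acknowledging everything currently in flight. */
        synchronized void ackAll() {
            for (Op op : pendingMessages) {
                op.callback.complete("acked:" + op.payload);
            }
            pendingMessages.clear();
        }

        /** Graceful close: flush, then wait for acks, bounded by the send timeout. */
        synchronized CompletableFuture<Void> closeAsync() {
            closed = true;  // no more writes are accepted
            flush();        // buffered messages get a delivery attempt instead of being dropped
            List<Op> inFlight = new ArrayList<>(pendingMessages);
            CompletableFuture<?>[] callbacks =
                    inFlight.stream().map(op -> op.callback).toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(callbacks)
                    .orTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS)
                    .<Void>handle((ignored, error) -> {
                        // Anything still unacknowledged fails now; already completed
                        // callbacks are left untouched by completeExceptionally.
                        for (Op op : inFlight) {
                            op.callback.completeExceptionally(
                                    new TimeoutException("no ack before close timed out"));
                        }
                        return null;
                    });
        }
    }

    public static void main(String[] args) {
        SketchProducer producer = new SketchProducer(5_000);
        CompletableFuture<String> callback = producer.sendAsync("hello");
        CompletableFuture<Void> closeFuture = producer.closeAsync();
        producer.ackAll();  // the simulated ack arrives while close is still waiting
        closeFuture.join();
        System.out.println("send result: " + callback.join());
    }
}
```

In this sketch the close future completes normally even when the timeout fires, and the outcome of each message is reported only through its own callback. That matches the point made throughout the thread: closing gracefully is not a delivery guarantee, so the application still has to inspect the futures returned by `sendAsync`.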