Following up here. I am pretty sure part of this conversation has been based on a misunderstanding of the code. From what I can tell, the behavior for `Producer#closeAsync` in the client (mostly) aligns with the current Javadocs.

> The existing implementation of producer close is:
> 1. Cancel all timers, including send and batch container
> (`batchMessageContainer`).
> 2. Complete all pending messages (`pendingMessages`) with
> `AlreadyClosedException`.

I agree with 1, but I think 2 is only partially correct. The client will
only exceptionally complete pending messages if the connection is null or
not ready, or after the broker responds to the `CLOSE_PRODUCER` command or
a timeout passes. This behavior seems right to me. Here is the relevant
code:
https://github.com/apache/pulsar/blob/9d309145f342bc416b8b4663125e1216903a3e83/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L875-L909

The only remaining question: does close imply flush? If not, we'll need to
update the logic to fail the messages contained in the
`batchMessageContainer` during close. Otherwise, we'll update the logic to
call flush before sending the `CLOSE_PRODUCER` command and everything
should work as documented. In both cases, we should update the Javadocs to
make the behavior clearer.

Thanks,
Michael

On Thu, Sep 30, 2021 at 11:55 AM Michael Marshall <mikemars...@gmail.com> wrote:
>
> I have two questions:
>
> 1. Does close imply immediate shutdown?
> 2. Does close imply flush?
>
> There is not yet consensus on 1, and 2 is only relevant if 1's answer
> is "no".
>
> Thus far, the conversation has centered on the `Producer#close`
> method. I'd like to broaden the discussion to include some other
> methods from the `PulsarClient` interface: `shutdown` and `close`.
>
> The Javadoc for `PulsarClient#shutdown` describes the "shutdown
> immediately" behavior. It says:
>
> > Release all the resources and close all the producer, consumer and
> > reader instances without waiting for ongoing operations to complete.
>
> The Javadoc for `PulsarClient#close` describes waiting for
> pending/in-flight messages to complete before returning.
> It says:
>
> > This operation will trigger a graceful close of all producer, consumer
> > and reader instances that this client has currently active. That implies
> > that close will block and wait until all pending producer send requests
> > are persisted.
>
> One question that follows from the above: why does the `Producer` not
> have a `shutdown` method? I think this is because the "immediate
> shutdown" behavior is not necessary for a single producer. When
> immediate shutdown semantics are required, the `PulsarClient#shutdown`
> method is sufficient because it is used when shutting down the whole
> application. (If this is not correct, perhaps we should add a
> `shutdown` method to the producer?)
>
> Since immediate shutdown semantics are already available via our
> client API, I posit that the answer to question 1 is no, `close` does
> not imply immediate shutdown. At the very least, `close` in the Pulsar
> Client has not historically implied immediate shutdown.
>
> Additionally, it is relevant to point out that the `Producer#close`
> method is already sending a `CLOSE_PRODUCER` command and waiting on a
> response back from the broker. The broker's producer close method has
> the following Javadoc:
>
> > Close the producer immediately if: a. the connection is dropped
> > b. it's a graceful close and no pending publish acks are left
> > else wait for pending publish acks
>
> Since we're already waiting on the broker to respond to the producer's
> `CLOSE_PRODUCER` request, I see no reason to fail pending/in-flight
> messages immediately, especially because we should get a response back
> for those messages before getting the `SUCCESS` response from the
> broker since the responses will come on the same TCP connection. We
> could even simplify the close logic so that when the `CLOSE_PRODUCER`
> request completes (either successfully or because of a failure), we
> fail all remaining pending message futures.
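
To make the simplification in the last paragraph concrete, here is a minimal sketch that uses only `java.util.concurrent` and not the real `ProducerImpl`. The class `CloseSketch`, the methods `trackSend` and `failRemaining`, the `closeCommand` parameter (standing in for the `CLOSE_PRODUCER` request), and the exception type are all hypothetical names chosen for illustration. It only shows the ordering idea: nothing is failed until the close request completes one way or the other.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of a producer's close path; not the Pulsar client code.
class CloseSketch {
    // Stand-in for the queue of in-flight send futures (`pendingMessages`).
    private final Queue<CompletableFuture<Long>> pending = new ArrayDeque<>();

    // Registers an in-flight send and returns the future the caller waits on.
    synchronized CompletableFuture<Long> trackSend() {
        CompletableFuture<Long> ack = new CompletableFuture<>();
        pending.add(ack);
        return ack;
    }

    // `closeCommand` models the CLOSE_PRODUCER request. Whether it succeeds
    // or fails, any send future that is still incomplete is failed afterwards.
    CompletableFuture<Void> closeAsync(CompletableFuture<Void> closeCommand) {
        CompletableFuture<Void> closed = new CompletableFuture<>();
        closeCommand.whenComplete((ok, error) -> {
            failRemaining();
            closed.complete(null);
        });
        return closed;
    }

    // completeExceptionally is a no-op on futures that were already acked,
    // so sends that completed before the close response keep their result.
    private synchronized void failRemaining() {
        CompletableFuture<Long> ack;
        while ((ack = pending.poll()) != null) {
            ack.completeExceptionally(new IllegalStateException("producer closed"));
        }
    }

    public static void main(String[] args) {
        CloseSketch producer = new CloseSketch();
        CompletableFuture<Long> acked = producer.trackSend();
        CompletableFuture<Long> unacked = producer.trackSend();
        CompletableFuture<Void> closeCommand = new CompletableFuture<>();

        CompletableFuture<Void> closed = producer.closeAsync(closeCommand);
        acked.complete(1L);          // ack arrives before the close response
        closeCommand.complete(null); // close response arrives

        System.out.println("closed=" + closed.isDone()
                + " ackedFailed=" + acked.isCompletedExceptionally()
                + " unackedFailed=" + unacked.isCompletedExceptionally());
    }
}
```

In this model a send that is acked before the close response keeps its result, and only sends still outstanding when the close request completes are failed. Timeouts and the null or not-ready connection case are left out.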
>
> Ultimately, we need to decide whether to update the implementation to
> match the existing Javadocs, or to update the Javadocs to indicate
> that `close` means an immediate shutdown, which includes failing all
> outstanding message futures immediately. My vote is to make the
> implementation align with the Javadocs.
>
> Regarding question 2, I prefer that `close` implies flush because it
> is only a single (batched) message being flushed. If we do flush this
> message, we'll need to make sure that the message is sent before
> the `CLOSE_PRODUCER` command is sent.
>
> Thanks,
> Michael
>
> On Wed, Sep 29, 2021 at 7:04 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> >
> > I agree that we must ensure that every pending callback is eventually
> > completed (a timeout or another error is not a problem),
> > because we cannot let the client application hang forever.
> > I believe that the application can perform a flush() explicitly and also
> > wait for every callback to be executed if that is the requirement.
> >
> > Usually you call close() when:
> > 1. you have a serious problem: you already know that there is a hard error,
> > and you want to close the Producer or the Application and possibly start a
> > new one to recover
> > 2. you are shutting down your application or component: you have control
> > over the callbacks, so you can wait for them to complete
> >
> > So case 2 can be covered by the application. We have to support case 1:
> > fail fast and close (no need for flush()).
> >
> > In my experience, trying to implement "graceful stops" adds only complexity
> > and false hopes for the users.
> >
> > Enrico
> >
> > On Wed, Sep 29, 2021 at 1:58 PM Nirvana <1572139...@qq.com.invalid> wrote:
> >
> > > I agree to try to ensure "at most once" when closing.
> > >
> > > > That would still get controlled by send timeout, after that, the send
> > > > will fail and close should proceed.
> > > This sounds more in line with "at most once".
> > >
> > > ------------------ Original Message ------------------
> > > From: "dev" <matteo.me...@gmail.com>
> > > Sent: Wednesday, September 29, 2021, 3:55 PM
> > > To: "Dev" <dev@pulsar.apache.org>
> > > Subject: Re: Correct semantics of producer close
> > >
> > > > but equally they might be
> > > > surprised when closeAsync doesn't complete because the pending messages
> > > > can't be cleared
> > >
> > > That would still get controlled by send timeout, after that, the send
> > > will fail and close should proceed.
> > >
> > > --
> > > Matteo Merli
> > > <matteo.me...@gmail.com>
> > >
> > > On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
> > > <jvanligh...@splunk.com.invalid> wrote:
> > > >
> > > > I can see both sides of the argument regarding whether to flush pending
> > > > messages or not. But I think what is definitely in the contract is not to
> > > > discard any callbacks causing user code to block forever. No matter what,
> > > > we must always call the callbacks.
> > > >
> > > > Personally, I am in favour of a close operation not flushing pending
> > > > messages (and I define pending here as any message that has a callback).
> > > > The reason is that if we wait for all pending messages to be sent then we
> > > > now face a number of edge cases that could cause the close operation to
> > > > take a very long time to complete. What if the user code really just needs
> > > > to close the producer right now? If we amend the documentation to make it
> > > > clear that close does not flush pending messages then the user is now able
> > > > to explicitly craft the behaviour they need. If they want all messages
> > > > flushed first then chain flushAsync->closeAsync, else just closeAsync.
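
For readers skimming the thread, the "chain flushAsync->closeAsync" option can be sketched as below. This is an illustration only. `SketchProducer` is a hypothetical two-method stand-in that borrows the `flushAsync` and `closeAsync` names from Pulsar's `Producer` interface, and `flushThenClose` is an invented helper. One design choice here is an assumption, not something decided in the thread: the producer is still closed when the flush fails, and the flush error is then reported on the returned future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the two Producer methods used in this sketch.
interface SketchProducer {
    CompletableFuture<Void> flushAsync();
    CompletableFuture<Void> closeAsync();
}

class FlushThenClose {
    // Flush first, then close. The close runs even if the flush failed;
    // a flush failure is surfaced once the close has completed.
    static CompletableFuture<Void> flushThenClose(SketchProducer producer) {
        return producer.flushAsync()
                .handle((ignored, flushError) -> flushError)
                .thenCompose(flushError -> producer.closeAsync()
                        .thenCompose(ignored -> flushError == null
                                ? CompletableFuture.<Void>completedFuture(null)
                                : CompletableFuture.<Void>failedFuture(flushError)));
    }

    public static void main(String[] args) {
        // Fake producer that only logs the order in which it is called.
        SketchProducer fake = new SketchProducer() {
            public CompletableFuture<Void> flushAsync() {
                System.out.println("flush");
                return CompletableFuture.completedFuture(null);
            }
            public CompletableFuture<Void> closeAsync() {
                System.out.println("close");
                return CompletableFuture.completedFuture(null);
            }
        };
        flushThenClose(fake).join();
    }
}
```

An application that prefers the fail-fast behaviour would skip the helper and call `closeAsync` directly, which is the other branch described above.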
> > > >
> > > > Unfortunately I think user expectation, regardless of the current javadoc,
> > > > is that close would flush everything and in an ideal world it would. We
> > > > have the Principle of Least Surprise but we also have Safe By Default.
> > > > Users might be surprised that when calling closeAsync, a load of their
> > > > pending messages get ConnectionAlreadyClosed, but equally they might be
> > > > surprised when closeAsync doesn't complete because the pending messages
> > > > can't be cleared. Failing pending messages is the safer option. User code
> > > > must handle failure responses and cannot claim data loss with a
> > > > non-positive response. But if they can't close a producer, that could
> > > > result in a wider impact on their system, not to mention more issues
> > > > created in GitHub.
> > > >
> > > > Jack
> > > >
> > > > On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefranc...@gmail.com> wrote:
> > > >
> > > > > [ External sender. Exercise caution. ]
> > > > >
> > > > > > I don't think that implementing `closeAsync` with graceful shutdown
> > > > > > logic implies a guarantee of message publishing. Rather, it guarantees
> > > > > > that failures will be the result of a real exception or a timeout.
> > > > >
> > > > > I think that's beside the point. There is no definition of "real"
> > > > > exceptions. At that point the app is publishing on a best effort basis,
> > > > > and there are no guarantees anywhere in client or server.
> > > > >
> > > > > There is no concept of "maybe published" or
> > > > > "published-if-no_real_errors". What does that even mean? That is only a
> > > > > can of worms which is going to add to developer confusion and lead to
> > > > > Pulsar users finding in the worst possible way that something got lost
> > > > > because it never got published. It's a poor experience when you find it.
> > > > > I have a real life experience where a user used async APIs (in a lambda),
> > > > > which hummed along fine. One day much later, the cloud had a hitch, and
> > > > > they discovered a message was not delivered.
> > > > >
> > > > > I am more concerned about developers discovering at the worst possible time
> > > > > that "published-if-no_real_errors" is a concept.
> > > > >
> > > > > My suggestion is to make this simple for developers.
> > > > >
> > > > > The sync/async nature of the close() [ or any other API, for that
> > > > > matter ] is completely orthogonal to the API semantics, and is just a
> > > > > programmatic choice to deal with how resources are managed within the
> > > > > program. That's not material here.
> > > > >
> > > > > A close() is an action that is shutting down the producer right now, not
> > > > > even waiting for any acks of inflight messages. A willingness to lose
> > > > > pending/inflight messages is explicit in that call. The producer will not
> > > > > be around to deal with errors or to retry failed messages once close() is
> > > > > invoked.
> > > > >
> > > > > On the contrary, if the client does not want to deal with message loss,
> > > > > then flush(), stick around to gather the acks, deal with errors and retries
> > > > > etc., and then do close(). Then close() will be just a resource management
> > > > > action on the client.
> > > > >
> > > > > So update the documentation to reflect that: if close() is called on a
> > > > > producer with messages pending acks, those messages are left in doubt. Avoid
> > > > > all mention of flushes, best effort etc. Users must buy into uncertainty,
> > > > > without any qualifications.
> > > > >
> > > > > I would at all costs avoid using the term "graceful" anywhere.
> > > > > That word has specific semantics associated with it in the
> > > > > systems/storage domain, and what is being proposed here is nothing
> > > > > like that.
> > > > >
> > > > > -j
> > > > >
> > > > > On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu <y...@streamnative.io.invalid>
> > > > > wrote:
> > > > >
> > > > > > It's a good point that `ProducerImpl#failPendingBatchMessages` treats
> > > > > > messages in batch container also as pending messages.
> > > > > >
> > > > > > I agree with your definition of "graceful close". It's more like
> > > > > > "at most once" semantics, like the original JavaDoc said:
> > > > > >
> > > > > > > pending writes will not be retried
> > > > > >
> > > > > > Thanks,
> > > > > > Yunze
> > > > > >
> > > > > > > On Sep 29, 2021, at 5:24 AM, Michael Marshall <mikemars...@gmail.com> wrote:
> > > > > > >
> > > > > > > Thanks for bringing this thread to the mailing list, Yunze.
> > > > > > >
> > > > > > > I think the right change is to update the `closeAsync` method to first
> > > > > > > flush `batchMessageContainer` and to then asynchronously wait for the
> > > > > > > `pendingMessages` queue to drain. We could add a new timeout or rely
> > > > > > > on the already implemented `sendTimeout` config to put an upper time
> > > > > > > limit on `closeAsync`. My reasoning as well as responses to Joe and
> > > > > > > Yunze follow:
> > > > > > >
> > > > > > >> we need to define the behavior for how to process `pendingMessages`
> > > > > > >> and `batchMessageContainer` when producer call `closeAsync`.
> > > > > > >
> > > > > > > Yes, this is exactly the clarification required, and I agree that the
> > > > > > > Javadoc is ambiguous and that the implementation doesn't align with
> > > > > > > the Javadoc.
> > > > > > >
> > > > > > > If we view the Javadoc as binding, then the fundamental question is
> > > > > > > what messages are "pending".
> > > > > > > The `pendingMessages` seem pretty easy to
> > > > > > > classify as "pending" given that they are already in flight on the
> > > > > > > network.
> > > > > > >
> > > > > > > I also consider `batchMessageContainer` to be "pending" because a
> > > > > > > client application already has callbacks for the messages in this
> > > > > > > container. These callbacks are expected to complete when the batch
> > > > > > > message delivery completes. Since the client application already has a
> > > > > > > reference to a callback, it isn't a problem that the producer
> > > > > > > implementation initiates the flush logic. (Note that the current
> > > > > > > design fails the `pendingMessages` but does not fail the
> > > > > > > `batchMessageContainer` when `closeAsync` is called, so the callbacks
> > > > > > > for that container are currently left incomplete forever if the client
> > > > > > > is closed with an unsent batch. We will need to address this design in
> > > > > > > the work that comes from this discussion.)
> > > > > > >
> > > > > > > Further, the `ProducerImpl#failPendingMessages` method includes logic
> > > > > > > to call `ProducerImpl#failPendingBatchMessages`, which implies that
> > > > > > > these batched, but not sent, messages have been historically
> > > > > > > considered "pending".
> > > > > > >
> > > > > > > If we view the Javadoc as non-binding, I think my guiding influence
> > > > > > > for the new design would be that the `closeAsync` method should result
> > > > > > > in a "graceful" shutdown of the client.
> > > > > > >
> > > > > > >> What exactly does "graceful" convey here?
> > > > > > >
> > > > > > > This is a great question, and will likely drive the design here. I
> > > > > > > view graceful to mean that the producer attempts to avoid artificial
> > > > > > > failures.
> > > > > > > That means trying to drain the queue instead of
> > > > > > > automatically failing all of the queue's callbacks. The tradeoff is
> > > > > > > that closing the producer takes longer. This reasoning would justify
> > > > > > > my claim that we should first flush the `batchMessageContainer`
> > > > > > > instead of failing the batch without any effort at delivery, as that
> > > > > > > would be artificial.
> > > > > > >
> > > > > > >> There is no guarantee that either case will ensure the message
> > > > > > >> is published.
> > > > > > >
> > > > > > > I don't think that implementing `closeAsync` with graceful shutdown
> > > > > > > logic implies a guarantee of message publishing. Rather, it guarantees
> > > > > > > that failures will be the result of a real exception or a timeout.
> > > > > > > Since calling `closeAsync` prevents additional messages from
> > > > > > > delivering, users leveraging this functionality might be operating
> > > > > > > with "at most once" delivery semantics where they'd prefer to deliver
> > > > > > > the messages if possible, but they aren't going to delay application
> > > > > > > shutdown indefinitely to deliver its last messages. If users need
> > > > > > > stronger guarantees about whether their messages are delivered, they
> > > > > > > are probably already using the flush methods to ensure that the
> > > > > > > producer's queues are empty before calling `closeAsync`.
> > > > > > >
> > > > > > > I also agree that in all of these cases, we're assuming that users are
> > > > > > > capturing references to the async callbacks and then making business
> > > > > > > logic decisions based on the results of those callbacks.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Michael
> > > > > > >
> > > > > > > On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <y...@streamnative.io.invalid>
> > > > > > > wrote:
> > > > > > >>
> > > > > > >> I can't agree more, just like what I've said in PR 12195:
> > > > > > >>
> > > > > > >>> At any case, when you choose `sendAsync`, you should always make use
> > > > > > >>> of the returned future to confirm the result of all messages. In Kafka,
> > > > > > >>> it's the send callback.
> > > > > > >>
> > > > > > >> But I found many users are confused about the current behavior,
> > > > > > >> especially those who are used to Kafka's close semantics. They might
> > > > > > >> expect a simple try to flush existing messages, which works in a simple
> > > > > > >> test environment, even though there's no guarantee for exception cases.
> > > > > > >>
> > > > > > >>> On Sep 28, 2021, at 4:37 PM, Joe F <joefranc...@gmail.com> wrote:
> > > > > > >>>
> > > > > > >>> Clients should not depend on any of this behaviour, since the broker
> > > > > > >>> is at the other end of an unreliable network connection. The
> > > > > > >>> semantic differences are kind of meaningless from a usability point,
> > > > > > >>> since flushing on close =/= published. What exactly does "graceful"
> > > > > > >>> convey here? Flush the buffer on the client end and hope it makes it
> > > > > > >>> to the server.
> > > > > > >>>
> > > > > > >>> Is there a difference whether you flush (or process) pending messages
> > > > > > >>> or not? There is no guarantee that either case will ensure the message
> > > > > > >>> is published.
> > > > > > >>>
> > > > > > >>> The only way to ensure that messages are published is to wait for the ack.
> > > > > > >>> The correct model should be to wait for return on the blocking API, or
> > > > > > >>> wait for future completion of the async API, then handle any publish
> > > > > > >>> errors, and only then close the producer.
> > > > > > >>>
> > > > > > >>> On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <y...@streamnative.io.invalid>
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > > >>>> Hi all,
> > > > > > >>>>
> > > > > > >>>> Recently I found a PR (https://github.com/apache/pulsar/pull/12195) that
> > > > > > >>>> modifies the existing semantics of producer close. There're already some
> > > > > > >>>> communications in this PR, but I think it's better to start a discussion
> > > > > > >>>> here to let more people know.
> > > > > > >>>>
> > > > > > >>>> The existing implementation of producer close is:
> > > > > > >>>> 1. Cancel all timers, including send and batch container
> > > > > > >>>> (`batchMessageContainer`).
> > > > > > >>>> 2. Complete all pending messages (`pendingMessages`) with
> > > > > > >>>> `AlreadyClosedException`.
> > > > > > >>>>
> > > > > > >>>> See `ProducerImpl#closeAsync` for details.
> > > > > > >>>>
> > > > > > >>>> But the JavaDoc of `Producer#closeAsync` is:
> > > > > > >>>>
> > > > > > >>>>> No more writes will be accepted from this producer. Waits until all
> > > > > > >>>>> pending write request are persisted.
> > > > > > >>>>
> > > > > > >>>> Anyway, the document and implementation are inconsistent. But
> > > > > > >>>> specifically, we need to define the behavior for how to process
> > > > > > >>>> `pendingMessages` and `batchMessageContainer` when the producer calls
> > > > > > >>>> `closeAsync`.
> > > > > > >>>>
> > > > > > >>>> 1. batchMessageContainer: contains the buffered single messages
> > > > > > >>>> (`Message<T>`).
> > > > > > >>>> 2. pendingMessages: all inflight messages (`OpSendMsg`) in network.
> > > > > > >>>>
> > > > > > >>>> IMO, from the JavaDoc, only `pendingMessages` should be processed and
> > > > > > >>>> the messages in `batchMessageContainer` should be discarded.
> > > > > > >>>>
> > > > > > >>>> Other clients might have already implemented semantics similar to the
> > > > > > >>>> Java client's. If we changed the semantics now, the behaviors among
> > > > > > >>>> different clients might be inconsistent.
> > > > > > >>>>
> > > > > > >>>> Should we add a configuration to support graceful close to follow the
> > > > > > >>>> docs? Or just change the current behavior?
> > > > > > >>>>
> > > > > > >>>> Thanks,
> > > > > > >>>> Yunze