I have two questions:

1. Does close imply immediate shutdown?
2. Does close imply flush?
There is not yet consensus on 1, and 2 is only relevant if the answer to 1 is "no".

Thus far, the conversation has centered on the `Producer#close` method. I'd like to broaden the discussion to include two other methods from the `PulsarClient` interface: `shutdown` and `close`.

The Javadoc for `PulsarClient#shutdown` describes the "shutdown immediately" behavior. It says:

> Release all the resources and close all the producer, consumer and
> reader instances without waiting for ongoing operations to complete.

The Javadoc for `PulsarClient#close` describes waiting for pending/in-flight messages to complete before returning. It says:

> This operation will trigger a graceful close of all producer, consumer
> and reader instances that this client has currently active. That implies
> that close will block and wait until all pending producer send requests
> are persisted.

One question that follows from the above: why does the `Producer` not have a `shutdown` method? I think this is because the "immediate shutdown" behavior is not necessary for a single producer. When immediate shutdown semantics are required, the `PulsarClient#shutdown` method is sufficient because it is used when shutting down the whole application. (If this is not correct, perhaps we should add a `shutdown` method to the producer?)

Since immediate shutdown semantics are already available via our client API, I posit that the answer to question 1 is no: `close` does not imply immediate shutdown. At the very least, `close` in the Pulsar client has not historically implied immediate shutdown.

Additionally, it is worth pointing out that the `Producer#close` method already sends a `CLOSE_PRODUCER` command and waits for a response from the broker. The broker's producer close method has the following Javadoc:

> Close the producer immediately if: a. the connection is dropped
> b. it's a graceful close and no pending publish acks are left
> else wait for pending publish acks

Since we're already waiting for the broker to respond to the producer's `CLOSE_PRODUCER` request, I see no reason to fail pending/in-flight messages immediately, especially because we should get a response for those messages before getting the `SUCCESS` response from the broker, since all of the responses come on the same TCP connection. We could even simplify the close logic so that when the `CLOSE_PRODUCER` request completes (either successfully or because of a failure), we fail all remaining pending message futures.

Ultimately, we need to decide whether to update the implementation to match the existing Javadocs, or to update the Javadocs to indicate that `close` means an immediate shutdown, which includes failing all outstanding message futures immediately. My vote is to make the implementation align with the Javadocs.

Regarding question 2, I prefer that `close` implies flush because it is only a single (batched) message being flushed. If we do flush this message, we'll need to make sure that the message is sent before the `CLOSE_PRODUCER` command is sent.

Thanks,
Michael

On Wed, Sep 29, 2021 at 7:04 AM Enrico Olivelli <eolive...@gmail.com> wrote:
>
> I agree that we must ensure that every pending callback is completed
> eventually (a timeout or another error is not a problem),
> because we cannot let the client application hang forever.
> I believe that the application can perform a flush() explicitly and also
> wait for every callback to be executed if that is the requirement.
>
> Usually you call close() when:
> 1. you have a serious problem: you already know that there is a hard error,
> and you want to close the Producer or the Application and possibly start a
> new one to recover
> 2. you are shutting down your application or component: you have control
> over the callbacks, so you can wait for them to complete
>
> So case 2 can be covered by the application. We have to support case 1:
> fail fast and close (no need for flush()).
>
> In my experience, trying to implement "graceful stops" adds only complexity
> and false hopes for users.
>
> Enrico
>
> On Wed, Sep 29, 2021 at 13:58 Nirvana <1572139...@qq.com.invalid>
> wrote:
>
> > I agree with trying to ensure "at most once" when closing.
> >
> > > That would still get controlled by send timeout, after that, the send
> > > will fail and close should proceed.
> >
> > This sounds more in line with "at most once".
> >
> > ------------------ Original message ------------------
> > From: "dev" <matteo.me...@gmail.com>
> > Sent: Wednesday, September 29, 2021, 3:55 PM
> > To: "Dev" <dev@pulsar.apache.org>
> > Subject: Re: Correct semantics of producer close
> >
> > > but equally they might be
> > > surprised when closeAsync doesn't complete because the pending messages
> > > can't be cleared
> >
> > That would still get controlled by send timeout, after that, the send
> > will fail and close should proceed.
> >
> > --
> > Matteo Merli
> > <matteo.me...@gmail.com>
> >
> > On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
> > <jvanligh...@splunk.com.invalid> wrote:
> > >
> > > I can see both sides of the argument regarding whether to flush pending
> > > messages or not. But I think what is definitely in the contract is not to
> > > discard any callbacks, causing user code to block forever. No matter what,
> > > we must always call the callbacks.
> > >
> > > Personally, I am in favour of a close operation not flushing pending
> > > messages (and I define pending here as any message that has a callback).
> > > The reason is that if we wait for all pending messages to be sent, then we
> > > now face a number of edge cases that could cause the close operation to
> > > take a very long time to complete. What if the user code really just needs
> > > to close the producer right now?
> > > If we amend the documentation to make it clear that close does not flush
> > > pending messages, then the user is able to explicitly craft the behaviour
> > > they need. If they want all messages flushed first, they can chain
> > > flushAsync->closeAsync; otherwise, just closeAsync.
> > >
> > > Unfortunately, I think user expectation, regardless of the current javadoc,
> > > is that close would flush everything, and in an ideal world it would. We
> > > have the Principle of Least Surprise, but we also have Safe By Default.
> > > Users might be surprised that when calling closeAsync, a load of their
> > > pending messages get ConnectionAlreadyClosed, but equally they might be
> > > surprised when closeAsync doesn't complete because the pending messages
> > > can't be cleared. Failing pending messages is the safer option. User code
> > > must handle failure responses and cannot claim data loss with a
> > > non-positive response. But if they can't close a producer, that could
> > > result in a wider impact on their system, not to mention more issues
> > > created in GitHub.
> > >
> > > Jack
> > >
> > > On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefranc...@gmail.com> wrote:
> > >
> > > > > I don't think that implementing `closeAsync` with graceful shutdown
> > > > > logic implies a guarantee of message publishing. Rather, it guarantees
> > > > > that failures will be the result of a real exception or a timeout.
> > > >
> > > > I think that's beside the point. There is no definition of "real"
> > > > exceptions. At that point the app is publishing on a best-effort basis,
> > > > and there are no guarantees anywhere in client or server.
> > > >
> > > > There is no concept of "maybe published" or
> > > > "published-if-no_real_errors". What does that even mean?
> > > > That is only a can of worms, which is going to add to developer confusion
> > > > and lead to Pulsar users finding out in the worst possible way that
> > > > something got lost because it never got published. It's a poor experience
> > > > when you find it. I have a real-life experience where a user used async
> > > > APIs (in a lambda), which hummed along fine. One day much later, the cloud
> > > > had a hitch, and they discovered a message was not delivered.
> > > >
> > > > I am more concerned about developers discovering at the worst possible
> > > > time that "published-if-no_real_errors" is a concept.
> > > >
> > > > My suggestion is to make this simple for developers.
> > > >
> > > > ----The sync/async nature of close() [or any other API, for that
> > > > matter] is completely orthogonal to the API semantics, and is just a
> > > > programmatic choice about how resources are managed within the
> > > > program. That's not material here.----
> > > >
> > > > A close() is an action that shuts down the producer right now, not
> > > > even waiting for any acks of inflight messages. A willingness to lose
> > > > pending/inflight messages is explicit in that call. The producer will not
> > > > be around to deal with errors or to retry failed messages once close() is
> > > > invoked.
> > > >
> > > > On the contrary, if the client does not want to deal with message loss,
> > > > it should flush(), stick around to gather the acks, deal with errors,
> > > > retries, etc., and then call close(). Then close() will be just a
> > > > resource management action on the client.
> > > >
> > > > So update the documentation to reflect that ---> if close() is called on
> > > > a producer with messages pending acks, those messages are left in doubt.
> > > > Avoid all mention of flushes, best effort, etc. Users must buy into
> > > > uncertainty, without any qualifications.
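A minimal Java sketch of the flush-then-close pattern described in the paragraph above (send, flush, gather every ack, handle failures, and only then close). It is an illustration under stated assumptions, not Pulsar client code: `FakeProducer` and `FlushThenClose` are hypothetical names, the method names `sendAsync`, `flushAsync` and `closeAsync` only mirror those used in this thread, and broker acks are simulated in memory so the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a producer with simulated acks; NOT the Pulsar client.
class FakeProducer {
    private final List<CompletableFuture<Long>> unacked = new ArrayList<>();
    private long nextMessageId = 0;
    private boolean closed = false;

    // Returns a future that completes once the simulated broker acks the message.
    CompletableFuture<Long> sendAsync(String payload) {
        if (closed) {
            return CompletableFuture.failedFuture(new IllegalStateException("producer closed"));
        }
        CompletableFuture<Long> ack = new CompletableFuture<>();
        unacked.add(ack);
        return ack;
    }

    // Simulates sending everything buffered and receiving an ack for each message.
    CompletableFuture<Void> flushAsync() {
        for (CompletableFuture<Long> ack : unacked) {
            ack.complete(nextMessageId++);
        }
        unacked.clear();
        return CompletableFuture.completedFuture(null);
    }

    // Pure resource cleanup: the caller has already collected every ack.
    CompletableFuture<Void> closeAsync() {
        closed = true;
        return CompletableFuture.completedFuture(null);
    }
}

class FlushThenClose {
    // Send, flush, wait for every ack, count failures, and only then close.
    static int publishAllThenClose(FakeProducer producer, List<String> payloads) {
        List<CompletableFuture<Long>> acks = new ArrayList<>();
        for (String payload : payloads) {
            acks.add(producer.sendAsync(payload));
        }
        producer.flushAsync().join();
        // allOf completes once every ack future is done; its failure is swallowed
        // here because each future is inspected individually below.
        CompletableFuture.allOf(acks.toArray(new CompletableFuture<?>[0]))
                .exceptionally(error -> null)
                .join();
        int failures = 0;
        for (CompletableFuture<Long> ack : acks) {
            if (ack.isCompletedExceptionally()) {
                failures++; // application-specific retry or error handling would go here
            }
        }
        producer.closeAsync().join();
        return failures;
    }
}
```

Because close is called only after every ack future has completed, close is purely resource cleanup here and no message outcome is left in doubt.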
> > > >
> > > > I would at all costs avoid using the term "graceful" anywhere. That word
> > > > has specific semantics associated with it in the systems/storage domain,
> > > > and what is being proposed here is nothing like that.
> > > >
> > > > -j
> > > >
> > > > On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu <y...@streamnative.io.invalid>
> > > > wrote:
> > > > >
> > > > > It's a good point that `ProducerImpl#failPendingBatchMessages` treats
> > > > > messages in the batch container as pending messages too.
> > > > >
> > > > > I agree with your definition of "graceful close". It's more like
> > > > > "at most once" semantics, as the original JavaDoc said:
> > > > >
> > > > > > pending writes will not be retried
> > > > >
> > > > > Thanks,
> > > > > Yunze
> > > > >
> > > > > > On Sep 29, 2021, at 5:24 AM, Michael Marshall <mikemars...@gmail.com> wrote:
> > > > > >
> > > > > > Thanks for bringing this thread to the mailing list, Yunze.
> > > > > >
> > > > > > I think the right change is to update the `closeAsync` method to first
> > > > > > flush `batchMessageContainer` and then asynchronously wait for the
> > > > > > `pendingMessages` queue to drain. We could add a new timeout or rely
> > > > > > on the already implemented `sendTimeout` config to put an upper time
> > > > > > limit on `closeAsync`. My reasoning as well as responses to Joe and
> > > > > > Yunze follow:
> > > > > >
> > > > > > > we need to define the behavior for how to process `pendingMessages`
> > > > > > > and `batchMessageContainer` when the producer calls `closeAsync`.
> > > > > >
> > > > > > Yes, this is exactly the clarification required, and I agree that the
> > > > > > Javadoc is ambiguous and that the implementation doesn't align with
> > > > > > the Javadoc.
> > > > > >
> > > > > > If we view the Javadoc as binding, then the fundamental question is
> > > > > > what messages are "pending".
> > > > > > The `pendingMessages` seem pretty easy to classify as "pending" given
> > > > > > that they are already in flight on the network.
> > > > > >
> > > > > > I also consider `batchMessageContainer` to be "pending" because a
> > > > > > client application already has callbacks for the messages in this
> > > > > > container. These callbacks are expected to complete when the batch
> > > > > > message delivery completes. Since the client application already has a
> > > > > > reference to a callback, it isn't a problem that the producer
> > > > > > implementation initiates the flush logic. (Note that the current
> > > > > > design fails the `pendingMessages` but does not fail the
> > > > > > `batchMessageContainer` when `closeAsync` is called, so the callbacks
> > > > > > for that container are currently left incomplete forever if the client
> > > > > > is closed with an unsent batch. We will need to address this design in
> > > > > > the work that comes from this discussion.)
> > > > > >
> > > > > > Further, the `ProducerImpl#failPendingMessages` method includes logic
> > > > > > to call `ProducerImpl#failPendingBatchMessages`, which implies that
> > > > > > these batched, but not sent, messages have historically been
> > > > > > considered "pending".
> > > > > >
> > > > > > If we view the Javadoc as non-binding, I think my guiding influence
> > > > > > for the new design would be that the `closeAsync` method should result
> > > > > > in a "graceful" shutdown of the client.
> > > > > >
> > > > > > > What exactly does "graceful" convey here?
> > > > > >
> > > > > > This is a great question, and will likely drive the design here. I
> > > > > > view graceful to mean that the producer attempts to avoid artificial
> > > > > > failures. That means trying to drain the queue instead of
> > > > > > automatically failing all of the queue's callbacks.
> > > > > > The tradeoff is that closing the producer takes longer. This reasoning
> > > > > > would justify my claim that we should first flush the
> > > > > > `batchMessageContainer` instead of failing the batch without any
> > > > > > effort at delivery, as that would be artificial.
> > > > > >
> > > > > > > There is no guarantee that either case will ensure the message
> > > > > > > is published.
> > > > > >
> > > > > > I don't think that implementing `closeAsync` with graceful shutdown
> > > > > > logic implies a guarantee of message publishing. Rather, it guarantees
> > > > > > that failures will be the result of a real exception or a timeout.
> > > > > > Since calling `closeAsync` prevents additional messages from being
> > > > > > delivered, users leveraging this functionality might be operating
> > > > > > with "at most once" delivery semantics, where they'd prefer to deliver
> > > > > > the messages if possible but aren't going to delay application
> > > > > > shutdown indefinitely to deliver its last messages. If users need
> > > > > > stronger guarantees about whether their messages are delivered, they
> > > > > > are probably already using the flush methods to ensure that the
> > > > > > producer's queues are empty before calling `closeAsync`.
> > > > > >
> > > > > > I also agree that in all of these cases, we're assuming that users are
> > > > > > capturing references to the async callbacks and then making business
> > > > > > logic decisions based on the results of those callbacks.
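The proposal above (flush `batchMessageContainer`, wait for `pendingMessages` to drain, and bound the wait with a timeout such as `sendTimeout`) might be modeled as the following Java sketch. `GracefulCloseSketch`, `ackOldest` and the `timeoutMs` parameter are hypothetical; only the two collection names come from the thread, and this is not the actual `ProducerImpl` code. After the bounded wait, any future still incomplete is failed, so no callback is left hanging.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the proposed close; NOT ProducerImpl. Not thread-safe.
class GracefulCloseSketch {
    // Messages batched locally; the application already holds their futures.
    final List<CompletableFuture<Void>> batchMessageContainer = new ArrayList<>();
    // Messages written to the connection and awaiting a broker ack.
    final Deque<CompletableFuture<Void>> pendingMessages = new ArrayDeque<>();

    CompletableFuture<Void> sendAsync() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        batchMessageContainer.add(f);
        return f;
    }

    // Moves the current batch onto the (simulated) wire.
    void flushBatch() {
        pendingMessages.addAll(batchMessageContainer);
        batchMessageContainer.clear();
    }

    // Simulates the broker acking the oldest in-flight message.
    void ackOldest() {
        CompletableFuture<Void> f = pendingMessages.pollFirst();
        if (f != null) {
            f.complete(null);
        }
    }

    // 1. flush the batch, 2. wait for pending acks for at most timeoutMs,
    // 3. fail whatever is still incomplete so no callback hangs forever.
    CompletableFuture<Void> closeAsync(long timeoutMs) {
        flushBatch();
        List<CompletableFuture<Void>> remaining = new ArrayList<>(pendingMessages);
        CompletableFuture<Void> drained =
                CompletableFuture.allOf(remaining.toArray(new CompletableFuture<?>[0]));
        return drained
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .handle((ignored, error) -> {
                    for (CompletableFuture<Void> f : remaining) {
                        // No effect on futures that already completed with an ack.
                        f.completeExceptionally(new IllegalStateException("closed before ack"));
                    }
                    pendingMessages.clear();
                    return null;
                });
    }
}
```

With `timeoutMs` set to the send timeout, the wait in `closeAsync` is bounded by that value: a message acked before the deadline keeps its successful result, while one still in flight is failed rather than left pending.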
> > > > > >
> > > > > > Thanks,
> > > > > > Michael
> > > > > >
> > > > > > On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <y...@streamnative.io.invalid>
> > > > > > wrote:
> > > > > > >
> > > > > > > I can't agree more, just like what I've said in PR 12195:
> > > > > > >
> > > > > > > > In any case, when you choose `sendAsync`, you should always make
> > > > > > > > use of the returned future to confirm the result of all messages.
> > > > > > > > In Kafka, it's the send callback.
> > > > > > >
> > > > > > > But I found many users are confused about the current behavior,
> > > > > > > especially those who are used to Kafka's close semantics. They might
> > > > > > > expect a simple attempt to flush existing messages, which works in a
> > > > > > > simple test environment, even though there's no guarantee in
> > > > > > > exceptional cases.
> > > > > > >
> > > > > > > > On Sep 28, 2021, at 4:37 PM, Joe F <joefranc...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Clients should not depend on any of this behaviour, since the
> > > > > > > > broker is at the other end of an unreliable network connection.
> > > > > > > > The semantic differences are kind of meaningless from a usability
> > > > > > > > point of view, since flushing on close =/= published. What exactly
> > > > > > > > does "graceful" convey here? Flush the buffer on the client end
> > > > > > > > and hope it makes it to the server.
> > > > > > > >
> > > > > > > > Is there a difference whether you flush (or process) pending
> > > > > > > > messages or not? There is no guarantee that either case will
> > > > > > > > ensure the message is published.
> > > > > > > >
> > > > > > > > The only way to ensure that messages are published is to wait for
> > > > > > > > the ack.
> > > > > > > > The correct model should be to wait for the return of the blocking
> > > > > > > > API, or for the future completion of the async API, then handle any
> > > > > > > > publish errors, and only then close the producer.
> > > > > > > >
> > > > > > > > On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <y...@streamnative.io.invalid>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > Recently I found a PR (https://github.com/apache/pulsar/pull/12195)
> > > > > > > > > that modifies the existing semantics of producer close. There are
> > > > > > > > > already some comments in this PR, but I think it's better to start
> > > > > > > > > a discussion here to let more people know.
> > > > > > > > >
> > > > > > > > > The existing implementation of producer close is:
> > > > > > > > > 1. Cancel all timers, including those for send and for the batch
> > > > > > > > > container (`batchMessageContainer`).
> > > > > > > > > 2. Complete all pending messages (`pendingMessages`) with
> > > > > > > > > `AlreadyClosedException`.
> > > > > > > > >
> > > > > > > > > See `ProducerImpl#closeAsync` for details.
> > > > > > > > >
> > > > > > > > > But the JavaDoc of `Producer#closeAsync` is:
> > > > > > > > >
> > > > > > > > > > No more writes will be accepted from this producer. Waits until all
> > > > > > > > > > pending write request are persisted.
> > > > > > > > >
> > > > > > > > > So the documentation and the implementation are inconsistent. More
> > > > > > > > > specifically, we need to define the behavior for how to process
> > > > > > > > > `pendingMessages` and `batchMessageContainer` when the producer
> > > > > > > > > calls `closeAsync`.
> > > > > > > > >
> > > > > > > > > 1. batchMessageContainer: contains the buffered single messages
> > > > > > > > > (`Message<T>`).
> > > > > > > > > 2. pendingMessages: all inflight messages (`OpSendMsg`) in the
> > > > > > > > > network.
> > > > > > > > >
> > > > > > > > > IMO, going by the JavaDoc, only `pendingMessages` should be
> > > > > > > > > processed, and the messages in `batchMessageContainer` should be
> > > > > > > > > discarded.
> > > > > > > > >
> > > > > > > > > Other clients might have already implemented semantics similar to
> > > > > > > > > the Java client's. If we changed the semantics now, the behaviors
> > > > > > > > > of the different clients might become inconsistent.
> > > > > > > > >
> > > > > > > > > Should we add a configuration to support graceful close to follow
> > > > > > > > > the docs? Or just change the current behavior?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yunze
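For contrast, the fail-fast close favored by Jack, Joe and Enrico in this thread might be sketched in Java as below. `FailFastCloseSketch` is a hypothetical model that borrows only the `pendingMessages` and `batchMessageContainer` names, and `IllegalStateException` stands in for the client's closed-producer exception. Unlike a close that fails only `pendingMessages` (which, as noted in the thread, can leave the callbacks of an unsent batch incomplete), it fails the batched futures as well as the in-flight ones, so every callback completes immediately.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of a fail-fast close; NOT ProducerImpl. Not thread-safe.
class FailFastCloseSketch {
    // Messages batched locally but not yet sent.
    final List<CompletableFuture<Void>> batchMessageContainer = new ArrayList<>();
    // Messages already sent and awaiting a broker ack.
    final Deque<CompletableFuture<Void>> pendingMessages = new ArrayDeque<>();

    CompletableFuture<Void> sendAsync() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        batchMessageContainer.add(f);
        return f;
    }

    // Moves the current batch onto the simulated wire.
    void flushBatch() {
        pendingMessages.addAll(batchMessageContainer);
        batchMessageContainer.clear();
    }

    // Completes every outstanding future exceptionally, right away, without
    // waiting for acks. Failing pendingMessages alone would leave the futures
    // of an unsent batch incomplete forever.
    CompletableFuture<Void> closeAsync() {
        IllegalStateException closed = new IllegalStateException("producer already closed");
        pendingMessages.forEach(f -> f.completeExceptionally(closed));
        batchMessageContainer.forEach(f -> f.completeExceptionally(closed));
        pendingMessages.clear();
        batchMessageContainer.clear();
        return CompletableFuture.completedFuture(null);
    }
}
```

This trades delivery for predictability: close returns immediately and every caller learns an outcome, at the cost of failing messages that might have been delivered had the producer waited.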