I agree that we should try to ensure "at most once" when closing.
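As a rough picture of what "at most once" on close could look like when it is bounded by a send timeout, here is a minimal sketch. `BoundedClose` and `closeAfterPending` are hypothetical names, the futures stand in for send callbacks, and `orTimeout` (Java 9+) plays the role of the send timeout. It is an illustration only, not how `ProducerImpl` is written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the Pulsar client.
class BoundedClose {
    /**
     * Waits for every pending send, giving each at most timeoutMs.
     * A send that misses the deadline is failed with TimeoutException
     * and never retried, so its callback still fires ("at most once").
     * The returned future completes once every send has an outcome.
     */
    static CompletableFuture<Void> closeAfterPending(
            List<CompletableFuture<String>> pending, long timeoutMs) {
        List<CompletableFuture<Void>> bounded = new ArrayList<>();
        for (CompletableFuture<String> send : pending) {
            // orTimeout fails the caller's own future on expiry;
            // handle() swallows the outcome so close proceeds either way.
            bounded.add(send.orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                    .handle((ack, err) -> (Void) null));
        }
        return CompletableFuture.allOf(bounded.toArray(new CompletableFuture[0]));
    }
}
```

With one acknowledged send and one that never gets an ack, the close future still completes after the timeout, and the stuck send's future ends up failed rather than left hanging.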
> That would still get controlled by send timeout, after that, the send will fail and close should proceed.

This sounds more in line with "at most once".

------------------ Original Message ------------------
From: "dev" <matteo.me...@gmail.com>;
Sent: Wednesday, September 29, 2021, 3:55 PM
To: "Dev"<dev@pulsar.apache.org>;
Subject: Re: Correct semantics of producer close

> but equally they might be surprised when closeAsync doesn't complete because the pending messages can't be cleared

That would still get controlled by send timeout; after that, the send will fail and close should proceed.

--
Matteo Merli
<matteo.me...@gmail.com>

On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly <jvanligh...@splunk.com.invalid> wrote:
>
> I can see both sides of the argument regarding whether to flush pending messages or not. But I think what is definitely in the contract is not to discard any callbacks causing user code to block forever. No matter what, we must always call the callbacks.
>
> Personally, I am in favour of a close operation not flushing pending messages (and I define pending here as any message that has a callback). The reason is that if we wait for all pending messages to be sent then we now face a number of edge cases that could cause the close operation to take a very long time to complete. What if the user code really just needs to close the producer right now? If we amend the documentation to make it clear that close does not flush pending messages then the user is now able to explicitly craft the behaviour they need. If they want all messages flushed first then chain flushAsync->closeAsync, else just closeAsync.
>
> Unfortunately I think user expectation, regardless of the current javadoc, is that close would flush everything and in an ideal world it would. We have the Principle of Least Surprise but we also have Safe By Default.
> Users might be surprised that when calling closeAsync, a load of their pending messages get ConnectionAlreadyClosed, but equally they might be surprised when closeAsync doesn't complete because the pending messages can't be cleared. Failing pending messages is the safer option. User code must handle failure responses and cannot claim data loss with a non-positive response. But if they can't close a producer, that could result in a wider impact on their system, not to mention more issues created in GitHub.
>
> Jack
>
> On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefranc...@gmail.com> wrote:
>
> > >I don't think that implementing `closeAsync` with graceful shutdown logic implies a guarantee of message publishing. Rather, it guarantees that failures will be the result of a real exception or a timeout.
> >
> > I think that's beside the point. There is no definition of "real" exceptions. At that point the app is publishing on a best effort basis, and there are no guarantees anywhere in client or server.
> >
> > There is no concept of "maybe published". OR "published-if-no_real_errors". What does that even mean? That is only a can of worms which is going to add to developer confusion and lead to Pulsar users finding in the worst possible way that something got lost because it never got published. It's a poor experience when you find it. I have a real life experience where a user used async APIs (in a lambda), which hummed along fine. One day much later, the cloud had a hitch, and they discovered a message was not delivered.
> >
> > I am more concerned about developers discovering at the worst possible time that "published-if-no_real_errors" is a concept.
> >
> > My suggestion is to make this simple for developers.
> >
> > ----The sync/async nature of the close() [ or any other API, for that matter ] is completely orthogonal to the API semantics, and is just a programmatic choice to deal with how resources are managed within the program. That's not material here.---
> >
> > A close() is an action that is shutting down the producer right now, not even waiting for any acks of inflight messages. A willingness to lose pending/inflight messages is explicit in that call. The producer will not be around to deal with errors or to retry failed messages once close() is invoked.
> >
> > On the contrary, if the client does not want to deal with message loss, then flush(), stick around to gather the acks, deal with errors and retries etc and then do close(). Then close() will be just a resource management action on the client.
> >
> > So update the documentation to reflect that. ---> if close() is called on a producer with messages pending acks, those messages are left in doubt. Avoid all mention of flushes, best effort etc. Users must buy into uncertainty, without any qualifications.
> >
> > I would at all costs avoid using the term "graceful" anywhere. That word has specific semantics associated with it in the systems/storage domain, and what is being proposed here is nothing like that.
> >
> > -j
> >
> >
> > On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu <y...@streamnative.io.invalid> wrote:
> >
> > > It's a good point that `ProducerImpl#failPendingBatchMessages` treats messages in batch container also as pending messages.
> > >
> > > I agree with your definition of "graceful close". It's more like "at most once" semantics, like the original JavaDoc said
> > >
> > > > pending writes will not be retried
> > >
> > > Thanks,
> > > Yunze
> > >
> > > > On Sep 29, 2021, at 5:24 AM, Michael Marshall <mikemars...@gmail.com> wrote:
> > > >
> > > > Thanks for bringing this thread to the mailing list, Yunze.
> > > >
> > > > I think the right change is to update the `closeAsync` method to first flush `batchMessageContainer` and to then asynchronously wait for the `pendingMessages` queue to drain. We could add a new timeout or rely on the already implemented `sendTimeout` config to put an upper time limit on `closeAsync`. My reasoning as well as responses to Joe and Yunze follow:
> > > >
> > > >> we need to define the behavior for how to process `pendingMessages` and `batchMessageContainer` when producer call `closeAsync`.
> > > >
> > > > Yes, this is exactly the clarification required, and I agree that the Javadoc is ambiguous and that the implementation doesn't align with the Javadoc.
> > > >
> > > > If we view the Javadoc as binding, then the fundamental question is what messages are "pending". The `pendingMessages` seem pretty easy to classify as "pending" given that they are already in flight on the network.
> > > >
> > > > I also consider `batchMessageContainer` to be "pending" because a client application already has callbacks for the messages in this container. These callbacks are expected to complete when the batch message delivery completes. Since the client application already has a reference to a callback, it isn't a problem that the producer implementation initiates the flush logic. (Note that the current design fails the `pendingMessages` but does not fail the `batchMessageContainer` when `closeAsync` is called, so the callbacks for that container are currently left incomplete forever if the client is closed with an unsent batch. We will need to address this design in the work that comes from this discussion.)
> > > >
> > > > Further, the `ProducerImpl#failPendingMessages` method includes logic to call `ProducerImpl#failPendingBatchMessages`, which implies that these batched, but not sent, messages have been historically considered "pending".
> > > >
> > > > If we view the Javadoc as non-binding, I think my guiding influence for the new design would be that the `closeAsync` method should result in a "graceful" shutdown of the client.
> > > >
> > > >> What exactly does "graceful" convey here?
> > > >
> > > > This is a great question, and will likely drive the design here. I view graceful to mean that the producer attempts to avoid artificial failures. That means trying to drain the queue instead of automatically failing all of the queue's callbacks. The tradeoff is that closing the producer takes longer. This reasoning would justify my claim that we should first flush the `batchMessageContainer` instead of failing the batch without any effort at delivery, as that would be artificial.
> > > >
> > > >> There is no guarantee that either case will ensure the message is published.
> > > >
> > > > I don't think that implementing `closeAsync` with graceful shutdown logic implies a guarantee of message publishing. Rather, it guarantees that failures will be the result of a real exception or a timeout. Since calling `closeAsync` prevents additional messages from delivering, users leveraging this functionality might be operating with "at most once" delivery semantics where they'd prefer to deliver the messages if possible, but they aren't going to delay application shutdown indefinitely to deliver its last messages.
> > > > If users need stronger guarantees about whether their messages are delivered, they are probably already using the flush methods to ensure that the producer's queues are empty before calling `closeAsync`.
> > > >
> > > > I also agree that in all of these cases, we're assuming that users are capturing references to the async callbacks and then making business logic decisions based on the results of those callbacks.
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > >>
> > > >> I can't agree more, just like what I've said in PR 12195:
> > > >>
> > > >>> In any case, when you choose `sendAsync`, you should always make use of the returned future to confirm the result of all messages. In Kafka, it's the send callback.
> > > >>
> > > >> But I found many users are confused about the current behavior, especially those who are used to Kafka's close semantics. They might expect a simple try to flush existing messages, which works in a simple test environment, even though there's no guarantee for exception cases.
> > > >>
> > > >>> On Sep 28, 2021, at 4:37 PM, Joe F <joefranc...@gmail.com> wrote:
> > > >>>
> > > >>> Clients should not depend on any of this behaviour, since the broker is at the other end of an unreliable network connection. The semantic differences are kind of meaningless from a usability point, since flushing on close =/= published. What exactly does "graceful" convey here? Flush the buffer on the client end and hope it makes it to the server.
> > > >>>
> > > >>> Is there a difference whether you flush (or process) pending messages or not? There is no guarantee that either case will ensure the message is published.
> > > >>>
> > > >>> The only way to ensure that messages are published is to wait for the ack. The correct model should be to wait for return on the blocking API, or wait for future completion of the async API, then handle any publish errors and then only close the producer.
> > > >>>
> > > >>>
> > > >>> On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > >>>
> > > >>>> Hi all,
> > > >>>>
> > > >>>> Recently I found a PR (https://github.com/apache/pulsar/pull/12195) that modifies the existing semantics of producer close. There has already been some discussion in this PR, but I think it's better to start a discussion here to let more people know.
> > > >>>>
> > > >>>> The existing implementation of producer close is:
> > > >>>> 1. Cancel all timers, including send and batch container (`batchMessageContainer`).
> > > >>>> 2. Complete all pending messages (`pendingMessages`) with `AlreadyClosedException`.
> > > >>>>
> > > >>>> See `ProducerImpl#closeAsync` for details.
> > > >>>>
> > > >>>> But the JavaDoc of `Producer#closeAsync` is:
> > > >>>>
> > > >>>>> No more writes will be accepted from this producer. Waits until all pending write request are persisted.
> > > >>>>
> > > >>>> Anyway, the document and implementation are inconsistent. But specifically, we need to define the behavior for how to process `pendingMessages` and `batchMessageContainer` when the producer calls `closeAsync`.
> > > >>>>
> > > >>>> 1. batchMessageContainer: contains the buffered single messages (`Message<T>`).
> > > >>>> 2. pendingMessages: all inflight messages (`OpSendMsg`) in network.
> > > >>>>
> > > >>>> IMO, from the JavaDoc, only `pendingMessages` should be processed and the messages in `batchMessageContainer` should be discarded.
> > > >>>>
> > > >>>> Other clients might have already implemented semantics similar to the Java client's. If we changed the semantics now, the behaviors among different clients might be inconsistent.
> > > >>>>
> > > >>>> Should we add a configuration to support graceful close to follow the docs? Or just change the current behavior?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Yunze
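For reference, the two behaviours debated in this thread can be pictured with a toy model. This is only a sketch under stated assumptions: `ToyProducer` and its methods are hypothetical stand-ins built on `CompletableFuture`, the broker ack is simulated inside `flushAsync`, and none of it is Pulsar's `ProducerImpl`. It shows close-only failing every outstanding callback (none is dropped), while flush-then-close lets them complete first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy stand-in for a producer; all names here are hypothetical.
class ToyProducer {
    private final List<CompletableFuture<String>> pending = new ArrayList<>();
    private boolean closed = false;

    /** Queues a message and hands the caller a future to observe the outcome. */
    CompletableFuture<String> sendAsync(String message) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (closed) {
            result.completeExceptionally(new IllegalStateException("producer already closed"));
            return result;
        }
        pending.add(result);
        return result;
    }

    /** Simulates the broker acknowledging everything currently queued. */
    CompletableFuture<Void> flushAsync() {
        for (int i = 0; i < pending.size(); i++) {
            pending.get(i).complete("ack-" + i);
        }
        pending.clear();
        return CompletableFuture.completedFuture(null);
    }

    /** Closes without flushing: every outstanding callback is failed, never left hanging. */
    CompletableFuture<Void> closeAsync() {
        closed = true;
        for (CompletableFuture<String> send : pending) {
            send.completeExceptionally(new IllegalStateException("producer already closed"));
        }
        pending.clear();
        return CompletableFuture.completedFuture(null);
    }
}
```

A caller who wants delivery attempted first would chain `producer.flushAsync().thenCompose(v -> producer.closeAsync())`; a caller who needs to shut down right now would call `closeAsync()` alone and handle the failed futures.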