I agree with trying to ensure "at most once" semantics when closing.
> That would still be controlled by the send timeout; after that, the send
> will fail and close should proceed.
This sounds more in line with "at most once".
------------------ Original Message ------------------
From: "dev" <[email protected]>;
Sent: Wednesday, September 29, 2021, 3:55 PM
To: "Dev" <[email protected]>;
Subject: Re: Correct semantics of producer close
> but equally they might be
> surprised when closeAsync doesn't complete because the pending messages
> can't be cleared
That would still be controlled by the send timeout; after that, the send
will fail and close should proceed.
--
Matteo Merli
<[email protected]>
On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
<[email protected]> wrote:
>
> I can see both sides of the argument regarding whether to flush pending
> messages or not. But I think what is definitely in the contract is never to
> discard callbacks, since that would cause user code to block forever. No
> matter what, we must always call the callbacks.
>
> Personally, I am in favour of a close operation not flushing pending
> messages (and I define pending here as any message that has a callback).
> The reason is that if we wait for all pending messages to be sent then we
> now face a number of edge cases that could cause the close operation to
> take a very long time to complete. What if the user code really just needs
> to close the producer right now? If we amend the documentation to make it
> clear that close does not flush pending messages, then the user is able
> to explicitly craft the behaviour they need. If they want all messages
> flushed first, they can chain flushAsync->closeAsync; otherwise, they can
> just call closeAsync.
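The two options Jack describes can be sketched with plain `CompletableFuture` composition. This is a minimal sketch, not client code: `AsyncProducer` is a hypothetical stand-in that only mirrors the shape of the Pulsar Java client's `Producer#flushAsync()` and `Producer#closeAsync()` (both return `CompletableFuture<Void>`), and `CloseHelpers` is invented for illustration so the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the relevant part of a Pulsar producer.
interface AsyncProducer {
    CompletableFuture<Void> flushAsync();
    CompletableFuture<Void> closeAsync();
}

final class CloseHelpers {
    private CloseHelpers() {}

    // Caller wants buffered messages sent before closing: close starts only
    // after the flush future completes successfully. If the flush fails, the
    // returned future fails and close is not attempted.
    static CompletableFuture<Void> flushThenClose(AsyncProducer producer) {
        return producer.flushAsync().thenCompose(ignored -> producer.closeAsync());
    }

    // Caller needs the producer closed right now and accepts that pending
    // messages may be failed rather than sent.
    static CompletableFuture<Void> closeNow(AsyncProducer producer) {
        return producer.closeAsync();
    }
}
```

With `thenCompose`, a failed flush skips the close and surfaces the error to the caller, who can then decide whether to retry or close anyway; closing unconditionally would instead attach the close to `handle` or `whenComplete`.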
>
> Unfortunately I think user expectation, regardless of the current javadoc,
> is that close would flush everything and in an ideal world it would. We
> have the Principle of Least Surprise but we also have Safe By Default.
> Users might be surprised that when calling closeAsync, a load of their
> pending messages get ConnectionAlreadyClosed, but equally they might be
> surprised when closeAsync doesn't complete because the pending messages
> can't be cleared. Failing pending messages is the safer option. User code
> must handle failure responses and cannot claim data loss with a
> non-positive response. But if they can't close a producer, that could
> result in a wider impact on their system, not to mention more issues
> opened on GitHub.
>
> Jack
>
> On Wed, Sep 29, 2021 at 7:05 AM Joe F <[email protected]> wrote:
>
> >
> > > I don't think that implementing `closeAsync` with graceful shutdown
> > > logic implies a guarantee of message publishing. Rather, it guarantees
> > > that failures will be the result of a real exception or a timeout.
> >
> > I think that's beside the point. There is no definition of "real"
> > exceptions. At that point the app is publishing on a best-effort basis,
> > and there are no guarantees anywhere in client or server.
> >
> > There is no concept of "maybe published" or
> > "published-if-no_real_errors". What does that even mean? That is only a
> > can of worms which is going to add to developer confusion and lead to
> > Pulsar users finding out in the worst possible way that something got
> > lost because it never got published. It's a poor experience when you
> > find it. I have a real-life experience where a user used async APIs (in
> > a lambda), which hummed along fine. One day, much later, the cloud had a
> > hitch, and they discovered a message was not delivered.
> >
> > I am more concerned about developers discovering at the worst possible
> > time that "published-if-no_real_errors" is a concept.
> >
> > My suggestion is to make this simple for developers.
> >
> > ----The sync/async nature of the close() [ or any other API, for that
> > matter ] is completely orthogonal to the API semantics, and is just a
> > programmatic choice to deal with how resources are managed within the
> > program. That's not material here.---
> >
> > A close() is an action that is shutting down the producer right now, not
> > even waiting for any acks of inflight messages. A willingness to lose
> > pending/inflight messages is explicit in that call. The producer will not
> > be around to deal with errors or to retry failed messages once close() is
> > invoked.
> >
> > On the contrary, if the client does not want to deal with message loss,
> > then it should flush(), stick around to gather the acks, deal with errors
> > and retries, etc., and then call close(). Then close() will be just a
> > resource management action on the client.
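Joe's recommended pattern (send, wait for every ack, handle errors and retries, and only then close) might look roughly like this. It is a sketch under stated assumptions: `SimpleProducer`, `PublishThenClose`, and the `maxAttempts` retry policy are hypothetical, and message IDs are modelled as `String` where the real Pulsar `sendAsync` returns a `CompletableFuture<MessageId>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical minimal producer shape; the real Pulsar sendAsync returns
// CompletableFuture<MessageId>, modelled here as a String id.
interface SimpleProducer {
    CompletableFuture<String> sendAsync(String payload);
    CompletableFuture<Void> closeAsync();
}

final class PublishThenClose {
    private PublishThenClose() {}

    // Sends every payload, waits for each ack or failure, resends the failures
    // (up to maxAttempts sends per payload), and only then closes the producer.
    // Returns the payloads that never got an ack so the caller can handle them.
    static List<String> publishAll(SimpleProducer producer, List<String> payloads, int maxAttempts) {
        List<String> remaining = new ArrayList<>(payloads);
        for (int attempt = 1; attempt <= maxAttempts && !remaining.isEmpty(); attempt++) {
            List<CompletableFuture<String>> acks = new ArrayList<>();
            for (String payload : remaining) {
                acks.add(producer.sendAsync(payload));
            }
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < acks.size(); i++) {
                try {
                    acks.get(i).join(); // blocks until the ack or the send error arrives
                } catch (RuntimeException sendError) { // CompletionException or CancellationException
                    failed.add(remaining.get(i));
                }
            }
            remaining = failed;
        }
        // Every message is now either acked or known to have failed,
        // so close() is only resource cleanup.
        producer.closeAsync().join();
        return remaining;
    }
}
```

Blocking with `join()` keeps the sketch short; a fully asynchronous version would chain the futures instead.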
> >
> > So update the documentation to reflect that: if close() is called on a
> > producer with messages pending acks, those messages are left in doubt.
> > Avoid all mention of flushes, best effort, etc. Users must buy into
> > uncertainty, without any qualifications.
> >
> > I would at all costs avoid using the term "graceful" anywhere. That word
> > has specific semantics associated with it in the systems/storage domain,
> > and what is being proposed here is nothing like that.
> >
> > -j
> >
> >
> > On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu <[email protected]>
> > wrote:
> >
> > > It's a good point that `ProducerImpl#failPendingBatchMessages` treats
> > > messages in the batch container as pending messages too.
> > >
> > > I agree with your definition of "graceful close". It's more like
> > > "at most once" semantics, as the original JavaDoc said:
> > >
> > > > pending writes will not be retried
> > >
> > > Thanks,
> > > Yunze
> > >
> > > > On Sep 29, 2021, at 5:24 AM, Michael Marshall <[email protected]> wrote:
> > > >
> > > > Thanks for bringing this thread to the mailing list, Yunze.
> > > >
> > > > I think the right change is to update the `closeAsync` method to first
> > > > flush `batchMessageContainer` and to then asynchronously wait for the
> > > > `pendingMessages` queue to drain. We could add a new timeout or rely
> > > > on the already implemented `sendTimeout` config to put an upper time
> > > > limit on `closeAsync`. My reasoning, as well as responses to Joe and
> > > > Yunze, follows:
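A toy model of the ordering Michael proposes (flush the batch, then wait for in-flight messages to settle, bounded by a timeout). Everything here is hypothetical: `ToyProducer` is not `ProducerImpl`, each message is represented only by the callback future the application holds, and `timeoutMs` stands in for either a new close timeout or the existing `sendTimeout`. It needs Java 9+ for `CompletableFuture#orTimeout` and is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Toy model only (not ProducerImpl): each message is represented by the
// callback future the application is holding. Not thread-safe.
final class ToyProducer {
    // Messages already written to the network, waiting for broker acks.
    final List<CompletableFuture<Void>> pendingMessages = new ArrayList<>();
    // Messages buffered on the client and not yet sent.
    final List<CompletableFuture<Void>> batchMessageContainer = new ArrayList<>();

    // Stand-in for sending the current batch: its callbacks become in-flight.
    void flushBatch() {
        pendingMessages.addAll(batchMessageContainer);
        batchMessageContainer.clear();
    }

    // Proposed ordering: flush the batch, wait for in-flight callbacks to
    // settle (success or failure), but for at most timeoutMs. Anything still
    // unresolved at the deadline is failed so no callback is left hanging.
    CompletableFuture<Void> closeAsync(long timeoutMs) {
        flushBatch();
        List<CompletableFuture<Void>> snapshot = new ArrayList<>(pendingMessages);
        CompletableFuture<?>[] settled = snapshot.stream()
                .map(f -> f.handle((ok, err) -> null)) // completes on success or failure
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(settled)
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .handle((ok, timedOut) -> {
                    // completeExceptionally is a no-op for callbacks that already completed.
                    for (CompletableFuture<Void> f : snapshot) {
                        f.completeExceptionally(new IllegalStateException("producer closed before ack"));
                    }
                    pendingMessages.clear();
                    return null;
                });
    }
}
```

Failing whatever is still unresolved at the deadline keeps every callback eventually completed, while the timeout keeps `closeAsync` from hanging indefinitely.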
> > > >
> > > >> we need to define the behavior for how to process `pendingMessages`
> > > >> and `batchMessageContainer` when the producer calls `closeAsync`.
> > > >
> > > > Yes, this is exactly the clarification required, and I agree that the
> > > > Javadoc is ambiguous and that the implementation doesn't align with
> > > > the Javadoc.
> > > >
> > > > If we view the Javadoc as binding, then the fundamental question is
> > > > what messages are "pending". The `pendingMessages` seem pretty easy to
> > > > classify as "pending" given that they are already in flight on the
> > > > network.
> > > >
> > > > I also consider `batchMessageContainer` to be "pending" because a
> > > > client application already has callbacks for the messages in this
> > > > container. These callbacks are expected to complete when the batch
> > > > message delivery completes. Since the client application already has a
> > > > reference to a callback, it isn't a problem that the producer
> > > > implementation initiates the flush logic. (Note that the current
> > > > design fails the `pendingMessages` but does not fail the
> > > > `batchMessageContainer` when `closeAsync` is called, so the callbacks
> > > > for that container are currently left incomplete forever if the client
> > > > is closed with an unsent batch. We will need to address this design in
> > > > the work that comes from this discussion.)
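The gap Michael describes can be illustrated with a toy model: failing only `pendingMessages` leaves buffered callbacks incomplete forever, while failing both queues completes every callback the application holds. `FailFastProducer` and its two close methods are hypothetical, and `IllegalStateException` stands in for the client's closed-producer exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model only (not ProducerImpl): each message is represented by the
// callback future the application is holding.
final class FailFastProducer {
    final List<CompletableFuture<Void>> pendingMessages = new ArrayList<>();
    final List<CompletableFuture<Void>> batchMessageContainer = new ArrayList<>();

    // The gap: only in-flight callbacks are failed, so callbacks for the
    // unsent batch are never completed and callers waiting on them block forever.
    void closeFailingOnlyPending() {
        failAll(pendingMessages);
    }

    // Fails both queues, so every callback the application holds completes.
    void closeFailingEverything() {
        failAll(pendingMessages);
        failAll(batchMessageContainer);
    }

    private static void failAll(List<CompletableFuture<Void>> callbacks) {
        for (CompletableFuture<Void> callback : callbacks) {
            callback.completeExceptionally(new IllegalStateException("producer already closed"));
        }
        callbacks.clear();
    }
}
```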
> > > >
> > > > Further, the `ProducerImpl#failPendingMessages` method includes logic
> > > > to call `ProducerImpl#failPendingBatchMessages`, which implies that
> > > > these batched, but not sent, messages have been historically
> > > > considered "pending".
> > > >
> > > > If we view the Javadoc as non-binding, I think my guiding influence
> > > > for the new design would be that the `closeAsync` method should result
> > > > in a "graceful" shutdown of the client.
> > > >
> > > >> What exactly does "graceful" convey here?
> > > >
> > > > This is a great question, and will likely drive the design here. I
> > > > view graceful to mean that the producer attempts to avoid artificial
> > > > failures. That means trying to drain the queue instead of
> > > > automatically failing all of the queue's callbacks. The tradeoff is
> > > > that closing the producer takes longer. This reasoning would justify
> > > > my claim that we should first flush the `batchMessageContainer`
> > > > instead of failing the batch without any effort at delivery, as that
> > > > would be artificial.
> > > >
> > > >> There is no guarantee that either case will ensure the message
> > > >> is published.
> > > >
> > > > I don't think that implementing `closeAsync` with graceful shutdown
> > > > logic implies a guarantee of message publishing. Rather, it guarantees
> > > > that failures will be the result of a real exception or a timeout.
> > > > Since calling `closeAsync` prevents additional messages from
> > > > delivering, users leveraging this functionality might be operating
> > > > with "at most once" delivery semantics where they'd prefer to deliver
> > > > the messages if possible, but they aren't going to delay application
> > > > shutdown indefinitely to deliver its last messages. If users need
> > > > stronger guarantees about whether their messages are delivered, they
> > > > are probably already using the flush methods to ensure that the
> > > > producer's queues are empty before calling `closeAsync`.
> > > >
> > > > I also agree that in all of these cases, we're assuming that users are
> > > > capturing references to the async callbacks and then making business
> > > > logic decisions based on the results of those callbacks.
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <[email protected]>
> > > > wrote:
> > > >>
> > > >> I couldn't agree more; as I said in PR 12195:
> > > >>
> > > >>> In any case, when you choose `sendAsync`, you should always make use
> > > >>> of the returned future to confirm the result of all messages. In
> > > >>> Kafka, it's the send callback.
> > > >>
> > > >> But I found many users are confused about the current behavior,
> > > >> especially those who are used to Kafka's close semantics. They might
> > > >> expect a simple attempt to flush existing messages, which works in a
> > > >> simple test environment, even though there's no guarantee in
> > > >> exceptional cases.
> > > >>
> > > >>
> > > >>
> > > >>> On Sep 28, 2021, at 4:37 PM, Joe F <[email protected]> wrote:
> > > >>>
> > > >>> Clients should not depend on any of this behaviour, since the broker
> > > >>> is at the other end of an unreliable network connection. The
> > > >>> semantic differences are kind of meaningless from a usability point,
> > > >>> since flushing on close =/= published. What exactly does "graceful"
> > > >>> convey here? Flush the buffer on the client end and hope it makes it
> > > >>> to the server.
> > > >>>
> > > >>> Is there a difference whether you flush (or process) pending messages
> > > >>> or not? There is no guarantee that either case will ensure the message
> > > >>> is published.
> > > >>>
> > > >>> The only way to ensure that messages are published is to wait for the
> > > >>> ack. The correct model should be to wait for return on the blocking
> > > >>> API, or wait for future completion of the async API, then handle any
> > > >>> publish errors, and only then close the producer.
> > > >>>
> > > >>>
> > > >>> On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <[email protected]>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi all,
> > > >>>>
> > > >>>> Recently I found a PR (https://github.com/apache/pulsar/pull/12195)
> > > >>>> that modifies the existing semantics of producer close. There's
> > > >>>> already some discussion in this PR, but I think it's better to start
> > > >>>> a discussion here to let more people know.
> > > >>>>
> > > >>>> The existing implementation of producer close is:
> > > >>>> 1. Cancel all timers, including the send timer and the batch
> > > >>>>    container (`batchMessageContainer`) timer.
> > > >>>> 2. Complete all pending messages (`pendingMessages`) with
> > > >>>>    `AlreadyClosedException`.
> > > >>>>
> > > >>>> See `ProducerImpl#closeAsync` for details.
> > > >>>>
> > > >>>> But the JavaDoc of `Producer#closeAsync` is:
> > > >>>>
> > > >>>>> No more writes will be accepted from this producer. Waits until all
> > > >>>>> pending write request are persisted.
> > > >>>>
> > > >>>> Anyway, the document and implementation are inconsistent. More
> > > >>>> specifically, we need to define the behavior for how to process
> > > >>>> `pendingMessages` and `batchMessageContainer` when the producer
> > > >>>> calls `closeAsync`.
> > > >>>>
> > > >>>> 1. batchMessageContainer: contains the buffered single messages
> > > >>>>    (`Message<T>`).
> > > >>>> 2. pendingMessages: all inflight messages (`OpSendMsg`) in the network.
> > > >>>>
> > > >>>> IMO, from the JavaDoc, only `pendingMessages` should be processed and
> > > >>>> the messages in `batchMessageContainer` should be discarded.
> > > >>>>
> > > >>>> Other clients might have already implemented semantics similar to
> > > >>>> the Java client's, so if we changed the semantics now, the behaviors
> > > >>>> among different clients might become inconsistent.
> > > >>>>
> > > >>>> Should we add a configuration to support graceful close to follow the
> > > >>>> docs? Or just change the current behavior?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Yunze
> > > >>
> > >
> > >
> >