I agree that we must ensure that every pending callback must be completed
eventually (timeout or another error is not a problem),
because we cannot let the client application hang forever.
I believe that the application can perform a flush() explicitly and also
wait for every callback to be executed if that is the requirement.

Usually you call close() when:
1. you have a serious problem: you already know that there is a hard error,
and you want to close the Producer or the Application and possibly start a
new one to recover
2. you are shutting down your application or component: you have control
over the callbacks, so you can wait for them to complete

So case 2. can be covered by the application. We have to support case 1:
fail fast and close (no need for flush()) .

In my experience trying to implement "graceful stops" adds only complexity
and false hopes to the users.

Enrico



Il giorno mer 29 set 2021 alle ore 13:58 Nirvana <1572139...@qq.com.invalid>
ha scritto:

> I agree to try to ensure ”at most once“ when closing。
>
>
> &gt; That would still get controlled by send timeout, after that, the send
> will fail and close should proceed.
> This sounds more in line with “at most once”。
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "dev"
>                                                                 <
> matteo.me...@gmail.com&gt;;
> 发送时间:&nbsp;2021年9月29日(星期三) 下午3:55
> 收件人:&nbsp;"Dev"<dev@pulsar.apache.org&gt;;
>
> 主题:&nbsp;Re: Correct semantics of producer close
>
>
>
> &gt; but equally they might be
> &gt; surprised when closeAsync doesn't complete because the pending
> messages
> &gt; can't be cleared
>
> That would still get controlled by send timeout, after that, the send
> will fail and close should proceed.
>
> --
> Matteo Merli
> <matteo.me...@gmail.com&gt;
>
> On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
> <jvanligh...@splunk.com.invalid&gt; wrote:
> &gt;
> &gt; I can see both sides of the argument regarding whether to flush
> pending
> &gt; messages or not. But I think what is definitely in the contract is
> not to
> &gt; discard any callbacks causing user code to block forever. No matter
> what,
> &gt; we must always call the callbacks.
> &gt;
> &gt; Personally, I am in favour of a close operation not flushing pending
> &gt; messages (and I define pending here as any message that has a
> callback).
> &gt; The reason is that if we wait for all pending messages to be sent
> then we
> &gt; now face a number of edge cases that could cause the close operation
> to
> &gt; take a very long time to complete. What if the user code really just
> needs
> &gt; to close the producer right now? If we amend the documentation to
> make it
> &gt; clear that close does not flush pending messages then the user is now
> able
> &gt; to explicitly craft the behaviour they need. If they want all messages
> &gt; flushed first then chaing flushAsync-&gt;closeAsync else just
> closeAsync.
> &gt;
> &gt; Unfortunately I think user expectation, regardless of the current
> javadoc,
> &gt; is that close would flush everything and in an ideal world it would.
> We
> &gt; have the Principle of Least Surprise but we also have Safe By Default.
> &gt; Users might be surprised that when calling closeAsync, a load of their
> &gt; pending messages get ConnectionAlreadyClosed, but equally they might
> be
> &gt; surprised when closeAsync doesn't complete because the pending
> messages
> &gt; can't be cleared. Failing pending messages is the safer option. User
> code
> &gt; must handle failure responses and cannot claim data loss with a
> &gt; non-positive response. But if they can't close a producer, that could
> &gt; result in a wider impact on their system, not to mention more issues
> &gt; created in GitHub.
> &gt;
> &gt; Jack
> &gt;
> &gt; On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefranc...@gmail.com&gt;
> wrote:
> &gt;
> &gt; &gt; [ External sender. Exercise caution. ]
> &gt; &gt;
> &gt; &gt; &gt;I don't think that implementing `closeAsync` with graceful
> shutdown
> &gt; &gt; logic implies a guarantee of message publishing. Rather, it
> guarantees
> &gt; &gt; that failures will be the result of a real exception or a
> timeout.
> &gt; &gt;
> &gt; &gt; I think that's beside the point.&nbsp;&nbsp;&nbsp;&nbsp; There
> is no definition of "real"
> &gt; &gt; exceptions.&nbsp;&nbsp; At that point the app is publishing on a
> best effort basis,
> &gt; &gt; and there are no guarantees anywhere in client or server.
> &gt; &gt;
> &gt; &gt; There is no concept&nbsp; of&nbsp; "maybe published". OR
> &gt; &gt; "published-if-no_real_errors".&nbsp; What does that even
> mean?&nbsp; That is only a
> &gt; &gt; can of worms which is going to add to developer confusion and
> lead to
> &gt; &gt; Pulsar users finding in the worst possible way that something
> got lost
> &gt; &gt; because it never got published.&nbsp; It's a poor experience
> when you find it.
> &gt; &gt; I have a real life experience where a user used async APIs (in a
> lambda),
> &gt; &gt; which hummed along fine.&nbsp; One day much later, the cloud had
> a hitch, and
> &gt; &gt; they discovered a message was&nbsp; not delivered.
> &gt; &gt;
> &gt; &gt; I am more concerned about developers discovering at the worst
> possible time
> &gt; &gt; that&nbsp; ""published-if-no_real_errors"&nbsp; is a concept.
> &gt; &gt;
> &gt; &gt; My suggestion is to make this simple for developers.
> &gt; &gt;
> &gt; &gt; ----The sync/async nature of the close() [ or any other API, for
> that
> &gt; &gt; matter ]&nbsp; is completely orthogonal to the API semantics,
> and is just a
> &gt; &gt; programmatic choice to deal with&nbsp; how resources are managed
> within the
> &gt; &gt; program. That's not material here.---
> &gt; &gt;
> &gt; &gt; A close() is an action that is shutting down the producer right
> now, not
> &gt; &gt; even waiting for any acks of inflight messages. A willingness to
> lose
> &gt; &gt; pending/inflight messages is explicit in that call.&nbsp; The
> producer will&nbsp; not
> &gt; &gt; be around to deal with errors or to retry failed messages once
> close() is
> &gt; &gt; invoked.
> &gt; &gt;
> &gt; &gt; On the contrary, if the client does not want to deal with
> message loss,
> &gt; &gt; then flush(), stick around to gather the acks, deal with errors
> and retries
> &gt; &gt; etc and then do close() . Then close() will be just a resource
> management
> &gt; &gt; action on the client.
> &gt; &gt;
> &gt; &gt; So update the documentation to reflect that. ---&gt; if close()
> is called on a
> &gt; &gt; producer with messages pending acks, those messages are left
> indoubt. Avoid
> &gt; &gt; all mention of flushes, best effort etc.&nbsp; Users must buy
> into&nbsp; uncertainty,
> &gt; &gt; without any qualifications.
> &gt; &gt;
> &gt; &gt; I would at all costs avoid using the term "graceful"
> anywhere.&nbsp; That word
> &gt; &gt; has specific semantics associated with it in the systems/storage
> domain ,
> &gt; &gt; and what is being proposed here is nothing like that.
> &gt; &gt;
> &gt; &gt; -j
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu
> <y...@streamnative.io.invalid&gt;
> &gt; &gt; wrote:
> &gt; &gt;
> &gt; &gt; &gt; It’s a good point that
> `ProducerImpl#failPendingBatchMessages` treats
> &gt; &gt; &gt; messages in batch container also as pending messages.
> &gt; &gt; &gt;
> &gt; &gt; &gt; I agree with your definition of "graceful close”. It’s more
> like a “at
> &gt; &gt; &gt; most once”
> &gt; &gt; &gt; semantics, like the original JavaDoc said
> &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; pending writes will not be retried
> &gt; &gt; &gt;
> &gt; &gt; &gt; Thanks,
> &gt; &gt; &gt; Yunze
> &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; 2021年9月29日 上午5:24,Michael Marshall <
> mikemars...@gmail.com&gt; 写道:
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; Thanks for bringing this thread to the mailing list,
> Yunze.
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; I think the right change is to update the `closeAsync`
> method to first
> &gt; &gt; &gt; &gt; flush `batchMessageContainer` and to then
> asynchronously wait for the
> &gt; &gt; &gt; &gt; `pendingMessages` queue to drain. We could add a new
> timeout or rely
> &gt; &gt; &gt; &gt; on the already implemented `sendTimeout` config to put
> an upper time
> &gt; &gt; &gt; &gt; limit on `closeAsync`. My reasoning as well as
> responses to Joe and
> &gt; &gt; &gt; &gt; Yunze follow:
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt;&gt; we need to define the behavior for how to process
> `pendingMessages`
> &gt; &gt; &gt; &gt;&gt; and `batchMessageContainer` when producer call
> `closeAsync`.
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; Yes, this is exactly the clarification required, and I
> agree that the
> &gt; &gt; &gt; &gt; Javadoc is ambiguous and that the implementation
> doesn't align with
> &gt; &gt; &gt; &gt; the Javadoc.
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; If we view the Javadoc as binding, then the
> fundamental question is
> &gt; &gt; &gt; &gt; what messages are "pending". The `pendingMessages`
> seem pretty easy to
> &gt; &gt; &gt; &gt; classify as "pending" given that they are already in
> flight on the
> &gt; &gt; &gt; &gt; network.
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; I also consider `batchMessageContainer` to be
> "pending" because a
> &gt; &gt; &gt; &gt; client application already has callbacks for the
> messages in this
> &gt; &gt; &gt; &gt; container. These callbacks are expected to complete
> when the batch
> &gt; &gt; &gt; &gt; message delivery completes. Since the client
> application already has a
> &gt; &gt; &gt; &gt; reference to a callback, it isn't a problem that the
> producer
> &gt; &gt; &gt; &gt; implementation initiates the flush logic. (Note that
> the current
> &gt; &gt; &gt; &gt; design fails the `pendingMessages` but does not fail
> the
> &gt; &gt; &gt; &gt; `batchMessageContainer` when `closeAsync` is called,
> so the callbacks
> &gt; &gt; &gt; &gt; for that container are currently left incomplete
> forever if the client
> &gt; &gt; &gt; &gt; is closed with an unsent batch. We will need to
> address this design in
> &gt; &gt; &gt; &gt; the work that comes from this discussion.)
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; Further, the `ProducerImpl#failPendingMessages` method
> includes logic
> &gt; &gt; &gt; &gt; to call `ProducerImpl#failPendingBatchMessages`, which
> implies that
> &gt; &gt; &gt; &gt; these batched, but not sent, messages have been
> historically
> &gt; &gt; &gt; &gt; considered "pending".
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; If we view the Javadoc as non-binding, I think my
> guiding influence
> &gt; &gt; &gt; &gt; for the new design would be that the `closeAsync`
> method should result
> &gt; &gt; &gt; &gt; in a "graceful" shutdown of the client.
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt;&gt; What exactly does "graceful" convey here?
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; This is a great question, and will likely drive the
> design here. I
> &gt; &gt; &gt; &gt; view graceful to mean that the producer attempts to
> avoid artificial
> &gt; &gt; &gt; &gt; failures. That means trying to drain the queue instead
> of
> &gt; &gt; &gt; &gt; automatically failing all of the queue's callbacks.
> The tradeoff is
> &gt; &gt; &gt; &gt; that closing the producer takes longer. This reasoning
> would justify
> &gt; &gt; &gt; &gt; my claim that we should first flush the
> `batchMessageContainer`
> &gt; &gt; &gt; &gt; instead of failing the batch without any effort at
> delivery, as that
> &gt; &gt; &gt; &gt; would be artificial.
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt;&gt; There is no guarantee that either case will ensure
> the message
> &gt; &gt; &gt; &gt;&gt; is published.
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; I don't think that implementing `closeAsync` with
> graceful shutdown
> &gt; &gt; &gt; &gt; logic implies a guarantee of message publishing.
> Rather, it guarantees
> &gt; &gt; &gt; &gt; that failures will be the result of a real exception
> or a timeout.
> &gt; &gt; &gt; &gt; Since calling `closeAsync` prevents additional
> messages from
> &gt; &gt; &gt; &gt; delivering, users leveraging this functionality might
> be operating
> &gt; &gt; &gt; &gt; with "at most once" delivery semantics where they'd
> prefer to deliver
> &gt; &gt; &gt; &gt; the messages if possible, but they aren't going to
> delay application
> &gt; &gt; &gt; &gt; shutdown indefinitely to deliver its last messages. If
> users need
> &gt; &gt; &gt; &gt; stronger guarantees about whether their messages are
> delivered, they
> &gt; &gt; &gt; &gt; are probably already using the flush methods to ensure
> that the
> &gt; &gt; &gt; &gt; producer's queues are empty before calling
> `closeAsync`.
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; I also agree that in all of these cases, we're
> assuming that users are
> &gt; &gt; &gt; &gt; capturing references to the async callbacks and then
> making business
> &gt; &gt; &gt; &gt; logic decisions based on the results of those
> callbacks.
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; Thanks,
> &gt; &gt; &gt; &gt; Michael
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt; On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu
> <y...@streamnative.io.invalid
> &gt; &gt; &gt;
> &gt; &gt; &gt; wrote:
> &gt; &gt; &gt; &gt;&gt;
> &gt; &gt; &gt; &gt;&gt; I can’t agree more, just like what I’ve said in PR
> 12195:
> &gt; &gt; &gt; &gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt; At any case, when you choose `sendAsync`, you
> should always make use
> &gt; &gt; &gt; of the returned future to confirm the result of all
> messages. In Kafka,
> &gt; &gt; &gt; it's the send callback.
> &gt; &gt; &gt; &gt;&gt;
> &gt; &gt; &gt; &gt;&gt; But I found many users are confused about the
> current behavior,
> &gt; &gt; &gt; especially
> &gt; &gt; &gt; &gt;&gt; those are used to Kafka’s close semantics. They
> might expect a simple
> &gt; &gt; &gt; try
> &gt; &gt; &gt; &gt;&gt; to flush existing messages, which works at a
> simple test environment,
> &gt; &gt; &gt; even
> &gt; &gt; &gt; &gt;&gt; there's no guarantee for exception cases.
> &gt; &gt; &gt; &gt;&gt;
> &gt; &gt; &gt; &gt;&gt;
> &gt; &gt; &gt; &gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt; 2021年9月28日 下午4:37,Joe F <joefranc...@gmail.com&gt;
> 写道:
> &gt; &gt; &gt; &gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt; Clients should not depend on any of this
> behaviour, since the broker
> &gt; &gt; &gt; is at
> &gt; &gt; &gt; &gt;&gt;&gt; the other end of an unreliable&nbsp; network
> connection. The
> &gt; &gt; &gt; &gt;&gt;&gt; semantic differences are kind of meaningless
> from a usability point,
> &gt; &gt; &gt; since
> &gt; &gt; &gt; &gt;&gt;&gt; flushing on close =/= published.&nbsp; What
> exactly does "graceful" convey
> &gt; &gt; &gt; &gt;&gt;&gt; here?&nbsp; Flush the&nbsp; buffer on the
> client end and hope it makes it to
> &gt; &gt; the
> &gt; &gt; &gt; &gt;&gt;&gt; server.
> &gt; &gt; &gt; &gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt; Is there a&nbsp; difference whether you
> flush(or process) pending messages
> &gt; &gt; &gt; or
> &gt; &gt; &gt; &gt;&gt;&gt; not? There is no guarantee that either case
> will ensure the message
> &gt; &gt; is
> &gt; &gt; &gt; &gt;&gt;&gt; published.
> &gt; &gt; &gt; &gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt; The only way to ensure that messages are
> published is to wait for the
> &gt; &gt; &gt; ack.
> &gt; &gt; &gt; &gt;&gt;&gt; The correct model should be to wait for return
> on the blocking API,
> &gt; &gt; or
> &gt; &gt; &gt; wait
> &gt; &gt; &gt; &gt;&gt;&gt; for future completion of the async API, then
> handle any publish
> &gt; &gt; errors
> &gt; &gt; &gt; and
> &gt; &gt; &gt; &gt;&gt;&gt; then only close the producer.
> &gt; &gt; &gt; &gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt; On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu
> &gt; &gt; <y...@streamnative.io.invalid
> &gt; &gt; &gt; &gt;
> &gt; &gt; &gt; &gt;&gt;&gt; wrote:
> &gt; &gt; &gt; &gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Hi all,
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Recently I found a PR (
> https://github.com/apache/pulsar/pull/12195
> &gt <https://github.com/apache/pulsar/pull/12195&gt>; &gt; <
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> https://github.com/apache/pulsar/pull/12195&gt;) that
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; modifies the existing semantics of
> producer close. There're already
> &gt; &gt; &gt; some
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; communications in this PR, but I think
> it's better to start a
> &gt; &gt; &gt; discussion
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; here
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; to let more know.
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; The existing implementation of producer
> close is:
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. Cancel all timers, including send and
> batch container
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; (`batchMessageContainer`).
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. Complete all pending messages
> (`pendingMessages`) with
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; `AlreadyCloseException`.
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; See `ProducerImpl#closeAsync` for details.
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; But the JavaDoc of `Producer#closeAsync`
> is:
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;&gt; No more writes will be accepted from
> this producer. Waits until all
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; pending write request are persisted.
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Anyway, the document and implementation
> are inconsistent. But
> &gt; &gt; &gt; specifically,
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; we need to define the behavior for how to
> process `pendingMessages`
> &gt; &gt; &gt; and
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; `batchMessageContainer` when producer call
> `closeAsync`.
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. batchMessageContainer: contains the
> buffered single messages
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; (`Message<T&gt;`).
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. pendingMessages: all inflight messages
> (`OpSendMsg`) in network.
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; IMO, from the JavaDoc, only
> `pendingMessages` should be processed
> &gt; &gt; and
> &gt; &gt; &gt; the
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; messages in `batchMessageContainer` should
> be discarded.
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Since other clients might have already
> implemented the similar
> &gt; &gt; &gt; semantics of
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Java clients. If we changed the semantics
> now, the behaviors among
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; different
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; clients might be inconsistent.
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Should we add a configuration to support
> graceful close to follow
> &gt; &gt; the
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; docs? Or
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; just change the current behavior?
> &gt; &gt; &gt; &gt;&gt;&gt;&gt;
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Thanks,
> &gt; &gt; &gt; &gt;&gt;&gt;&gt; Yunze
> &gt; &gt; &gt; &gt;&gt;
> &gt; &gt; &gt;
> &gt; &gt; &gt;
> &gt; &gt;

Reply via email to