I agree to try to ensure ??at most once?? when closing??

> That would still get controlled by send timeout, after that, the send
will fail and close should proceed.
This sounds more in line with ??at most once????


------------------ ???????? ------------------
??????:                                                                         
                                               "dev"                            
                                                        
<matteo.me...@gmail.com&gt;;
????????:&nbsp;2021??9??29??(??????) ????3:55
??????:&nbsp;"Dev"<dev@pulsar.apache.org&gt;;

????:&nbsp;Re: Correct semantics of producer close



&gt; but equally they might be
&gt; surprised when closeAsync doesn't complete because the pending messages
&gt; can't be cleared

That would still get controlled by send timeout, after that, the send
will fail and close should proceed.

--
Matteo Merli
<matteo.me...@gmail.com&gt;

On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
<jvanligh...@splunk.com.invalid&gt; wrote:
&gt;
&gt; I can see both sides of the argument regarding whether to flush pending
&gt; messages or not. But I think what is definitely in the contract is not to
&gt; discard any callbacks causing user code to block forever. No matter what,
&gt; we must always call the callbacks.
&gt;
&gt; Personally, I am in favour of a close operation not flushing pending
&gt; messages (and I define pending here as any message that has a callback).
&gt; The reason is that if we wait for all pending messages to be sent then we
&gt; now face a number of edge cases that could cause the close operation to
&gt; take a very long time to complete. What if the user code really just needs
&gt; to close the producer right now? If we amend the documentation to make it
&gt; clear that close does not flush pending messages then the user is now able
&gt; to explicitly craft the behaviour they need. If they want all messages
&gt; flushed first then chaing flushAsync-&gt;closeAsync else just closeAsync.
&gt;
&gt; Unfortunately I think user expectation, regardless of the current javadoc,
&gt; is that close would flush everything and in an ideal world it would. We
&gt; have the Principle of Least Surprise but we also have Safe By Default.
&gt; Users might be surprised that when calling closeAsync, a load of their
&gt; pending messages get ConnectionAlreadyClosed, but equally they might be
&gt; surprised when closeAsync doesn't complete because the pending messages
&gt; can't be cleared. Failing pending messages is the safer option. User code
&gt; must handle failure responses and cannot claim data loss with a
&gt; non-positive response. But if they can't close a producer, that could
&gt; result in a wider impact on their system, not to mention more issues
&gt; created in GitHub.
&gt;
&gt; Jack
&gt;
&gt; On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefranc...@gmail.com&gt; wrote:
&gt;
&gt; &gt; [ External sender. Exercise caution. ]
&gt; &gt;
&gt; &gt; &gt;I don't think that implementing `closeAsync` with graceful 
shutdown
&gt; &gt; logic implies a guarantee of message publishing. Rather, it guarantees
&gt; &gt; that failures will be the result of a real exception or a timeout.
&gt; &gt;
&gt; &gt; I think that's beside the point.&nbsp;&nbsp;&nbsp;&nbsp; There is no 
definition of "real"
&gt; &gt; exceptions.&nbsp;&nbsp; At that point the app is publishing on a best 
effort basis,
&gt; &gt; and there are no guarantees anywhere in client or server.
&gt; &gt;
&gt; &gt; There is no concept&nbsp; of&nbsp; "maybe published". OR
&gt; &gt; "published-if-no_real_errors".&nbsp; What does that even mean?&nbsp; 
That is only a
&gt; &gt; can of worms which is going to add to developer confusion and lead to
&gt; &gt; Pulsar users finding in the worst possible way that something got lost
&gt; &gt; because it never got published.&nbsp; It's a poor experience when you 
find it.
&gt; &gt; I have a real life experience where a user used async APIs (in a 
lambda),
&gt; &gt; which hummed along fine.&nbsp; One day much later, the cloud had a 
hitch, and
&gt; &gt; they discovered a message was&nbsp; not delivered.
&gt; &gt;
&gt; &gt; I am more concerned about developers discovering at the worst 
possible time
&gt; &gt; that&nbsp; ""published-if-no_real_errors"&nbsp; is a concept.
&gt; &gt;
&gt; &gt; My suggestion is to make this simple for developers.
&gt; &gt;
&gt; &gt; ----The sync/async nature of the close() [ or any other API, for that
&gt; &gt; matter ]&nbsp; is completely orthogonal to the API semantics, and is 
just a
&gt; &gt; programmatic choice to deal with&nbsp; how resources are managed 
within the
&gt; &gt; program. That's not material here.---
&gt; &gt;
&gt; &gt; A close() is an action that is shutting down the producer right now, 
not
&gt; &gt; even waiting for any acks of inflight messages. A willingness to lose
&gt; &gt; pending/inflight messages is explicit in that call.&nbsp; The 
producer will&nbsp; not
&gt; &gt; be around to deal with errors or to retry failed messages once 
close() is
&gt; &gt; invoked.
&gt; &gt;
&gt; &gt; On the contrary, if the client does not want to deal with message 
loss,
&gt; &gt; then flush(), stick around to gather the acks, deal with errors and 
retries
&gt; &gt; etc and then do close() . Then close() will be just a resource 
management
&gt; &gt; action on the client.
&gt; &gt;
&gt; &gt; So update the documentation to reflect that. ---&gt; if close() is 
called on a
&gt; &gt; producer with messages pending acks, those messages are left indoubt. 
Avoid
&gt; &gt; all mention of flushes, best effort etc.&nbsp; Users must buy 
into&nbsp; uncertainty,
&gt; &gt; without any qualifications.
&gt; &gt;
&gt; &gt; I would at all costs avoid using the term "graceful" anywhere.&nbsp; 
That word
&gt; &gt; has specific semantics associated with it in the systems/storage 
domain ,
&gt; &gt; and what is being proposed here is nothing like that.
&gt; &gt;
&gt; &gt; -j
&gt; &gt;
&gt; &gt;
&gt; &gt; On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu 
<y...@streamnative.io.invalid&gt;
&gt; &gt; wrote:
&gt; &gt;
&gt; &gt; &gt; It??s a good point that `ProducerImpl#failPendingBatchMessages` 
treats
&gt; &gt; &gt; messages in batch container also as pending messages.
&gt; &gt; &gt;
&gt; &gt; &gt; I agree with your definition of "graceful close??. It??s more 
like a ??at
&gt; &gt; &gt; most once??
&gt; &gt; &gt; semantics, like the original JavaDoc said
&gt; &gt; &gt;
&gt; &gt; &gt; &gt; pending writes will not be retried
&gt; &gt; &gt;
&gt; &gt; &gt; Thanks,
&gt; &gt; &gt; Yunze
&gt; &gt; &gt;
&gt; &gt; &gt; &gt; 2021??9??29?? ????5:24??Michael Marshall 
<mikemars...@gmail.com&gt; ??????
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; Thanks for bringing this thread to the mailing list, Yunze.
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; I think the right change is to update the `closeAsync` 
method to first
&gt; &gt; &gt; &gt; flush `batchMessageContainer` and to then asynchronously 
wait for the
&gt; &gt; &gt; &gt; `pendingMessages` queue to drain. We could add a new 
timeout or rely
&gt; &gt; &gt; &gt; on the already implemented `sendTimeout` config to put an 
upper time
&gt; &gt; &gt; &gt; limit on `closeAsync`. My reasoning as well as responses to 
Joe and
&gt; &gt; &gt; &gt; Yunze follow:
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt;&gt; we need to define the behavior for how to process 
`pendingMessages`
&gt; &gt; &gt; &gt;&gt; and `batchMessageContainer` when producer call 
`closeAsync`.
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; Yes, this is exactly the clarification required, and I 
agree that the
&gt; &gt; &gt; &gt; Javadoc is ambiguous and that the implementation doesn't 
align with
&gt; &gt; &gt; &gt; the Javadoc.
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; If we view the Javadoc as binding, then the fundamental 
question is
&gt; &gt; &gt; &gt; what messages are "pending". The `pendingMessages` seem 
pretty easy to
&gt; &gt; &gt; &gt; classify as "pending" given that they are already in flight 
on the
&gt; &gt; &gt; &gt; network.
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; I also consider `batchMessageContainer` to be "pending" 
because a
&gt; &gt; &gt; &gt; client application already has callbacks for the messages 
in this
&gt; &gt; &gt; &gt; container. These callbacks are expected to complete when 
the batch
&gt; &gt; &gt; &gt; message delivery completes. Since the client application 
already has a
&gt; &gt; &gt; &gt; reference to a callback, it isn't a problem that the 
producer
&gt; &gt; &gt; &gt; implementation initiates the flush logic. (Note that the 
current
&gt; &gt; &gt; &gt; design fails the `pendingMessages` but does not fail the
&gt; &gt; &gt; &gt; `batchMessageContainer` when `closeAsync` is called, so the 
callbacks
&gt; &gt; &gt; &gt; for that container are currently left incomplete forever if 
the client
&gt; &gt; &gt; &gt; is closed with an unsent batch. We will need to address 
this design in
&gt; &gt; &gt; &gt; the work that comes from this discussion.)
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; Further, the `ProducerImpl#failPendingMessages` method 
includes logic
&gt; &gt; &gt; &gt; to call `ProducerImpl#failPendingBatchMessages`, which 
implies that
&gt; &gt; &gt; &gt; these batched, but not sent, messages have been historically
&gt; &gt; &gt; &gt; considered "pending".
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; If we view the Javadoc as non-binding, I think my guiding 
influence
&gt; &gt; &gt; &gt; for the new design would be that the `closeAsync` method 
should result
&gt; &gt; &gt; &gt; in a "graceful" shutdown of the client.
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt;&gt; What exactly does "graceful" convey here?
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; This is a great question, and will likely drive the design 
here. I
&gt; &gt; &gt; &gt; view graceful to mean that the producer attempts to avoid 
artificial
&gt; &gt; &gt; &gt; failures. That means trying to drain the queue instead of
&gt; &gt; &gt; &gt; automatically failing all of the queue's callbacks. The 
tradeoff is
&gt; &gt; &gt; &gt; that closing the producer takes longer. This reasoning 
would justify
&gt; &gt; &gt; &gt; my claim that we should first flush the 
`batchMessageContainer`
&gt; &gt; &gt; &gt; instead of failing the batch without any effort at 
delivery, as that
&gt; &gt; &gt; &gt; would be artificial.
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt;&gt; There is no guarantee that either case will ensure the 
message
&gt; &gt; &gt; &gt;&gt; is published.
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; I don't think that implementing `closeAsync` with graceful 
shutdown
&gt; &gt; &gt; &gt; logic implies a guarantee of message publishing. Rather, it 
guarantees
&gt; &gt; &gt; &gt; that failures will be the result of a real exception or a 
timeout.
&gt; &gt; &gt; &gt; Since calling `closeAsync` prevents additional messages from
&gt; &gt; &gt; &gt; delivering, users leveraging this functionality might be 
operating
&gt; &gt; &gt; &gt; with "at most once" delivery semantics where they'd prefer 
to deliver
&gt; &gt; &gt; &gt; the messages if possible, but they aren't going to delay 
application
&gt; &gt; &gt; &gt; shutdown indefinitely to deliver its last messages. If 
users need
&gt; &gt; &gt; &gt; stronger guarantees about whether their messages are 
delivered, they
&gt; &gt; &gt; &gt; are probably already using the flush methods to ensure that 
the
&gt; &gt; &gt; &gt; producer's queues are empty before calling `closeAsync`.
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; I also agree that in all of these cases, we're assuming 
that users are
&gt; &gt; &gt; &gt; capturing references to the async callbacks and then making 
business
&gt; &gt; &gt; &gt; logic decisions based on the results of those callbacks.
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; Thanks,
&gt; &gt; &gt; &gt; Michael
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt; On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu 
<y...@streamnative.io.invalid
&gt; &gt; &gt;
&gt; &gt; &gt; wrote:
&gt; &gt; &gt; &gt;&gt;
&gt; &gt; &gt; &gt;&gt; I can??t agree more, just like what I??ve said in PR 
12195:
&gt; &gt; &gt; &gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt; At any case, when you choose `sendAsync`, you 
should always make use
&gt; &gt; &gt; of the returned future to confirm the result of all messages. In 
Kafka,
&gt; &gt; &gt; it's the send callback.
&gt; &gt; &gt; &gt;&gt;
&gt; &gt; &gt; &gt;&gt; But I found many users are confused about the current 
behavior,
&gt; &gt; &gt; especially
&gt; &gt; &gt; &gt;&gt; those are used to Kafka??s close semantics. They might 
expect a simple
&gt; &gt; &gt; try
&gt; &gt; &gt; &gt;&gt; to flush existing messages, which works at a simple 
test environment,
&gt; &gt; &gt; even
&gt; &gt; &gt; &gt;&gt; there's no guarantee for exception cases.
&gt; &gt; &gt; &gt;&gt;
&gt; &gt; &gt; &gt;&gt;
&gt; &gt; &gt; &gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt; 2021??9??28?? ????4:37??Joe F 
<joefranc...@gmail.com&gt; ??????
&gt; &gt; &gt; &gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt; Clients should not depend on any of this behaviour, 
since the broker
&gt; &gt; &gt; is at
&gt; &gt; &gt; &gt;&gt;&gt; the other end of an unreliable&nbsp; network 
connection. The
&gt; &gt; &gt; &gt;&gt;&gt; semantic differences are kind of meaningless from a 
usability point,
&gt; &gt; &gt; since
&gt; &gt; &gt; &gt;&gt;&gt; flushing on close =/= published.&nbsp; What exactly 
does "graceful" convey
&gt; &gt; &gt; &gt;&gt;&gt; here?&nbsp; Flush the&nbsp; buffer on the client 
end and hope it makes it to
&gt; &gt; the
&gt; &gt; &gt; &gt;&gt;&gt; server.
&gt; &gt; &gt; &gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt; Is there a&nbsp; difference whether you flush(or 
process) pending messages
&gt; &gt; &gt; or
&gt; &gt; &gt; &gt;&gt;&gt; not? There is no guarantee that either case will 
ensure the message
&gt; &gt; is
&gt; &gt; &gt; &gt;&gt;&gt; published.
&gt; &gt; &gt; &gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt; The only way to ensure that messages are published 
is to wait for the
&gt; &gt; &gt; ack.
&gt; &gt; &gt; &gt;&gt;&gt; The correct model should be to wait for return on 
the blocking API,
&gt; &gt; or
&gt; &gt; &gt; wait
&gt; &gt; &gt; &gt;&gt;&gt; for future completion of the async API, then handle 
any publish
&gt; &gt; errors
&gt; &gt; &gt; and
&gt; &gt; &gt; &gt;&gt;&gt; then only close the producer.
&gt; &gt; &gt; &gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt; On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu
&gt; &gt; <y...@streamnative.io.invalid
&gt; &gt; &gt; &gt;
&gt; &gt; &gt; &gt;&gt;&gt; wrote:
&gt; &gt; &gt; &gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; Hi all,
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; Recently I found a PR 
(https://github.com/apache/pulsar/pull/12195
&gt; &gt; <
&gt; &gt; &gt; &gt;&gt;&gt;&gt; 
https://github.com/apache/pulsar/pull/12195&gt;) that
&gt; &gt; &gt; &gt;&gt;&gt;&gt; modifies the existing semantics of producer 
close. There're already
&gt; &gt; &gt; some
&gt; &gt; &gt; &gt;&gt;&gt;&gt; communications in this PR, but I think it's 
better to start a
&gt; &gt; &gt; discussion
&gt; &gt; &gt; &gt;&gt;&gt;&gt; here
&gt; &gt; &gt; &gt;&gt;&gt;&gt; to let more know.
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; The existing implementation of producer close 
is:
&gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. Cancel all timers, including send and batch 
container
&gt; &gt; &gt; &gt;&gt;&gt;&gt; (`batchMessageContainer`).
&gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. Complete all pending messages 
(`pendingMessages`) with
&gt; &gt; &gt; &gt;&gt;&gt;&gt; `AlreadyCloseException`.
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; See `ProducerImpl#closeAsync` for details.
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; But the JavaDoc of `Producer#closeAsync` is:
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt;&gt; No more writes will be accepted from this 
producer. Waits until all
&gt; &gt; &gt; &gt;&gt;&gt;&gt; pending write request are persisted.
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; Anyway, the document and implementation are 
inconsistent. But
&gt; &gt; &gt; specifically,
&gt; &gt; &gt; &gt;&gt;&gt;&gt; we need to define the behavior for how to 
process `pendingMessages`
&gt; &gt; &gt; and
&gt; &gt; &gt; &gt;&gt;&gt;&gt; `batchMessageContainer` when producer call 
`closeAsync`.
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; 1. batchMessageContainer: contains the buffered 
single messages
&gt; &gt; &gt; &gt;&gt;&gt;&gt; (`Message<T&gt;`).
&gt; &gt; &gt; &gt;&gt;&gt;&gt; 2. pendingMessages: all inflight messages 
(`OpSendMsg`) in network.
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; IMO, from the JavaDoc, only `pendingMessages` 
should be processed
&gt; &gt; and
&gt; &gt; &gt; the
&gt; &gt; &gt; &gt;&gt;&gt;&gt; messages in `batchMessageContainer` should be 
discarded.
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; Since other clients might have already 
implemented the similar
&gt; &gt; &gt; semantics of
&gt; &gt; &gt; &gt;&gt;&gt;&gt; Java clients. If we changed the semantics now, 
the behaviors among
&gt; &gt; &gt; &gt;&gt;&gt;&gt; different
&gt; &gt; &gt; &gt;&gt;&gt;&gt; clients might be inconsistent.
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; Should we add a configuration to support 
graceful close to follow
&gt; &gt; the
&gt; &gt; &gt; &gt;&gt;&gt;&gt; docs? Or
&gt; &gt; &gt; &gt;&gt;&gt;&gt; just change the current behavior?
&gt; &gt; &gt; &gt;&gt;&gt;&gt;
&gt; &gt; &gt; &gt;&gt;&gt;&gt; Thanks,
&gt; &gt; &gt; &gt;&gt;&gt;&gt; Yunze
&gt; &gt; &gt; &gt;&gt;
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt;

Reply via email to