It’s a good point that `ProducerImpl#failPendingBatchMessages` also treats
the messages in the batch container as pending messages.

I agree with your definition of “graceful close”. It’s more like “at most
once” semantics, as the original JavaDoc says:

> pending writes will not be retried
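
To make the “at most once” reading concrete, here is a minimal sketch in plain
Java of a close that flushes the batch, waits for the in-flight queue to drain
up to a timeout, and then fails whatever is left without retrying.
`SketchProducer`, `flushBatch`, `ackOne`, and the use of `IllegalStateException`
are all hypothetical stand-ins for illustration; this is not the real
`ProducerImpl`, and it assumes Java 9+ for `orTimeout`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a producer; NOT the real ProducerImpl. */
class SketchProducer {
    // Messages buffered for batching but not yet sent.
    private final List<CompletableFuture<String>> batchContainer = new ArrayList<>();
    // Messages already sent and waiting for a broker ack.
    private final Queue<CompletableFuture<String>> pendingMessages = new ArrayDeque<>();
    private boolean closed = false;

    synchronized CompletableFuture<String> sendAsync(String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (closed) {
            // Assumption: a real client would use its own "already closed" exception.
            future.completeExceptionally(new IllegalStateException("producer already closed"));
            return future;
        }
        batchContainer.add(future);
        return future;
    }

    /** Moves the buffered batch into the in-flight queue. */
    synchronized void flushBatch() {
        pendingMessages.addAll(batchContainer);
        batchContainer.clear();
    }

    /** Simulates a broker ack for the oldest in-flight message. */
    void ackOne(String messageId) {
        CompletableFuture<String> future;
        synchronized (this) {
            future = pendingMessages.poll();
        }
        if (future != null) {
            future.complete(messageId);
        }
    }

    /** Flush, wait for the queue to drain until the timeout, then fail the rest. */
    CompletableFuture<Void> closeAsync(long timeout, TimeUnit unit) {
        List<CompletableFuture<String>> inFlight;
        synchronized (this) {
            closed = true;   // no more writes are accepted
            flushBatch();    // the unsent batch gets one delivery attempt
            inFlight = new ArrayList<>(pendingMessages);
        }
        // Individual send failures are reported on each send future, not here.
        CompletableFuture<Void> drained = CompletableFuture
                .allOf(inFlight.toArray(new CompletableFuture[0]))
                .<Void>handle((ignored, error) -> null);
        return drained
                .orTimeout(timeout, unit)
                .<Void>handle((ignored, error) -> {
                    // No retry: anything still unacked fails exactly once, here.
                    // This is a no-op for futures that already completed.
                    for (CompletableFuture<String> future : inFlight) {
                        future.completeExceptionally(
                                new IllegalStateException("producer closed before ack"));
                    }
                    return null;
                });
    }
}
```

With this shape, a message acked before the timeout completes normally, and one
that is never acked fails once, at close time. A caller that needs more than
that would still flush and check the send futures before closing.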

Thanks,
Yunze

> On Sep 29, 2021, at 5:24 AM, Michael Marshall <mikemars...@gmail.com> wrote:
> 
> Thanks for bringing this thread to the mailing list, Yunze.
> 
> I think the right change is to update the `closeAsync` method to first
> flush `batchMessageContainer` and to then asynchronously wait for the
> `pendingMessages` queue to drain. We could add a new timeout or rely
> on the already implemented `sendTimeout` config to put an upper time
> limit on `closeAsync`. My reasoning as well as responses to Joe and
> Yunze follow:
> 
>> we need to define the behavior for how to process `pendingMessages`
>> and `batchMessageContainer` when the producer calls `closeAsync`.
> 
> Yes, this is exactly the clarification required, and I agree that the
> Javadoc is ambiguous and that the implementation doesn't align with
> the Javadoc.
> 
> If we view the Javadoc as binding, then the fundamental question is
> what messages are "pending". The `pendingMessages` seem pretty easy to
> classify as "pending" given that they are already in flight on the
> network.
> 
> I also consider `batchMessageContainer` to be "pending" because a
> client application already has callbacks for the messages in this
> container. These callbacks are expected to complete when the batch
> message delivery completes. Since the client application already has a
> reference to a callback, it isn't a problem that the producer
> implementation initiates the flush logic. (Note that the current
> design fails the `pendingMessages` but does not fail the
> `batchMessageContainer` when `closeAsync` is called, so the callbacks
> for that container are currently left incomplete forever if the client
> is closed with an unsent batch. We will need to address this design in
> the work that comes from this discussion.)
> 
> Further, the `ProducerImpl#failPendingMessages` method includes logic
> to call `ProducerImpl#failPendingBatchMessages`, which implies that
> these batched, but not sent, messages have been historically
> considered "pending".
> 
> If we view the Javadoc as non-binding, I think my guiding influence
> for the new design would be that the `closeAsync` method should result
> in a "graceful" shutdown of the client.
> 
>> What exactly does "graceful" convey here?
> 
> This is a great question, and will likely drive the design here. I
> view graceful to mean that the producer attempts to avoid artificial
> failures. That means trying to drain the queue instead of
> automatically failing all of the queue's callbacks. The tradeoff is
> that closing the producer takes longer. This reasoning would justify
> my claim that we should first flush the `batchMessageContainer`
> instead of failing the batch without any effort at delivery, as that
> would be artificial.
> 
>> There is no guarantee that either case will ensure the message
>> is published.
> 
> I don't think that implementing `closeAsync` with graceful shutdown
> logic implies a guarantee of message publishing. Rather, it guarantees
> that failures will be the result of a real exception or a timeout.
> Since calling `closeAsync` prevents additional messages from
> delivering, users leveraging this functionality might be operating
> with "at most once" delivery semantics where they'd prefer to deliver
> the messages if possible, but they aren't going to delay application
> shutdown indefinitely to deliver its last messages. If users need
> stronger guarantees about whether their messages are delivered, they
> are probably already using the flush methods to ensure that the
> producer's queues are empty before calling `closeAsync`.
> 
> I also agree that in all of these cases, we're assuming that users are
> capturing references to the async callbacks and then making business
> logic decisions based on the results of those callbacks.
> 
> Thanks,
> Michael
> 
> On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <y...@streamnative.io.invalid> wrote:
>> 
>> I couldn’t agree more. It's just what I said in PR 12195:
>> 
>>> In any case, when you choose `sendAsync`, you should always make use of the
>>> returned future to confirm the result of all messages. In Kafka, it's the
>>> send callback.
>> 
>> But I found that many users are confused about the current behavior, especially
>> those who are used to Kafka’s close semantics. They might expect a simple attempt
>> to flush existing messages, which works in a simple test environment, even though
>> there's no guarantee in exceptional cases.
>> 
>> 
>> 
>>> On Sep 28, 2021, at 4:37 PM, Joe F <joefranc...@gmail.com> wrote:
>>> 
>>> Clients should not depend on any of this behaviour, since the broker is at
>>> the other end of an unreliable network connection. The
>>> semantic differences are kind of meaningless from a usability point, since
>>> flushing on close =/= published. What exactly does "graceful" convey
>>> here? Flush the buffer on the client end and hope it makes it to the
>>> server.
>>> 
>>> Is there a difference whether you flush (or process) pending messages or
>>> not? There is no guarantee that either case will ensure the message is
>>> published.
>>> 
>>> The only way to ensure that messages are published is to wait for the ack.
>>> The correct model should be to wait for the return of the blocking API, or wait
>>> for future completion of the async API, then handle any publish errors, and
>>> only then close the producer.
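
The model above (wait for each ack, handle publish errors, and only then close)
can be sketched with plain `CompletableFuture`s. `AckThenClose` and `publishAll`
are hypothetical names, and the `sendAsync` parameter is a stand-in for whatever
async send call the client exposes; nothing here is Pulsar code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

/** Hypothetical helper, for illustration only. */
class AckThenClose {
    /**
     * Sends every payload through the given async send function, waits for
     * each result, and returns the payloads whose publish failed. The caller
     * closes the producer only after this method returns.
     */
    static List<String> publishAll(List<String> payloads,
                                   Function<String, CompletableFuture<String>> sendAsync) {
        // Start all sends first so they can proceed concurrently.
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String payload : payloads) {
            futures.add(sendAsync.apply(payload));
        }
        // Then wait for each ack and record the failures.
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).join();
            } catch (CompletionException | CancellationException e) {
                failed.add(payloads.get(i));
            }
        }
        return failed;
    }
}
```

With the Pulsar Java client, the function passed in would presumably wrap
`producer.sendAsync(...)`, and `producer.close()` would be called only after
`publishAll` returns and the failed payloads have been handled.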
>>> 
>>> 
>>> On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <y...@streamnative.io.invalid>
>>> wrote:
>>> 
>>>> Hi all,
>>>> 
>>>> Recently I found a PR (https://github.com/apache/pulsar/pull/12195) that
>>>> modifies the existing semantics of producer close. There has already been some
>>>> discussion in this PR, but I think it's better to start a discussion
>>>> here
>>>> to let more people know.
>>>> 
>>>> The existing implementation of producer close is:
>>>> 1. Cancel all timers, including send and batch container
>>>> (`batchMessageContainer`).
>>>> 2. Complete all pending messages (`pendingMessages`) with
>>>> `AlreadyClosedException`.
>>>> 
>>>> See `ProducerImpl#closeAsync` for details.
>>>> 
>>>> But the JavaDoc of `Producer#closeAsync` is:
>>>> 
>>>>> No more writes will be accepted from this producer. Waits until all
>>>>> pending write request are persisted.
>>>> 
>>>> So the documentation and the implementation are inconsistent. More specifically,
>>>> we need to define how to process `pendingMessages` and
>>>> `batchMessageContainer` when the producer calls `closeAsync`.
>>>> 
>>>> 1. batchMessageContainer: contains the buffered single messages
>>>> (`Message<T>`).
>>>> 2. pendingMessages: all in-flight messages (`OpSendMsg`) on the network.
>>>> 
>>>> IMO, from the JavaDoc, only `pendingMessages` should be processed and the
>>>> messages in `batchMessageContainer` should be discarded.
>>>> 
>>>> Other clients might have already implemented semantics similar to the
>>>> Java client's. If we change the semantics now, the behavior of
>>>> different
>>>> clients might become inconsistent.
>>>> 
>>>> Should we add a configuration to support graceful close to follow the
>>>> docs? Or
>>>> just change the current behavior?
>>>> 
>>>> Thanks,
>>>> Yunze
>> 
