Thanks for bringing this thread to the mailing list, Yunze.

I think the right change is to update the `closeAsync` method to first
flush `batchMessageContainer` and to then asynchronously wait for the
`pendingMessages` queue to drain. We could add a new timeout or rely
on the already implemented `sendTimeout` config to put an upper time
limit on `closeAsync`. My reasoning as well as responses to Joe and
Yunze follow:
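Here is a minimal sketch of that ordering. It uses a toy producer, not the real `ProducerImpl`. `GracefulProducer`, `flushBatch`, `ackOldest`, and the `Duration` argument to `closeAsync` are hypothetical. The timeout stands in for whichever option we pick, a new config or `sendTimeout`. It needs Java 9+ for `orTimeout` and `failedFuture`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical toy producer; not Pulsar's ProducerImpl.
class GracefulProducer {
    // Stand-in for batchMessageContainer: accepted but not yet flushed.
    private final List<CompletableFuture<Void>> batch = new ArrayList<>();
    // Stand-in for pendingMessages: flushed and awaiting a broker ack.
    private final ConcurrentLinkedQueue<CompletableFuture<Void>> pending = new ConcurrentLinkedQueue<>();
    private boolean closing = false;

    public synchronized CompletableFuture<Void> sendAsync(String message) {
        if (closing) {
            return CompletableFuture.failedFuture(new IllegalStateException("producer is closing"));
        }
        CompletableFuture<Void> callback = new CompletableFuture<>();
        batch.add(callback);
        return callback;
    }

    // Moves every buffered message into the in-flight queue, as a batch flush would.
    public synchronized void flushBatch() {
        pending.addAll(batch);
        batch.clear();
    }

    // Simulates the broker acking the oldest in-flight message.
    public void ackOldest() {
        CompletableFuture<Void> callback = pending.poll();
        if (callback != null) {
            callback.complete(null);
        }
    }

    // Graceful close: reject new writes, flush the batch, then wait for every
    // in-flight message. If the wait fails or times out, fail whatever is still outstanding.
    public CompletableFuture<Void> closeAsync(Duration timeout) {
        final List<CompletableFuture<Void>> inFlight;
        synchronized (this) {
            closing = true;
            flushBatch();
            inFlight = new ArrayList<>(pending);
        }
        return CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0]))
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        // No-op for callbacks that already completed.
                        inFlight.forEach(callback -> callback.completeExceptionally(error));
                    }
                });
    }
}
```

When the timeout fires, the sketch deliberately fails whatever is still in flight. That way a callback only fails because of a real error or the close deadline, never because it was silently dropped.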

> we need to define the behavior for how to process `pendingMessages`
> and `batchMessageContainer` when the producer calls `closeAsync`.

Yes, this is exactly the clarification required, and I agree that the
Javadoc is ambiguous and that the implementation doesn't align with
the Javadoc.

If we view the Javadoc as binding, then the fundamental question is
what messages are "pending". The `pendingMessages` seem pretty easy to
classify as "pending" given that they are already in flight on the
network.

I also consider `batchMessageContainer` to be "pending" because a
client application already has callbacks for the messages in this
container. These callbacks are expected to complete when the batch
message delivery completes. Since the client application already has a
reference to a callback, it isn't a problem that the producer
implementation initiates the flush logic. (Note that the current
design fails the `pendingMessages` but does not fail the
`batchMessageContainer` when `closeAsync` is called, so the callbacks
for that container are currently left incomplete forever if the client
is closed with an unsent batch. We will need to address this design in
the work that comes from this discussion.)
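The hang described in that note can be modeled in a few lines. This is a hypothetical sketch, not the real close path. `CurrentCloseModel`, `enqueue`, and `close` are invented for illustration, and `IllegalStateException` stands in for Pulsar's `PulsarClientException.AlreadyClosedException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the close path as described in this thread;
// it is not the real ProducerImpl.
class CurrentCloseModel {
    // Stand-in for batchMessageContainer: callbacks for buffered, unsent messages.
    final List<CompletableFuture<Void>> batch = new ArrayList<>();
    // Stand-in for pendingMessages: callbacks for messages already in flight.
    final List<CompletableFuture<Void>> pending = new ArrayList<>();

    // Registers a new callback in the given queue and hands it to the caller.
    CompletableFuture<Void> enqueue(List<CompletableFuture<Void>> queue) {
        CompletableFuture<Void> callback = new CompletableFuture<>();
        queue.add(callback);
        return callback;
    }

    // In-flight callbacks are failed (IllegalStateException stands in for
    // AlreadyClosedException), but nothing ever completes the batch callbacks.
    void close() {
        IllegalStateException closed = new IllegalStateException("producer already closed");
        pending.forEach(callback -> callback.completeExceptionally(closed));
        pending.clear();
    }
}
```

A caller blocked on `join()` for a batched message would wait forever in this model. Flushing the batch on close, or failing it, removes that hang.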

Further, the `ProducerImpl#failPendingMessages` method includes logic
to call `ProducerImpl#failPendingBatchMessages`, which implies that
these batched, but not sent, messages have been historically
considered "pending".

If we view the Javadoc as non-binding, I think the guiding principle
for the new design should be that the `closeAsync` method results in
a "graceful" shutdown of the client.

> What exactly does "graceful" convey here?

This is a great question, and the answer will likely drive the design
here. I take "graceful" to mean that the producer attempts to avoid artificial
failures. That means trying to drain the queue instead of
automatically failing all of the queue's callbacks. The tradeoff is
that closing the producer takes longer. This reasoning would justify
my claim that we should first flush the `batchMessageContainer`
instead of failing the batch without any effort at delivery, as that
would be artificial.

> There is no guarantee that either case will ensure the message
> is published.

I don't think that implementing `closeAsync` with graceful shutdown
logic implies a guarantee of message publishing. Rather, it guarantees
that failures will be the result of a real exception or a timeout.
Since calling `closeAsync` prevents the producer from accepting
additional messages, users relying on this functionality might be
operating with "at most once" delivery semantics: they'd prefer to
deliver the messages if possible, but they aren't going to delay
application shutdown indefinitely to deliver the last few messages. If users need
stronger guarantees about whether their messages are delivered, they
are probably already using the flush methods to ensure that the
producer's queues are empty before calling `closeAsync`.

I also agree that in all of these cases, we're assuming that users are
capturing references to the async callbacks and then making business
logic decisions based on the results of those callbacks.
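Here is a sketch of the flush-then-close pattern with the callbacks captured. Pulsar's `Producer` interface does provide `sendAsync`, `flushAsync`, and `closeAsync`. To keep the sketch runnable without the client library, it codes against a hypothetical `AsyncProducer` interface with those method names. `SendThenClose` and `sendAllThenClose` are likewise invented for illustration, and no timeout is applied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the async half of a producer. Pulsar's Producer
// exposes methods with these names, but this interface is only a sketch.
interface AsyncProducer<T> {
    CompletableFuture<?> sendAsync(T message);
    CompletableFuture<Void> flushAsync();
    CompletableFuture<Void> closeAsync();
}

final class SendThenClose {
    private SendThenClose() {}

    // Sends every message, flushes, waits for each send future to resolve
    // (success or failure), and only then closes the producer. The result is
    // the number of failed sends, for the caller's business logic.
    static <T> CompletableFuture<Long> sendAllThenClose(AsyncProducer<T> producer, List<T> messages) {
        List<CompletableFuture<?>> sends = new ArrayList<>();
        for (T m : messages) {
            sends.add(producer.sendAsync(m));
        }
        // Map failures to normal completion so allOf settles without an exception;
        // the original futures still record which sends failed.
        CompletableFuture<?>[] settled = sends.stream()
                .map(f -> f.handle((r, e) -> null))
                .toArray(CompletableFuture[]::new);
        return producer.flushAsync()
                .handle((v, e) -> null) // a flush error shows up in the send futures
                .thenCompose(v -> CompletableFuture.allOf(settled))
                .thenApply(v -> sends.stream().filter(CompletableFuture::isCompletedExceptionally).count())
                .thenCompose(failed -> producer.closeAsync().thenApply(v -> failed));
    }
}
```

Returning a failure count instead of failing the whole chain means close always runs once every send has resolved. The caller then decides what a non-zero count means.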

Thanks,
Michael

On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <y...@streamnative.io.invalid> wrote:
>
> I couldn't agree more; as I said in PR 12195:
>
> > In any case, when you choose `sendAsync`, you should always use the
> > returned future to confirm the result of every message. In Kafka, it's the
> > send callback.
>
> But I found that many users are confused by the current behavior, especially
> those who are used to Kafka’s close semantics. They might expect a simple attempt
> to flush existing messages, which works in a simple test environment, even though
> there's no guarantee in exceptional cases.
>
>
>
> > On Sep 28, 2021, at 4:37 PM, Joe F <joefranc...@gmail.com> wrote:
> >
> > Clients should not depend on any of this behaviour, since the broker is at
> > the other end of an unreliable network connection. The semantic
> > differences are kind of meaningless from a usability standpoint, since
> > flushing on close is not the same as being published. What exactly does "graceful" convey
> > here? Flush the buffer on the client end and hope it makes it to the
> > server.
> >
> > Is there a difference whether you flush (or process) pending messages or
> > not? There is no guarantee that either case will ensure the message is
> > published.
> >
> > The only way to ensure that messages are published is to wait for the ack.
> > The correct model is to wait for the blocking API to return, or for the
> > async API's future to complete, then handle any publish errors, and only
> > then close the producer.
> >
> >
> > On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <y...@streamnative.io.invalid> wrote:
> >
> >> Hi all,
> >>
> >> Recently I found a PR (https://github.com/apache/pulsar/pull/12195) that
> >> modifies the existing semantics of producer close. There has already been
> >> some discussion in this PR, but I think it's better to start a discussion
> >> here so that more people are aware of it.
> >>
> >> The existing implementation of producer close is:
> >> 1. Cancel all timers, including the send timer and the batch container
> >> (`batchMessageContainer`) timer.
> >> 2. Complete all pending messages (`pendingMessages`) with
> >> `AlreadyClosedException`.
> >>
> >> See `ProducerImpl#closeAsync` for details.
> >>
> >> But the JavaDoc of `Producer#closeAsync` is:
> >>
> >>> No more writes will be accepted from this producer. Waits until all
> >>> pending write request are persisted.
> >>
> >> In any case, the documentation and implementation are inconsistent. More
> >> specifically, we need to define the behavior for how to process
> >> `pendingMessages` and `batchMessageContainer` when the producer calls
> >> `closeAsync`.
> >>
> >> 1. `batchMessageContainer`: contains the buffered single messages
> >> (`Message<T>`).
> >> 2. `pendingMessages`: all in-flight messages (`OpSendMsg`) on the network.
> >>
> >> IMO, from the JavaDoc, only `pendingMessages` should be processed and the
> >> messages in `batchMessageContainer` should be discarded.
> >>
> >> Other clients might have already implemented semantics similar to the
> >> Java client's. If we changed the semantics now, the behavior of different
> >> clients might become inconsistent.
> >>
> >> Should we add a configuration option to support a graceful close that
> >> follows the docs? Or should we just change the current behavior?
> >>
> >> Thanks,
> >> Yunze
>
