Moses, in the case of a full queue, could we just return a failed future
immediately?

Ryanne

On Tue, May 18, 2021, 10:39 AM Nakamura <nny...@gmail.com> wrote:

> Hi Alexandre,
>
> Thanks for bringing this up, I think I could use some feedback in this
> area.  There are two mechanisms here, one for slowing down when we don't
> have the relevant metadata, and the other for slowing down when a queue has
> filled up.  Although the first one applies backpressure somewhat
> inadvertently, we could still get in trouble if we're not providing
> information to the mechanism that monitors whether we're queueing too
> much.  As for the second one, that is a classic backpressure use case, so
> it's definitely important that we don't drop that ability.
>
> Right now backpressure is applied by blocking, which is a natural way to
> apply backpressure in synchronous systems, but can lead to unnecessary
> slowdowns in asynchronous systems.  In my opinion, the safest way to apply
> backpressure in an asynchronous model is to have an explicit backpressure
> signal.  A good example would be returning an exception, and providing an
> optional hook to add a callback onto so that you can be notified when it's
> ready to accept more messages.
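>
> To make that concrete, here's a rough sketch of the shape I have in mind
> (all of these names are made up for illustration; none of them exist in the
> producer API today):
>
>     import java.util.concurrent.CompletionStage;
>
>     // Hypothetical exception thrown by send() when the producer cannot
>     // accept more records right now.
>     public class ProducerBusyException extends RuntimeException {
>         private final CompletionStage<Void> whenReady;
>
>         public ProducerBusyException(CompletionStage<Void> whenReady) {
>             super("producer cannot accept more records right now");
>             this.whenReady = whenReady;
>         }
>
>         // Completes once the producer can accept more records, so callers
>         // can register a callback instead of blocking.
>         public CompletionStage<Void> whenReady() {
>             return whenReady;
>         }
>     }
>
>     // Caller side (producer, record and callback are the usual objects):
>     try {
>         producer.send(record, callback);
>     } catch (ProducerBusyException e) {
>         e.whenReady().thenRun(() -> producer.send(record, callback));
>     }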
>
> However, this would be a really big change to how users use
> KafkaProducer#send, so I don't know how much appetite we have for making
> that kind of change.  Maybe it would be simpler to remove the "don't block
> when the per-topic queue is full" behavior from the scope of this KIP, and
> only focus on the metadata-availability case?  The downside is that we will probably
> want to change the API again later to fix this, so it might be better to
> just rip the bandaid off now.
>
> One slightly nasty thing here is that because queueing order is important,
> if we want to use exceptions, we will want to be able to signal the failure
> to enqueue to the caller in such a way that they can still enforce message
> order if they want.  So we can't embed the failure directly in the returned
> future; we should either return two futures (nested, or as a tuple) or else
> throw an exception to signal backpressure.
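>
> To illustrate the "nested futures" option (purely a sketch, not a settled
> signature):
>
>     import java.util.concurrent.CompletionStage;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>     import org.apache.kafka.clients.producer.RecordMetadata;
>
>     interface AsyncSender<K, V> {
>         // The outer stage completes when the record has been accepted into
>         // the producer's queue (so callers can preserve send order by
>         // chaining their next send off it); the inner stage completes when
>         // the broker has acknowledged the record, or fails with the send
>         // error.
>         CompletionStage<CompletionStage<RecordMetadata>> send(ProducerRecord<K, V> record);
>     }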
>
> So there are a few things we should work out here:
>
> 1. Should we keep the "too many bytes enqueued" part of this in scope?  (I
> would say yes, so that we can minimize churn in this API)
> 2. How should we signal backpressure so that it's appropriate for
> asynchronous systems?  (I would say that we should throw an exception.  If
> we choose this and we want to pursue the queueing path, we would *not* want
> to enqueue messages that would push us over the limit, and would only want
> to enqueue messages when we're waiting for metadata, and we would want to
> keep track of the total number of bytes for those messages).
>
> What do you think?
>
> Best,
> Moses
>
> On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <
> alexandre.dupr...@gmail.com> wrote:
>
> > Hello Nakamura,
> >
> > Thanks for proposing this change. I can see how the blocking behaviour
> > can be a problem when integrating with reactive frameworks such as
> > Akka. One of the questions I would have is how you would handle back
> > pressure and avoid memory exhaustion when the producer's buffer is
> > full and tasks would start to accumulate in the out-of-band queue or
> > thread pool introduced with this KIP.
> >
> > Thanks,
> > Alexandre
> >
> > On Fri, May 14, 2021, 3:55 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > >
> > > Makes sense!
> > >
> > > Ryanne
> > >
> > > On Fri, May 14, 2021, 9:39 AM Nakamura <nny...@gmail.com> wrote:
> > >
> > > > Hey Ryanne,
> > > >
> > > > I see what you're saying about serde blocking, but I think we should
> > > > consider it out of scope for this patch.  Right now we've nailed down a
> > > > couple of use cases where we can unambiguously say, "I can make progress
> > > > now" or "I cannot make progress now", which makes it possible to offload
> > > > to a different thread only if we are unable to make progress.  Extending
> > > > this to CPU work like serde would mean always offloading, which would be
> > > > a really big performance change.  It might be worth exploring anyway, but
> > > > I'd rather keep this patch focused on improving ergonomics than muddy the
> > > > waters by evaluating performance very deeply.
> > > >
> > > > I think if we really do want to support serde or interceptors that do
> > > > IO on the send path (which seems like an anti-pattern to me), we should
> > > > consider making that a separate KIP, and probably also consider changing
> > > > the API to use Futures (or CompletionStages).  But I would rather avoid
> > > > scope creep, so that we have a better chance of fixing this part of the
> > > > problem.
> > > >
> > > > Yes, I think some exceptions will move to being async instead of sync.
> > > > They'll still be surfaced in the Future, so I'm not so confident that it
> > > > would be that big a shock to users though.
> > > >
> > > > Best,
> > > > Moses
> > > >
> > > > On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > > wrote:
> > > >
> > > > > re serialization, my concern is that serialization often accounts for
> > > > > a lot of the cycles spent before returning the future. It's not
> > > > > blocking per se, but it's the same effect from the caller's perspective.
> > > > >
> > > > > Moreover, serde impls often block themselves, e.g. when fetching
> > > > > schemas from a registry. I suppose it's also possible to block in
> > > > > Interceptors (e.g. writing audit events or metrics), which happens
> > > > > before serdes iiuc. So any blocking in either of those plugins would
> > > > > block the send unless we queue first.
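> > > > >
> > > > > Roughly what I mean, as a made-up example (real registry-backed
> > > > > serializers are more involved, but the blocking call is the point):
> > > > >
> > > > >     import java.net.HttpURLConnection;
> > > > >     import java.net.URL;
> > > > >     import java.nio.charset.StandardCharsets;
> > > > >     import org.apache.kafka.common.serialization.Serializer;
> > > > >
> > > > >     // Illustrative only: a serializer that does a synchronous HTTP
> > > > >     // round trip inside serialize(), so the thread calling send()
> > > > >     // stalls on it before the future is even created.
> > > > >     class RegistryCheckingSerializer implements Serializer<String> {
> > > > >         @Override
> > > > >         public byte[] serialize(String topic, String data) {
> > > > >             try {
> > > > >                 // Hypothetical registry endpoint, shown only to make
> > > > >                 // the blocking I/O explicit.
> > > > >                 URL url = new URL("http://schema-registry.example/subjects/" + topic);
> > > > >                 HttpURLConnection conn = (HttpURLConnection) url.openConnection();
> > > > >                 conn.getResponseCode(); // blocks until the registry responds
> > > > >                 conn.disconnect();
> > > > >             } catch (Exception e) {
> > > > >                 throw new RuntimeException(e);
> > > > >             }
> > > > >             return data.getBytes(StandardCharsets.UTF_8);
> > > > >         }
> > > > >     }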
> > > > >
> > > > > So I think we want to queue first and do everything off-thread when
> > > > > using the new API, whatever that looks like. I just want to make sure
> > > > > we don't do that for clients that wouldn't expect it.
> > > > >
> > > > > Another consideration is exception handling. If we queue right away,
> > > > > we'll defer some exceptions that are currently thrown to the caller
> > > > > (before the future is returned). In the new API, send() wouldn't throw
> > > > > any exceptions; instead the future would fail. I think that might mean
> > > > > that a new method signature is required.
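> > > > >
> > > > > Something like this is what I mean by a new signature (the name and
> > > > > shape are made up, just to illustrate "never throws, failures only via
> > > > > the result"):
> > > > >
> > > > >     import java.util.concurrent.CompletionStage;
> > > > >     import org.apache.kafka.clients.producer.ProducerRecord;
> > > > >     import org.apache.kafka.clients.producer.RecordMetadata;
> > > > >
> > > > >     interface QueueingSender<K, V> {
> > > > >         // Sketch only: everything that today surfaces as a synchronous
> > > > >         // exception from send() (serialization errors, interceptor
> > > > >         // failures, a full buffer) would instead fail the returned stage.
> > > > >         CompletionStage<RecordMetadata> sendAsync(ProducerRecord<K, V> record);
> > > > >     }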
> > > > >
> > > > > Ryanne
> > > > >
> > > > >
> > > > > On Thu, May 13, 2021, 2:57 PM Nakamura <nakamura.mo...@gmail.com> wrote:
> > > > >
> > > > > > Hey Ryanne,
> > > > > >
> > > > > > I agree we should add an additional constructor (or else an
> > > > > > additional overload in KafkaProducer#send, but the new constructor
> > > > > > would be easier to understand) if we're targeting the "user provides
> > > > > > the thread" approach.
> > > > > >
> > > > > > From looking at the code, I think we can keep record serialization
> > > > > > on the user thread, if we consider that an important part of the
> > > > > > semantics of this method.  It doesn't seem like serialization depends
> > > > > > on knowing the cluster; I think it's incidental that it comes after
> > > > > > the first "blocking" segment in the method.
> > > > > >
> > > > > > Best,
> > > > > > Moses
> > > > > >
> > > > > > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Moses, I like the direction here. My thinking is that a single
> > > > > > > additional work queue, such that send() can enqueue and return,
> > > > > > > seems like the lightest touch. However, I don't think we can
> > > > > > > trivially process that queue in an internal thread pool without
> > > > > > > subtly changing behavior for some users. For example, users will
> > > > > > > often run send() in multiple threads in order to serialize faster,
> > > > > > > but that wouldn't work quite the same if there were an internal
> > > > > > > thread pool.
> > > > > > >
> > > > > > > For this reason I'm thinking we need to make sure any such changes
> > > > > > > are opt-in. Maybe a new constructor with an additional ThreadFactory
> > > > > > > parameter. That would at least clearly indicate that work will
> > > > > > > happen off-thread, and would require opt-in for the new behavior.
> > > > > > >
> > > > > > > Under the hood, this ThreadFactory could be used to create the
> > > > > > > worker thread that processes queued sends, which could fan out to
> > > > > > > per-partition threads from there.
> > > > > > >
> > > > > > > So then you'd have two ways to send: the existing way, where serde
> > > > > > > and interceptors and whatnot are executed on the calling thread, and
> > > > > > > the new way, which returns right away and uses an internal Executor.
> > > > > > > As you point out, the semantics would be identical in either case,
> > > > > > > and it would be very easy for clients to switch.
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Thu, May 13, 2021, 9:00 AM Nakamura <nny...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey Folks,
> > > > > > > > I just posted a new proposal
> > > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446>
> > > > > > > > in the wiki.  I think we have an opportunity to improve the
> > > > > > > > KafkaProducer#send user experience.  It would certainly make our
> > > > > > > > lives easier.  Please take a look!  There are two subproblems that
> > > > > > > > I could use guidance on, so I would appreciate feedback on both of
> > > > > > > > them.
> > > > > > > > Best,
> > > > > > > > Moses
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
>
