Moses, in the case of a full queue, could we just return a failed future immediately?
Ryanne

On Tue, May 18, 2021, 10:39 AM Nakamura <nny...@gmail.com> wrote:

Hi Alexandre,

Thanks for bringing this up; I think I could use some feedback in this area. There are two mechanisms here: one for slowing down when we don't have the relevant metadata, and the other for slowing down when a queue has filled up. The first one applies backpressure somewhat inadvertently, but we could still get in trouble if we're not providing information to the mechanism that monitors whether we're queueing too much. The second one is a classic backpressure use case, so it's definitely important that we don't drop that ability.

Right now backpressure is applied by blocking. That is a natural way to apply backpressure in synchronous systems, but it can lead to unnecessary slowdowns in asynchronous systems. In my opinion, the safest way to apply backpressure in an asynchronous model is to have an explicit backpressure signal. A good example would be returning an exception and providing an optional hook for registering a callback, so that you can be notified when the producer is ready to accept more messages.

However, this would be a really big change to how users use KafkaProducer#send, so I don't know how much appetite we have for making that kind of change. Maybe it would be simpler to remove "don't block when the per-topic queue is full" from the scope of this KIP, and only focus on whether metadata is available? The downside is that we will probably want to change the API again later to fix this, so it might be better to just rip the bandaid off now.

One slightly nasty thing here is that queueing order is important. If we want to use exceptions, we will want to signal the failure to enqueue to the caller in a way that still lets them enforce message order if they want.
So we can't embed the failure directly in the returned future. We should either return two futures (nested, or as a tuple) or else throw an exception to signal backpressure.

So there are a few things we should work out here:

1. Should we keep the "too many bytes enqueued" part of this in scope? (I would say yes, so that we can minimize churn in this API.)
2. How should we signal backpressure so that it's appropriate for asynchronous systems? (I would say that we should throw an exception. If we choose this and we want to pursue the queueing path, we would *not* want to enqueue messages that would push us over the limit. We would only want to enqueue messages when we're waiting for metadata, and we would want to keep track of the total number of bytes for those messages.)

What do you think?

Best,
Moses

On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:

Hello Nakamura,

Thanks for proposing this change. I can see how the blocking behaviour can be a problem when integrating with reactive frameworks such as Akka. One question I have is how you would handle backpressure and avoid memory exhaustion when the producer's buffer is full and tasks start to accumulate in the out-of-band queue or thread pool introduced with this KIP.

Thanks,
Alexandre

On Fri, May 14, 2021 at 3:55 PM, Ryanne Dolan <ryannedo...@gmail.com> wrote:

Makes sense!

Ryanne

On Fri, May 14, 2021, 9:39 AM Nakamura <nny...@gmail.com> wrote:

Hey Ryanne,

I see what you're saying about serde blocking, but I think we should consider it out of scope for this patch.
Right now we've nailed down a couple of use cases where we can unambiguously say "I can make progress now" or "I cannot make progress now." That makes it possible to offload to a different thread only when we are unable to make progress. Extending this to CPU work like serde would mean always offloading, which would be a really big performance change. It might be worth exploring anyway, but I'd rather keep this patch focused on improving ergonomics instead of muddying the waters by evaluating performance in depth.

If we really do want to support serdes or interceptors that do IO on the send path (which seems like an anti-pattern to me), I think we should make that a separate KIP. We should probably also consider changing the API to use Futures (or CompletionStages). But I would rather avoid scope creep, so that we have a better chance of fixing this part of the problem.

Yes, I think some exceptions will move from being thrown synchronously to being reported asynchronously. They'll still be surfaced in the Future, though, so I doubt it would be that big a shock to users.

Best,
Moses

On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Re serialization, my concern is that serialization often accounts for a lot of the cycles spent before returning the future. It's not blocking per se, but it has the same effect from the caller's perspective.

Moreover, serde impls often block themselves, e.g. when fetching schemas from a registry. I suppose it's also possible to block in Interceptors (e.g. writing audit events or metrics), which run before serdes, iiuc.
So any blocking in either of those plugins would block the send unless we queue first.

So I think we want to queue first and do everything off-thread when using the new API, whatever that looks like. I just want to make sure we don't do that for clients that wouldn't expect it.

Another consideration is exception handling. If we queue right away, we'll defer some exceptions that are currently thrown to the caller (before the future is returned). In the new API, send() wouldn't throw any exceptions; instead, the future would fail. I think that might mean a new method signature is required.

Ryanne

On Thu, May 13, 2021, 2:57 PM Nakamura <nakamura.mo...@gmail.com> wrote:

Hey Ryanne,

I agree we should add an additional constructor if we're targeting the "user provides the thread" approach. (An additional overload of KafkaProducer#send would also work, but the new constructor would be easier to understand.)

From looking at the code, I think we can keep record serialization on the user thread, if we consider that an important part of the semantics of this method. Serialization doesn't seem to depend on knowing the cluster; I think it's incidental that it comes after the first "blocking" segment in the method.

Best,
Moses

On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Hey Moses, I like the direction here. My thinking is that a single additional work queue, s.t.
send() can enqueue and return, seems like the lightest touch. However, I don't think we can trivially process that queue in an internal thread pool without subtly changing behavior for some users. For example, users will often run send() in multiple threads in order to serialize faster, but that wouldn't work quite the same if there were an internal thread pool.

For this reason I'm thinking we need to make sure any such changes are opt-in. Maybe a new constructor with an additional ThreadFactory parameter. That would at least clearly indicate that work will happen off-thread, and it would require opt-in for the new behavior.

Under the hood, this ThreadFactory could be used to create the worker thread that processes queued sends. That thread could fan out to per-partition threads from there.

So then you'd have two ways to send. The existing way executes serdes, interceptors, and whatnot on the calling thread. The new way returns right away and uses an internal Executor. As you point out, the semantics would be identical in either case, and it would be very easy for clients to switch.
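Here is a minimal Java sketch of the opt-in constructor described above. Everything in it is hypothetical. `OptInAsyncSender`, its `send` method, and the list standing in for the producer's record accumulator are illustrative names, not Kafka classes. The returned future completes when the record is appended, not when a broker acknowledges it. The one-argument constructor keeps serde on the calling thread, so serializer exceptions are thrown from `send`, as today. The constructor taking a `ThreadFactory` instead hands each send to a single worker thread created by `Executors.newSingleThreadExecutor`. With that constructor, `send` returns right away, records are processed in submission order, and serializer failures surface in the future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of an opt-in constructor taking a ThreadFactory; not the KafkaProducer API.
// The serializer and the "accumulator" list are stand-ins, and the returned future
// completes when the record is appended, not when it is acknowledged by a broker.
final class OptInAsyncSender implements AutoCloseable {
    private final Function<String, byte[]> serializer;
    private final List<byte[]> accumulator = new ArrayList<>(); // guarded by itself
    private final ExecutorService worker; // null => existing, calling-thread behaviour

    /** Existing way: serde runs on the calling thread before send() returns. */
    OptInAsyncSender(Function<String, byte[]> serializer) {
        this.serializer = serializer;
        this.worker = null;
    }

    /** Opt-in way: one worker thread, built by the caller's factory, drains a FIFO work queue. */
    OptInAsyncSender(Function<String, byte[]> serializer, ThreadFactory threadFactory) {
        this.serializer = serializer;
        this.worker = Executors.newSingleThreadExecutor(threadFactory);
    }

    CompletableFuture<Integer> send(String value) {
        if (worker == null) {
            // Serializer exceptions propagate to the caller, as they do today.
            return CompletableFuture.completedFuture(serializeAndAppend(value));
        }
        // Enqueue and return at once; serializer exceptions now fail the future instead.
        return CompletableFuture.supplyAsync(() -> serializeAndAppend(value), worker);
    }

    int appendedCount() {
        synchronized (accumulator) {
            return accumulator.size();
        }
    }

    private int serializeAndAppend(String value) {
        byte[] bytes = serializer.apply(value); // may be slow or block (e.g. schema lookups)
        synchronized (accumulator) {
            accumulator.add(bytes);
            return accumulator.size() - 1; // index of this record in the stand-in accumulator
        }
    }

    @Override
    public void close() {
        if (worker == null) {
            return;
        }
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The per-partition fan-out mentioned above is left out, so the single worker is the entire work queue. The sketch also shows the behavioural change Ryanne warns about. In opt-in mode, several caller threads calling `send` no longer serialize in parallel, because all serde work funnels through one worker thread.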
Ryanne

On Thu, May 13, 2021, 9:00 AM Nakamura <nny...@gmail.com> wrote:

Hey Folks,

I just posted a new proposal <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446> in the wiki. I think we have an opportunity to improve the KafkaProducer#send user experience. It would certainly make our lives easier. Please take a look! There are two subproblems that I could use guidance on, so I would appreciate feedback on both of them.

Best,
Moses
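The backpressure signals debated at the top of this thread can be compared side by side in a small Java sketch. All the names are invented for illustration: `ByteBoundedSendQueue`, `BackpressureException`, and `deliverOldest` do not exist in the Kafka client. The queue tracks total enqueued bytes, as Moses suggests, and never enqueues a record that would push it over the limit. Three methods show the options:

- `sendOrThrow` follows Moses's preference and throws.
- `sendOrFailFast` answers Ryanne's question by returning an already-failed future.
- `sendNested` returns nested futures: the outer one means "accepted" and the inner one means "delivered".

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Hypothetical names throughout: nothing here is part of the Kafka client API.

/** Signals that accepting a record would exceed the queue's byte limit. */
final class BackpressureException extends RuntimeException {
    BackpressureException(String message) {
        super(message);
    }
}

/** Tracks total enqueued bytes and refuses records that would exceed the limit. */
final class ByteBoundedSendQueue {
    private static final class Pending {
        final int size;
        final CompletableFuture<Long> delivered = new CompletableFuture<>();

        Pending(int size) {
            this.size = size;
        }
    }

    private final long maxBytes;
    private final ArrayDeque<Pending> queue = new ArrayDeque<>();
    private long enqueuedBytes = 0;
    private long nextOffset = 0;

    ByteBoundedSendQueue(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Option 1: reject by throwing, so the caller cannot miss the signal. */
    synchronized CompletableFuture<Long> sendOrThrow(byte[] value) {
        if (!fits(value)) {
            throw rejection();
        }
        return enqueue(value);
    }

    /** Option 2: return a future that has already failed. */
    synchronized CompletableFuture<Long> sendOrFailFast(byte[] value) {
        if (!fits(value)) {
            return CompletableFuture.failedFuture(rejection());
        }
        return enqueue(value);
    }

    /** Option 3: outer future = accepted into the queue, inner future = delivered. */
    synchronized CompletableFuture<CompletableFuture<Long>> sendNested(byte[] value) {
        if (!fits(value)) {
            return CompletableFuture.failedFuture(rejection());
        }
        return CompletableFuture.completedFuture(enqueue(value));
    }

    /** Stand-in for the network thread: completes the oldest send and frees its bytes. */
    synchronized void deliverOldest() {
        Pending p = queue.poll();
        if (p != null) {
            enqueuedBytes -= p.size;
            p.delivered.complete(nextOffset++);
        }
    }

    private boolean fits(byte[] value) {
        return enqueuedBytes + value.length <= maxBytes;
    }

    private BackpressureException rejection() {
        return new BackpressureException("enqueuing would exceed " + maxBytes + " bytes");
    }

    private CompletableFuture<Long> enqueue(byte[] value) {
        Pending p = new Pending(value.length);
        queue.add(p);
        enqueuedBytes += value.length;
        return p.delivered;
    }
}
```

In all three variants the rejection is visible before the method returns, so a caller can stop before sending the next record and keep ordering intact. The signatures show the trade-off. A fast-failed future has the same type as a delivery failure, so the caller must check `isCompletedExceptionally()` or the exception type before the next send. A thrown exception, or the outer future of `sendNested`, keeps "not accepted" distinct from "not delivered". For simplicity, the outer future here is always complete on return. Records held back while waiting for metadata would presumably need an outer future that is still pending.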