Matthew, it's more than performance, though. In many frameworks the number of request threads is purposefully constrained, and blocking one means you have one fewer to handle requests with. When you're handling a large number of requests with a small number of threads, any blocking can lead to thread exhaustion.

For this reason, you'll often see send() wrapped in a future or thread pool.
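For example, a rough sketch of the kind of wrapper I mean (the single-thread pool and the helper are mine, purely illustrative):

    // uses java.util.concurrent.* and org.apache.kafka.clients.producer.*
    ExecutorService sendPool = Executors.newSingleThreadExecutor();

    CompletableFuture<RecordMetadata> asyncSend(Producer<String, String> producer,
                                                ProducerRecord<String, String> record) {
        CompletableFuture<RecordMetadata> result = new CompletableFuture<>();
        // Run send() on a dedicated thread so any internal blocking
        // (metadata fetch, full buffer) can't stall a request thread.
        sendPool.execute(() -> {
            try {
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) result.completeExceptionally(exception);
                    else result.complete(metadata);
                });
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }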
But it's surprising that this would be required, since send() already returns a future. Additionally, even when send() does not actually block, it does a lot of work on the caller's thread, which is likewise surprising given that a future is returned. The effect is the same: fewer threads are available to handle requests, and you risk thread exhaustion. I think we may incidentally improve performance if we introduce an internal thread pool, but the primary motivation here, at least for me, is to fix the lie the API is telling, not to improve performance.

Ryanne

On Wed, May 26, 2021, 6:51 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

> I think we may need to clarify terminology here. At least to me, blocking means suspending the current thread to wait for some operation (which is wasteful if we are dealing with IO-bound tasks). In other words, the "blocking" is an implementation detail of how to wait rather than whether we need to wait or not, so to me this is more of a performance question.
>
> In the scenario you describe of Kafka clients producing too many messages, as you said, buffering is what should be done, but I wouldn't classify this as blocking.
>
> On Mon, May 24, 2021 at 7:54 PM Colin McCabe <cmcc...@apache.org> wrote:
>
> > Hi all,
> >
> > I agree that we should give users the option of having a fully async API, but I don't think external thread pools or queues are the right direction to go here. They add performance overhead and don't address the root causes of the problem.
> >
> > There are basically two scenarios where we block, currently. One is when we are doing a metadata fetch. I think this is clearly a bug, or at least an implementation limitation. From the user's point of view, the fact that we are doing a metadata fetch is an implementation detail that really shouldn't be exposed like this. We have talked about fixing this in the past. I think we should just spend the time to do it.
> >
> > The second scenario is where the client has produced too much data in too little time. This could happen if there is a network glitch, or the server is slower than expected. In this case, the behavior is intentional and not a bug. To understand this, think about what would happen if we didn't block: we would start buffering more and more data in memory, until finally the application died with an out-of-memory error. That would be frustrating for users and wouldn't add to the usability of Kafka.
> >
> > We could potentially have an option to handle the out-of-memory scenario differently, by returning an error code immediately rather than blocking. Applications would have to be rewritten to handle this properly, but it is a possibility. I suspect that most of them wouldn't use this, but we could offer it as a possibility for async purists (which might include certain frameworks). The big problem the users would have to solve is what to do with the record that they were unable to produce due to the buffer-full issue.
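> >
> > For illustration, caller code under such an option might look roughly like this (the fail-fast mode itself is hypothetical, and retryQueue just stands in for whatever the application decides to do with the rejected record):
> >
> >     try {
> >         producer.send(record);
> >     } catch (BufferExhaustedException e) {
> >         // Buffer is full: the caller now owns the record and must
> >         // decide -- drop it, spill it somewhere, or retry it later.
> >         retryQueue.add(record);
> >     }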
> >
> > best,
> > Colin
> >
> > On Thu, May 20, 2021, at 10:35, Nakamura wrote:
> >
> > > > My suggestion was just to do this in multiple steps/phases: firstly let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking), and then later on we can make the various threadpools configurable with a sane default.
> > >
> > > I like that approach. I updated the "Which thread should be responsible for waiting" part of KIP-739 to add your suggestion as my recommended approach, thank you! If no one else has major concerns about that approach, I'll move the alternatives to "rejected alternatives".
> > >
> > > On Thu, May 20, 2021 at 7:26 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:
> > >
> > > > @Nakamura
> > > >
> > > > On Wed, May 19, 2021 at 7:35 PM Nakamura <nny...@gmail.com> wrote:
> > > >
> > > > > @Ryanne:
> > > > > In my mind's eye I slightly prefer throwing the "cannot enqueue" exception to satisfying the future immediately with the "cannot enqueue" exception. But I agree, it would be worth doing more research.
> > > > >
> > > > > @Matthew:
> > > > >
> > > > > > 3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU bound, you definitely would want to use a bounded thread pool that is fixed by the number of CPUs (or something along those lines). https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic.
> > > > >
> > > > > I think this guide is good in general, but I would be hesitant to follow its guidance re: offloading serialization without benchmarking it. My understanding is that context switches have gotten much cheaper, and that gains from cache locality are small, but they're not nothing. Especially if the workload has a very small serialization cost, I wouldn't be shocked if it made it slower. I feel pretty strongly that we should do more research here before unconditionally encouraging serialization in a threadpool. If people think it's important to do it here (e.g. if we think it would mean another big API change) then we should start thinking about what benchmarking we can do to gain higher confidence in this kind of change. However, I don't think it would change semantics as substantially as we're proposing here, so I would vote for pushing this to a subsequent KIP.
> > > >
> > > > Of course, it's all down to benchmarking, benchmarking and benchmarking. Ideally speaking you want to use all of the resources available to you, so if you have a bottleneck in serialization and you have many cores free, then using multiple cores may be more appropriate than a single core. Typically I would expect that using a single thread to do serialization is likely to be the most common situation; I was just responding to an earlier point that was made with regard to using ThreadPools for serialization (note that you can also just use a ThreadPool that is pinned to a single thread).
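> > > >
> > > > To make that concrete, a quick sketch of the two pool shapes I mean (plain JDK executors; the names are mine):
> > > >
> > > >     // A pool "pinned" to a single thread, e.g. for serialization:
> > > >     ExecutorService serdePool = Executors.newSingleThreadExecutor(r -> {
> > > >         Thread t = new Thread(r, "serde");
> > > >         t.setDaemon(true);
> > > >         return t;
> > > >     });
> > > >
> > > >     // A CPU-bound pool fixed by the number of CPUs:
> > > >     ExecutorService cpuPool = Executors.newFixedThreadPool(
> > > >         Runtime.getRuntime().availableProcessors());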
> > > >
> > > > > > 4. Regarding providing the ability for users to supply their own custom ThreadPool, this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplate changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContext/ThreadPools (at least on a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large amount of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least it's not any worse than what exists currently, and this can be an improvement that is done later?
> > > > >
> > > > > Is there an existing threadpool that you suggest we reuse? Or are you imagining that we make our own internal threadpool, and then maybe expose configuration flags to manipulate it? For what it's worth, I like having an internal threadpool (perhaps just FJP.commonPool) and then providing an alternative to pass your own threadpool. That way people who want finer control can get it, and everyone else can do OK with the default.
> > > >
> > > > Indeed, that is what I am saying. The most ideal situation is that there is a default internal threadpool that Kafka uses, but users of Kafka can configure their own threadpool. Having a singleton ThreadPool for blocking IO, non-blocking IO and CPU-bound tasks which can be plugged into all of your libraries (including Kafka) makes resource management much easier to do, and also gives users control to override specific threadpools for exceptional cases (i.e. providing a ThreadPool that is pinned to a single core, which tends to give the best latency results if that is something that is critical for you).
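> > > >
> > > > Roughly this shape, as a sketch (the class and field names are made up):
> > > >
> > > >     public final class AppExecutors {
> > > >         // One pool per kind of work, shared by every library in the app:
> > > >         public static final ExecutorService BLOCKING_IO =
> > > >             Executors.newCachedThreadPool();
> > > >         public static final ExecutorService CPU =
> > > >             Executors.newFixedThreadPool(
> > > >                 Runtime.getRuntime().availableProcessors());
> > > >
> > > >         private AppExecutors() {}
> > > >     }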
> > > >
> > > > My suggestion was just to do this in multiple steps/phases: firstly let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking), and then later on we can make the various threadpools configurable with a sane default.
> > > >
> > > > On Wed, May 19, 2021 at 6:01 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:
> > > >
> > > > > > Here are my two cents (note that I am only seeing this on a surface level):
> > > > > >
> > > > > > 1. If we are going down this road it makes sense to do this "properly" (i.e. using queues as Ryanne suggested). The reason I am saying this is that it seems that the original goal of the KIP is for it to be used in other asynchronous systems, and from my personal experience you really do need to make the implementation properly asynchronous, otherwise it's really not that useful.
> > > > > > 2. Due to the previous point and what was said by others, this is likely going to break some existing semantics (i.e. people are currently relying on blocking semantics), so adding another method/interface plus deprecating the older one is more annoying but ideal.
> > > > > > 3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU bound, you definitely would want to use a bounded thread pool that is fixed by the number of CPUs (or something along those lines). https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic.
> > > > > > 4. Regarding providing the ability for users to supply their own custom ThreadPool, this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplate changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContext/ThreadPools (at least on a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large amount of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least it's not any worse than what exists currently, and this can be an improvement that is done later?
> > > > > >
> > > > > > On Wed, May 19, 2021 at 2:56 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > >
> > > > > > > I was thinking the sender would typically wrap send() in a backoff/retry loop, or else ignore any failures and drop sends on the floor (fire-and-forget), and in both cases I think failing immediately is better than blocking for a new spot in the queue or asynchronously failing somehow.
> > > > > > >
> > > > > > > I think a failed future is adequate for the "explicit backpressure signal" while avoiding any blocking anywhere. I think if we try to asynchronously signal the caller of failure (either by asynchronously failing the future or invoking a callback off-thread or something), we'd force the caller to either block or poll waiting for that signal, which somewhat defeats the purpose we're after. And of course blocking for a spot in the queue definitely defeats the purpose (though perhaps it ameliorates the problem some).
> > > > > > >
> > > > > > > Throwing an exception to the caller directly (not via the future) is another option with precedent in Kafka clients, though it doesn't seem as ergonomic to me.
> > > > > > >
> > > > > > > It would be interesting to analyze some existing usage and determine how difficult it would be to convert it to the various proposed APIs.
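> > > > > > >
> > > > > > > As a sketch, assuming a hypothetical CannotEnqueueException used to fail the future immediately, the caller could check for it without blocking:
> > > > > > >
> > > > > > >     Future<RecordMetadata> f = producer.send(record);
> > > > > > >     if (f.isDone()) {         // complete right away => the enqueue failed fast
> > > > > > >         try {
> > > > > > >             f.get();          // does not block; the future is already complete
> > > > > > >         } catch (ExecutionException e) {
> > > > > > >             // e.getCause() instanceof CannotEnqueueException:
> > > > > > >             // back off and retry, or drop the record (fire-and-forget)
> > > > > > >         } catch (InterruptedException ignored) { }
> > > > > > >     }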
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Tue, May 18, 2021, 3:27 PM Nakamura <nny...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Ryanne,
> > > > > > > >
> > > > > > > > Hmm, that's an interesting idea. Basically it would mean that after calling send, you would also have to check whether the returned future had failed with a specific exception. I would be open to it, although I think it might be slightly more surprising: right now the paradigm is "enqueue synchronously; the future represents whether we succeeded in sending or not", and the new one would be "enqueue synchronously; the future represents either whether we succeeded in enqueueing (in which case it will be failed immediately if enqueueing failed) or whether we succeeded in sending".
> > > > > > > >
> > > > > > > > But you're right, it should be on the table, thank you for suggesting it!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Moses
> > > > > > > >
> > > > > > > > On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Moses, in the case of a full queue, could we just return a failed future immediately?
> > > > > > > > >
> > > > > > > > > Ryanne
> > > > > > > > >
> > > > > > > > > On Tue, May 18, 2021, 10:39 AM Nakamura <nny...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Alexandre,
> > > > > > > > > >
> > > > > > > > > > Thanks for bringing this up; I think I could use some feedback in this area. There are two mechanisms here: one for slowing down when we don't have the relevant metadata, and the other for slowing down when a queue has filled up. Although the first one applies backpressure somewhat inadvertently, we could still get in trouble if we're not providing information to the mechanism that monitors whether we're queueing too much. As for the second one, that is a classic backpressure use case, so it's definitely important that we don't drop that ability.
> > > > > > > > > >
> > > > > > > > > > Right now backpressure is applied by blocking, which is a natural way to apply backpressure in synchronous systems but can lead to unnecessary slowdowns in asynchronous systems. In my opinion, the safest way to apply backpressure in an asynchronous model is to have an explicit backpressure signal. A good example would be returning an exception, and providing an optional hook to add a callback onto so that you can be notified when it's ready to accept more messages.
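> > > > > > > > > >
> > > > > > > > > > To make that concrete, roughly this shape (everything named here is hypothetical; nothing like it exists in the producer today):
> > > > > > > > > >
> > > > > > > > > >     try {
> > > > > > > > > >         producer.send(record);
> > > > > > > > > >     } catch (ProducerBackpressureException e) {
> > > > > > > > > >         // The hook fires once the producer can accept more messages:
> > > > > > > > > >         e.whenReady().thenRun(() -> producer.send(record));
> > > > > > > > > >     }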
> > > > > > > > > >
> > > > > > > > > > However, this would be a really big change to how users use KafkaProducer#send, so I don't know how much appetite we have for making that kind of change. Maybe it would be simpler to remove the "don't block when the per-topic queue is full" behavior from the scope of this KIP, and only focus on when metadata is available? The downside is that we will probably want to change the API again later to fix this, so it might be better to just rip the bandaid off now.
> > > > > > > > > >
> > > > > > > > > > One slightly nasty thing here is that because queueing order is important, if we want to use exceptions, we will want to be able to signal the failure to enqueue to the caller in such a way that they can still enforce message order if they want. So we can't embed the failure directly in the returned future; we should either return two futures (nested, or as a tuple) or else throw an exception to signal backpressure.
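> > > > > > > > > >
> > > > > > > > > > The two-futures shape would look something like this (sendAsync is a made-up name; the outer future answers "did we enqueue?", the inner one "was it delivered?"):
> > > > > > > > > >
> > > > > > > > > >     CompletableFuture<CompletableFuture<RecordMetadata>> sent =
> > > > > > > > > >         producer.sendAsync(record);
> > > > > > > > > >     sent.thenAccept(delivery ->      // enqueued: ordering is now fixed
> > > > > > > > > >         delivery.whenComplete((metadata, error) -> {
> > > > > > > > > >             // the delivery result (or failure) arrives here later
> > > > > > > > > >         }));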
> > > > > > > > > >
> > > > > > > > > > So there are a few things we should work out here:
> > > > > > > > > >
> > > > > > > > > > 1. Should we keep the "too many bytes enqueued" part of this in scope? (I would say yes, so that we can minimize churn in this API.)
> > > > > > > > > > 2. How should we signal backpressure so that it's appropriate for asynchronous systems? (I would say that we should throw an exception. If we choose this and we want to pursue the queueing path, we would *not* want to enqueue messages that would push us over the limit; we would only want to enqueue messages when we're waiting for metadata, and we would want to keep track of the total number of bytes for those messages.)
> > > > > > > > > >
> > > > > > > > > > What do you think?
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Moses
> > > > > > > > > >
> > > > > > > > > > On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hello Nakamura,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for proposing this change. I can see how the blocking behaviour can be a problem when integrating with reactive frameworks such as Akka. One of the questions I would have is how you would handle back pressure and avoid memory exhaustion when the producer's buffer is full and tasks would start to accumulate in the out-of-band queue or thread pool introduced with this KIP.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Alexandre
> > > > > > > > > > >
> > > > > > > > > > > On Fri, May 14, 2021 at 3:55 PM, Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Makes sense!
> > > > > > > > > > > >
> > > > > > > > > > > > Ryanne
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, May 14, 2021, 9:39 AM Nakamura <nny...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey Ryanne,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I see what you're saying about serde blocking, but I think we should consider it out of scope for this patch. Right now we've nailed down a couple of use cases where we can unambiguously say "I can make progress now" or "I cannot make progress now", which makes it possible to offload to a different thread only if we are unable to make progress. Extending this to CPU work like serde would mean always offloading, which would be a really big performance change. It might be worth exploring anyway, but I'd rather keep this patch focused on improving ergonomics, rather than muddying the waters with evaluating performance very deeply.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think if we really do want to support serde or interceptors that do IO on the send path (which seems like an anti-pattern to me), we should consider making that a separate KIP, and probably also consider changing the API to use Futures (or CompletionStages). But I would rather avoid scope creep, so that we have a better chance of fixing this part of the problem.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Yes, I think some exceptions will move to being async instead of sync. They'll still be surfaced in the Future, so I'm not so confident that it would be that big a shock to users though.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best,
> > > > > > > > > > > > > Moses
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Re serialization, my concern is that serialization often accounts for a lot of the cycles spent before returning the future.
> > > > > > > > > > > > > > It's not blocking per se, but it's the same effect from the caller's perspective.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Moreover, serde impls often block themselves, e.g. when fetching schemas from a registry. I suppose it's also possible to block in Interceptors (e.g. writing audit events or metrics), which happens before serdes iiuc. So any blocking in either of those plugins would block the send unless we queue first.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > So I think we want to queue first and do everything off-thread when using the new API, whatever that looks like. I just want to make sure we don't do that for clients that wouldn't expect it.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Another consideration is exception handling. If we queue right away, we'll defer some exceptions that currently are thrown to the caller (before the future is returned). In the new API, send() wouldn't throw any exceptions, and instead the future would fail. I think that might mean that a new method signature is required.
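> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Sketching the difference (newSend is a placeholder name): today send() itself can throw, e.g. a SerializationException, before any future exists; in the new API every failure would instead surface in the returned future:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >     producer.newSend(record).whenComplete((metadata, error) -> {
> > > > > > > > > > > > > >         if (error != null) {
> > > > > > > > > > > > > >             // even serialization/interceptor failures land here now
> > > > > > > > > > > > > >         }
> > > > > > > > > > > > > >     });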
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Ryanne
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, May 13, 2021, 2:57 PM Nakamura <nakamura.mo...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hey Ryanne,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I agree we should add an additional constructor (or else an additional overload in KafkaProducer#send, but the new constructor would be easier to understand) if we're targeting the "user provides the thread" approach.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > From looking at the code, I think we can keep record serialization on the user thread, if we consider that an important part of the semantics of this method. It doesn't seem like serialization depends on knowing the cluster; I think it's incidental that it comes after the first "blocking" segment in the method.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > Moses
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hey Moses, I like the direction here. My thinking is that a single additional work queue, s.t. send() can enqueue and return, seems like the lightest touch. However, I don't think we can trivially process that queue in an internal thread pool without subtly changing behavior for some users. For example, users will often run send() in multiple threads in order to serialize faster, but that wouldn't work quite the same if there were an internal thread pool.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > For this reason I'm thinking we need to make sure any such changes are opt-in. Maybe a new constructor with an additional ThreadFactory parameter. That would at least clearly indicate that work will happen off-thread, and would require opt-in for the new behavior.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Under the hood, this ThreadFactory could be used to create the worker thread that processes queued sends, which could fan out to per-partition threads from there.
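> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > i.e. something like this hypothetical overload, just to show the opt-in:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >     Producer<String, String> producer = new KafkaProducer<>(
> > > > > > > > > > > > > > > >         configs, keySerializer, valueSerializer,
> > > > > > > > > > > > > > > >         r -> new Thread(r, "producer-send-worker")); // ThreadFactory => async mode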
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > So then you'd have two ways to send: the existing way, where serde and interceptors and whatnot are executed on the calling thread, and the new way, which returns right away and uses an internal Executor. As you point out, the semantics would be identical in either case, and it would be very easy for clients to switch.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Ryanne
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, May 13, 2021, 9:00 AM Nakamura <nny...@gmail.com> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hey Folks,
> > > > > > > > > > > > > > > > > I just posted a new proposal <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446> in the wiki. I think we have an opportunity to improve the KafkaProducer#send user experience. It would certainly make our lives easier. Please take a look! There are two subproblems that I could use guidance on, so I would appreciate feedback on both of them.
> > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > Moses
>
> --
> Matthew de Detrich
> *Aiven Deutschland GmbH*
> Immanuelkirchstraße 26, 10405 Berlin
> Amtsgericht Charlottenburg, HRB 209739 B
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> *m:* +491603708037
> *w:* aiven.io *e:* matthew.dedetr...@aiven.io