On Tue, Jun 1, 2021, at 07:00, Nakamura wrote:
> Hi Colin,
>
> Sorry, I still don't follow.
>
> Right now `KafkaProducer#send` seems to trigger a metadata fetch. Today,
> we block on that before returning. Is your proposal that we move the
> metadata fetch out of `KafkaProducer#send` entirely?
>

KafkaProducer#send is supposed to initiate non-blocking I/O, but not wait
for it to complete. There's more information about non-blocking I/O in
Java here: https://en.wikipedia.org/wiki/Non-blocking_I/O_%28Java%29
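To make that concrete, here is a very rough sketch of the shape I have in
mind -- toy code, not the actual KafkaProducer/Sender internals, with the
metadata and socket plumbing stubbed out as hypothetical helpers
(metadataKnown, requestMetadataNonBlocking, writeNonBlocking):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class NonBlockingSendSketch {
    static final class PendingSend {
        final String topic;
        final byte[] payload;
        final CompletableFuture<Long> result = new CompletableFuture<>();
        PendingSend(String topic, byte[] payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    // One ordered queue: records go out in the order they were enqueued.
    private final Queue<PendingSend> pending = new ArrayDeque<>();

    // send() just records the request and returns a future; it never waits.
    CompletableFuture<Long> send(String topic, byte[] payload) {
        PendingSend p = new PendingSend(topic, payload);
        pending.add(p);
        return p.result;
    }

    // Driven by the same client thread that already drives the other I/O.
    void poll() {
        PendingSend head = pending.peek();
        if (head == null) return;
        if (!metadataKnown(head.topic)) {
            // Kick off the fetch and return; a later poll() call will see
            // the metadata and make progress then.
            requestMetadataNonBlocking(head.topic);
            return;
        }
        pending.remove();
        writeNonBlocking(head); // would complete head.result when the ack arrives
    }

    // Hypothetical plumbing, elided.
    private boolean metadataKnown(String topic) { return false; }
    private void requestMetadataNonBlocking(String topic) { }
    private void writeNonBlocking(PendingSend send) { }
}

The point is just that the waiting happens inside poll(), not inside
send().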
> Even if the metadata fetch moves to be non-blocking, I think we still need
> to deal with the problems we've discussed before if the fetch happens in
> the `KafkaProducer#send` method. How do we maintain the ordering semantics
> of `KafkaProducer#send`?

How are the ordering semantics of `KafkaProducer#send` related to the
metadata fetch?

> How do we prevent our buffer from filling up?

That is not related to the metadata fetch. Also, I already proposed a
solution (returning an error) if this is a concern.

> Which thread is responsible for checking poll()?

The same client thread that has always been responsible for checking
poll().

> The only approach I can see that would avoid this would be moving the
> metadata fetch to happen at a different time. But it's not clear to me
> when would be a more appropriate time to do the metadata fetch than
> `KafkaProducer#send`.
>

It's not about moving the metadata fetch to happen at a different time.
It's about using non-blocking I/O, like we do for other network I/O. (And
actually, if you want to get really technical, we do this for the metadata
fetch too, it's just that we have a hack that loops to transform it back
into blocking I/O.)

best,
Colin

> I think there's something I'm missing here. Would you mind helping me
> figure out what it is?
>
> Best,
> Moses
>
> On Sun, May 30, 2021 at 5:35 PM Colin McCabe <cmcc...@apache.org> wrote:
>
> > On Tue, May 25, 2021, at 11:26, Nakamura wrote:
> > > Hey Colin,
> > >
> > > For the metadata case, what would fixing the bug look like? I agree
> > > that we should fix it, but I don't have a clear picture in my mind of
> > > what fixing it should look like. Can you elaborate?
> > >
> >
> > If the blocking metadata fetch bug were fixed, neither the producer nor
> > the consumer would block while fetching metadata. A poll() call would
> > initiate a metadata fetch if needed, and a subsequent call to poll()
> > would handle the results if needed. Basically the same paradigm we use
> > for other network communication in the producer and consumer.
> >
> > best,
> > Colin
> >
> > > Best,
> > > Moses
> > >
> > > On Mon, May 24, 2021 at 1:54 PM Colin McCabe <cmcc...@apache.org> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I agree that we should give users the option of having a fully
> > > > async API, but I don't think external thread pools or queues are
> > > > the right direction to go here. They add performance overheads and
> > > > don't address the root causes of the problem.
> > > >
> > > > There are basically two scenarios where we block, currently. One is
> > > > when we are doing a metadata fetch. I think this is clearly a bug,
> > > > or at least an implementation limitation. From the user's point of
> > > > view, the fact that we are doing a metadata fetch is an
> > > > implementation detail that really shouldn't be exposed like this.
> > > > We have talked about fixing this in the past. I think we should
> > > > just spend the time to do it.
> > > >
> > > > The second scenario is where the client has produced too much data
> > > > in too little time. This could happen if there is a network glitch,
> > > > or the server is slower than expected. In this case, the behavior
> > > > is intentional and not a bug.
> > > > To understand this, think about what would happen if we didn't
> > > > block. We would start buffering more and more data in memory, until
> > > > finally the application died with an out of memory error. That
> > > > would be frustrating for users and wouldn't add to the usability of
> > > > Kafka.
> > > >
> > > > We could potentially have an option to handle the out-of-memory
> > > > scenario differently by returning an error code immediately rather
> > > > than blocking. Applications would have to be rewritten to handle
> > > > this properly, but it is a possibility. I suspect that most of them
> > > > wouldn't use this, but we could offer it as a possibility for async
> > > > purists (which might include certain frameworks). The big problem
> > > > the users would have to solve is what to do with the record that
> > > > they were unable to produce due to the buffer full issue.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > On Thu, May 20, 2021, at 10:35, Nakamura wrote:
> > > > >
> > > > > > My suggestion was just do this in multiple steps/phases,
> > > > > > firstly let's fix the issue of send being misleadingly
> > > > > > asynchronous (i.e. internally it's blocking) and then later on
> > > > > > we can make the various threadpools configurable with a sane
> > > > > > default.
> > > > >
> > > > > I like that approach. I updated the "Which thread should be
> > > > > responsible for waiting" part of KIP-739 to add your suggestion
> > > > > as my recommended approach, thank you! If no one else has major
> > > > > concerns about that approach, I'll move the alternatives to
> > > > > "rejected alternatives".
> > > > >
> > > > > On Thu, May 20, 2021 at 7:26 AM Matthew de Detrich
> > > > > <matthew.dedetr...@aiven.io.invalid> wrote:
> > > > >
> > > > > > @Nakamura
> > > > > >
> > > > > > On Wed, May 19, 2021 at 7:35 PM Nakamura <nny...@gmail.com> wrote:
> > > > > >
> > > > > > > @Ryanne:
> > > > > > > In my mind's eye I slightly prefer throwing the "cannot
> > > > > > > enqueue" exception to satisfying the future immediately with
> > > > > > > the "cannot enqueue" exception? But I agree, it would be
> > > > > > > worth doing more research.
> > > > > > >
> > > > > > > @Matthew:
> > > > > > >
> > > > > > > > 3. Using multiple thread pools is definitely recommended
> > > > > > > > for different types of tasks, for serialization which is
> > > > > > > > CPU bound you definitely would want to use a bounded thread
> > > > > > > > pool that is fixed by the number of CPU's (or something
> > > > > > > > along those lines).
> > > > > > > > https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c
> > > > > > > > is a very good guide on this topic
> > > > > > >
> > > > > > > I think this guide is good in general, but I would be
> > > > > > > hesitant to follow its guidance re: offloading serialization
> > > > > > > without benchmarking it. My understanding is that
> > > > > > > context-switches have gotten much cheaper, and that gains
> > > > > > > from cache locality are small, but they're not nothing.
> > > > > > > Especially if the workload has a very small serialization
> > > > > > > cost, I wouldn't be shocked if it made it slower. I feel
> > > > > > > pretty strongly that we should do more research here before
> > > > > > > unconditionally encouraging serialization in a threadpool.
If people think it's important to do it here (eg if > > we > > > > think > > > > > > > it would mean another big API change) then we should start > > thinking > > > > about > > > > > > > what benchmarking we can do to gain higher confidence in this > > kind of > > > > > > > change. However, I don't think it would change semantics as > > > > > > substantially > > > > > > > as we're proposing here, so I would vote for pushing this to a > > > > subsequent > > > > > > > KIP. > > > > > > > > > > > > > Of course, its all down to benchmarking, benchmarking and > > benchmarking. > > > > > > Ideally speaking you want to use all of the resources available to > > > > you, so > > > > > > if you have a bottleneck in serialization and you have many cores > > free > > > > then > > > > > > using multiple cores may be more appropriate than a single core. > > > > Typically > > > > > > I would expect that using a single thread to do serialization is > > > > likely to > > > > > > be the most situation, I was just responding to an earlier point > > that > > > > was > > > > > > made in regards to using ThreadPools for serialization (note that > > you > > > > can > > > > > > also just use a ThreadPool that is pinned to a single thread) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 4. Regarding providing the ability for users to supply their > > own > > > > custom > > > > > > > > ThreadPool this is more of an ergonomics question for the API. > > > > > > Especially > > > > > > > > when it gets to monitoring/tracing, giving the ability for > > users to > > > > > > > provide > > > > > > > > their own custom IO/CPU ThreadPools is ideal however as stated > > > > doing so > > > > > > > > means a lot of boilerplatery changes to the API. Typically > > > > speaking a > > > > > > lot > > > > > > > > of monitoring/tracing/diagnosing is done on > > > > > > ExecutionContext/ThreadPools > > > > > > > > (at least on a more rudimentary level) and hence allowing > > users to > > > > > > supply > > > > > > > a > > > > > > > > global singleton ThreadPool for IO tasks and another for CPU > > tasks > > > > > > makes > > > > > > > > their lives a lot easier. However due to the large amount of > > > > changes to > > > > > > > the > > > > > > > > API, it may be more appropriate to just use internal thread > > pools > > > > (for > > > > > > > now) > > > > > > > > since at least it's not any worse than what exists currently > > and > > > > this > > > > > > can > > > > > > > > be an improvement that is done later? > > > > > > > Is there an existing threadpool that you suggest we reuse? Or > > are > > > > you > > > > > > > imagining that we make our own internal threadpool, and then > > maybe > > > > expose > > > > > > > configuration flags to manipulate it? For what it's worth, I > > like > > > > having > > > > > > > an internal threadpool (perhaps just FJP.commonpool) and then > > > > providing > > > > > > an > > > > > > > alternative to pass your own threadpool. That way people who > > want > > > > finer > > > > > > > control can get it, and everyone else can do OK with the default. > > > > > > > > > > > > > Indeed that is what I am saying. The most ideal situation is that > > > > there is > > > > > > a default internal threadpool that Kafka uses, however users of > > Kafka > > > > can > > > > > > configure there own threadpool. 
Having a singleton ThreadPool for > > > > blocking > > > > > > IO, non blocking IO and CPU bound tasks which can be plugged in > > all of > > > > your > > > > > > libraries (including Kafka) makes resource management much easier > > to > > > > do and > > > > > > also gives control of users to override specific threadpools for > > > > > > exceptional cases (i.e. providing a Threadpool that is pinned to a > > > > single > > > > > > core which tends to give the best latency results if this is > > something > > > > that > > > > > > is critical for you). > > > > > > > > > > > > My suggestion was just do this in multiple steps/phases, firstly > > let's > > > > fix > > > > > > the issue of send being misleadingly asynchronous (i.e. internally > > its > > > > > > blocking) and then later one we can make the various > > > > > > threadpools configurable with a sane default. > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, May 19, 2021 at 6:01 AM Matthew de Detrich > > > > > > > <matthew.dedetr...@aiven.io.invalid> wrote: > > > > > > > > > > > > > > > Here are my two cents here (note that I am only seeing this on > > a > > > > > > surface > > > > > > > > level) > > > > > > > > > > > > > > > > 1. If we are going this road it makes sense to do this > > "properly" > > > > (i.e. > > > > > > > > using queues as Ryan suggested). The reason I am saying this > > is > > > > that > > > > > > it > > > > > > > > seems that the original goal of the KIP is for it to be used in > > > > other > > > > > > > > asynchronous systems and from my personal experience, you > > really do > > > > > > need > > > > > > > to > > > > > > > > make the implementation properly asynchronous otherwise it's > > > > really not > > > > > > > > that useful. > > > > > > > > 2. Due to the previous point and what was said by others, this > > is > > > > > > likely > > > > > > > > going to break some existing semantics (i.e. people are > > currently > > > > > > relying > > > > > > > > on blocking semantics) so adding another method's/interface > > plus > > > > > > > > deprecating the older one is more annoying but ideal. > > > > > > > > 3. Using multiple thread pools is definitely recommended for > > > > different > > > > > > > > types of tasks, for serialization which is CPU bound you > > definitely > > > > > > would > > > > > > > > want to use a bounded thread pool that is fixed by the number > > of > > > > CPU's > > > > > > > (or > > > > > > > > something along those lines). > > > > > > > > > > https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c > > > > is > > > > > > a > > > > > > > > very good guide on this topic > > > > > > > > 4. Regarding providing the ability for users to supply their > > own > > > > custom > > > > > > > > ThreadPool this is more of an ergonomics question for the API. > > > > > > Especially > > > > > > > > when it gets to monitoring/tracing, giving the ability for > > users to > > > > > > > provide > > > > > > > > their own custom IO/CPU ThreadPools is ideal however as stated > > > > doing so > > > > > > > > means a lot of boilerplatery changes to the API. Typically > > > > speaking a > > > > > > lot > > > > > > > > of monitoring/tracing/diagnosing is done on > > > > > > ExecutionContext/ThreadPools > > > > > > > > (at least on a more rudimentary level) and hence allowing > > users to > > > > > > > supply a > > > > > > > > global singleton ThreadPool for IO tasks and another for CPU > > tasks > > > > > > makes > > > > > > > > their lives a lot easier. 
However due to the large amount of > > > > changes to > > > > > > > the > > > > > > > > API, it may be more appropriate to just use internal thread > > pools > > > > (for > > > > > > > now) > > > > > > > > since at least it's not any worse than what exists currently > > and > > > > this > > > > > > can > > > > > > > > be an improvement that is done later? > > > > > > > > > > > > > > > > On Wed, May 19, 2021 at 2:56 AM Ryanne Dolan < > > > > ryannedo...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > I was thinking the sender would typically wrap send() in a > > > > > > > backoff/retry > > > > > > > > > loop, or else ignore any failures and drop sends on the floor > > > > > > > > > (fire-and-forget), and in both cases I think failing > > immediately > > > > is > > > > > > > > better > > > > > > > > > than blocking for a new spot in the queue or asynchronously > > > > failing > > > > > > > > > somehow. > > > > > > > > > > > > > > > > > > I think a failed future is adequate for the "explicit > > > > backpressure > > > > > > > > signal" > > > > > > > > > while avoiding any blocking anywhere. I think if we try to > > > > > > > asynchronously > > > > > > > > > signal the caller of failure (either by asynchronously > > failing > > > > the > > > > > > > future > > > > > > > > > or invoking a callback off-thread or something) we'd force > > the > > > > caller > > > > > > > to > > > > > > > > > either block or poll waiting for that signal, which somewhat > > > > defeats > > > > > > > the > > > > > > > > > purpose we're after. And of course blocking for a spot in the > > > > queue > > > > > > > > > definitely defeats the purpose (tho perhaps ameliorates the > > > > problem > > > > > > > > some). > > > > > > > > > > > > > > > > > > Throwing an exception to the caller directly (not via the > > > > future) is > > > > > > > > > another option with precedent in Kafka clients, tho it > > doesn't > > > > seem > > > > > > as > > > > > > > > > ergonomic to me. > > > > > > > > > > > > > > > > > > It would be interesting to analyze some existing usage and > > > > determine > > > > > > > how > > > > > > > > > difficult it would be to convert it to the various proposed > > APIs. > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > On Tue, May 18, 2021, 3:27 PM Nakamura <nny...@gmail.com> > > wrote: > > > > > > > > > > > > > > > > > > > Hi Ryanne, > > > > > > > > > > > > > > > > > > > > Hmm, that's an interesting idea. Basically it would mean > > that > > > > > > after > > > > > > > > > > calling send, you would also have to check whether the > > returned > > > > > > > future > > > > > > > > > had > > > > > > > > > > failed with a specific exception. I would be open to it, > > > > although > > > > > > I > > > > > > > > > think > > > > > > > > > > it might be slightly more surprising, since right now the > > > > paradigm > > > > > > is > > > > > > > > > > "enqueue synchronously, the future represents whether we > > > > succeeded > > > > > > in > > > > > > > > > > sending or not" and the new one would be "enqueue > > > > synchronously, > > > > > > the > > > > > > > > > future > > > > > > > > > > either represents whether we succeeded in enqueueing or > > not (in > > > > > > which > > > > > > > > > case > > > > > > > > > > it will be failed immediately if it failed to enqueue) or > > > > whether > > > > > > we > > > > > > > > > > succeeded in sending or not". 
> > > > > > > > > > > > > > > > > > > > But you're right, it should be on the table, thank you for > > > > > > suggesting > > > > > > > > it! > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan < > > > > > > ryannedo...@gmail.com > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Moses, in the case of a full queue, could we just return > > a > > > > failed > > > > > > > > > future > > > > > > > > > > > immediately? > > > > > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > On Tue, May 18, 2021, 10:39 AM Nakamura < > > nny...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi Alexandre, > > > > > > > > > > > > > > > > > > > > > > > > Thanks for bringing this up, I think I could use some > > > > feedback > > > > > > in > > > > > > > > > this > > > > > > > > > > > > area. There are two mechanisms here, one for slowing > > down > > > > when > > > > > > > we > > > > > > > > > > don't > > > > > > > > > > > > have the relevant metadata, and the other for slowing > > down > > > > > > when a > > > > > > > > > queue > > > > > > > > > > > has > > > > > > > > > > > > filled up. Although the first one applies backpressure > > > > > > somewhat > > > > > > > > > > > > inadvertently, we could still get in trouble if we're > > not > > > > > > > providing > > > > > > > > > > > > information to the mechanism that monitors whether > > we're > > > > > > queueing > > > > > > > > too > > > > > > > > > > > > much. As for the second one, that is a classic > > > > backpressure > > > > > > use > > > > > > > > > case, > > > > > > > > > > so > > > > > > > > > > > > it's definitely important that we don't drop that > > ability. > > > > > > > > > > > > > > > > > > > > > > > > Right now backpressure is applied by blocking, which > > is a > > > > > > natural > > > > > > > > way > > > > > > > > > > to > > > > > > > > > > > > apply backpressure in synchronous systems, but can > > lead to > > > > > > > > > unnecessary > > > > > > > > > > > > slowdowns in asynchronous systems. In my opinion, the > > > > safest > > > > > > way > > > > > > > > to > > > > > > > > > > > apply > > > > > > > > > > > > backpressure in an asynchronous model is to have an > > > > explicit > > > > > > > > > > backpressure > > > > > > > > > > > > signal. A good example would be returning an > > exception, > > > > and > > > > > > > > > providing > > > > > > > > > > an > > > > > > > > > > > > optional hook to add a callback onto so that you can be > > > > > > notified > > > > > > > > when > > > > > > > > > > > it's > > > > > > > > > > > > ready to accept more messages. > > > > > > > > > > > > > > > > > > > > > > > > However, this would be a really big change to how > > users use > > > > > > > > > > > > KafkaProducer#send, so I don't know how much appetite > > we > > > > have > > > > > > for > > > > > > > > > > making > > > > > > > > > > > > that kind of change. Maybe it would be simpler to > > remove > > > > the > > > > > > > > "don't > > > > > > > > > > > block > > > > > > > > > > > > when the per-topic queue is full" from the scope of > > this > > > > KIP, > > > > > > and > > > > > > > > > only > > > > > > > > > > > > focus on when metadata is available? 
The downside is > > that > > > > we > > > > > > > will > > > > > > > > > > > probably > > > > > > > > > > > > want to change the API again later to fix this, so it > > > > might be > > > > > > > > better > > > > > > > > > > to > > > > > > > > > > > > just rip the bandaid off now. > > > > > > > > > > > > > > > > > > > > > > > > One slightly nasty thing here is that because queueing > > > > order is > > > > > > > > > > > important, > > > > > > > > > > > > if we want to use exceptions, we will want to be able > > to > > > > signal > > > > > > > the > > > > > > > > > > > failure > > > > > > > > > > > > to enqueue to the caller in such a way that they can > > still > > > > > > > enforce > > > > > > > > > > > message > > > > > > > > > > > > order if they want. So we can't embed the failure > > > > directly in > > > > > > > the > > > > > > > > > > > returned > > > > > > > > > > > > future, we should either return two futures (nested, > > or as > > > > a > > > > > > > tuple) > > > > > > > > > or > > > > > > > > > > > else > > > > > > > > > > > > throw an exception to explain a backpressure. > > > > > > > > > > > > > > > > > > > > > > > > So there are a few things we should work out here: > > > > > > > > > > > > > > > > > > > > > > > > 1. Should we keep the "too many bytes enqueued" part of > > > > this in > > > > > > > > > scope? > > > > > > > > > > > (I > > > > > > > > > > > > would say yes, so that we can minimize churn in this > > API) > > > > > > > > > > > > 2. How should we signal backpressure so that it's > > > > appropriate > > > > > > for > > > > > > > > > > > > asynchronous systems? (I would say that we should > > throw an > > > > > > > > > exception. > > > > > > > > > > > If > > > > > > > > > > > > we choose this and we want to pursue the queueing > > path, we > > > > > > would > > > > > > > > > *not* > > > > > > > > > > > want > > > > > > > > > > > > to enqueue messages that would push us over the limit, > > and > > > > > > would > > > > > > > > only > > > > > > > > > > > want > > > > > > > > > > > > to enqueue messages when we're waiting for metadata, > > and we > > > > > > would > > > > > > > > > want > > > > > > > > > > to > > > > > > > > > > > > keep track of the total number of bytes for those > > > > messages). > > > > > > > > > > > > > > > > > > > > > > > > What do you think? > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > > > > > On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez < > > > > > > > > > > > > alexandre.dupr...@gmail.com> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hello Nakamura, > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for proposing this change. I can see how the > > > > blocking > > > > > > > > > > behaviour > > > > > > > > > > > > > can be a problem when integrating with reactive > > > > frameworks > > > > > > such > > > > > > > > as > > > > > > > > > > > > > Akka. One of the questions I would have is how you > > would > > > > > > handle > > > > > > > > > back > > > > > > > > > > > > > pressure and avoid memory exhaustion when the > > producer's > > > > > > buffer > > > > > > > > is > > > > > > > > > > > > > full and tasks would start to accumulate in the > > > > out-of-band > > > > > > > queue > > > > > > > > > or > > > > > > > > > > > > > thread pool introduced with this KIP. > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > Alexandre > > > > > > > > > > > > > > > > > > > > > > > > > > Le ven. 
14 mai 2021 à 15:55, Ryanne Dolan < > > > > > > > ryannedo...@gmail.com > > > > > > > > > > > > > > > > > > a > > > > > > > > > > > > écrit > > > > > > > > > > > > > : > > > > > > > > > > > > > > > > > > > > > > > > > > > > Makes sense! > > > > > > > > > > > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, May 14, 2021, 9:39 AM Nakamura < > > > > nny...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Ryanne, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I see what you're saying about serde blocking, > > but I > > > > > > think > > > > > > > we > > > > > > > > > > > should > > > > > > > > > > > > > > > consider it out of scope for this patch. Right > > now > > > > we've > > > > > > > > > nailed > > > > > > > > > > > > down a > > > > > > > > > > > > > > > couple of use cases where we can unambiguously > > say, > > > > "I > > > > > > can > > > > > > > > make > > > > > > > > > > > > > progress > > > > > > > > > > > > > > > now" or "I cannot make progress now", which > > makes it > > > > > > > possible > > > > > > > > > to > > > > > > > > > > > > > offload to > > > > > > > > > > > > > > > a different thread only if we are unable to make > > > > > > progress. > > > > > > > > > > > Extending > > > > > > > > > > > > > this > > > > > > > > > > > > > > > to CPU work like serde would mean always > > offloading, > > > > > > which > > > > > > > > > would > > > > > > > > > > > be a > > > > > > > > > > > > > > > really big performance change. It might be worth > > > > > > exploring > > > > > > > > > > anyway, > > > > > > > > > > > > > but I'd > > > > > > > > > > > > > > > rather keep this patch focused on improving > > > > ergonomics, > > > > > > > > rather > > > > > > > > > > than > > > > > > > > > > > > > > > muddying the waters with evaluating performance > > very > > > > > > > deeply. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I think if we really do want to support serde or > > > > > > > interceptors > > > > > > > > > > that > > > > > > > > > > > do > > > > > > > > > > > > > IO on > > > > > > > > > > > > > > > the send path (which seems like an anti-pattern > > to > > > > me), > > > > > > we > > > > > > > > > should > > > > > > > > > > > > > consider > > > > > > > > > > > > > > > making that a separate SIP, and probably also > > > > consider > > > > > > > > changing > > > > > > > > > > the > > > > > > > > > > > > > API to > > > > > > > > > > > > > > > use Futures (or CompletionStages). But I would > > > > rather > > > > > > > avoid > > > > > > > > > > scope > > > > > > > > > > > > > creep, > > > > > > > > > > > > > > > so that we have a better chance of fixing this > > part > > > > of > > > > > > the > > > > > > > > > > problem. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes, I think some exceptions will move to being > > async > > > > > > > instead > > > > > > > > > of > > > > > > > > > > > > sync. > > > > > > > > > > > > > > > They'll still be surfaced in the Future, so I'm > > not > > > > so > > > > > > > > > confident > > > > > > > > > > > that > > > > > > > > > > > > > it > > > > > > > > > > > > > > > would be that big a shock to users though. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan < > > > > > > > > > > > ryannedo...@gmail.com> > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > re serialization, my concern is that > > serialization > > > > > > often > > > > > > > > > > accounts > > > > > > > > > > > > > for a > > > > > > > > > > > > > > > lot > > > > > > > > > > > > > > > > of the cycles spent before returning the > > future. > > > > It's > > > > > > not > > > > > > > > > > > blocking > > > > > > > > > > > > > per > > > > > > > > > > > > > > > se, > > > > > > > > > > > > > > > > but it's the same effect from the caller's > > > > perspective. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Moreover, serde impls often block themselves, > > e.g. > > > > when > > > > > > > > > > fetching > > > > > > > > > > > > > schemas > > > > > > > > > > > > > > > > from a registry. I suppose it's also possible > > to > > > > block > > > > > > in > > > > > > > > > > > > > Interceptors > > > > > > > > > > > > > > > > (e.g. writing audit events or metrics), which > > > > happens > > > > > > > > before > > > > > > > > > > > serdes > > > > > > > > > > > > > iiuc. > > > > > > > > > > > > > > > > So any blocking in either of those plugins > > would > > > > block > > > > > > > the > > > > > > > > > send > > > > > > > > > > > > > unless we > > > > > > > > > > > > > > > > queue first. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > So I think we want to queue first and do > > everything > > > > > > > > > off-thread > > > > > > > > > > > when > > > > > > > > > > > > > using > > > > > > > > > > > > > > > > the new API, whatever that looks like. I just > > want > > > > to > > > > > > > make > > > > > > > > > sure > > > > > > > > > > > we > > > > > > > > > > > > > don't > > > > > > > > > > > > > > > do > > > > > > > > > > > > > > > > that for clients that wouldn't expect it. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Another consideration is exception handling. > > If we > > > > > > queue > > > > > > > > > right > > > > > > > > > > > > away, > > > > > > > > > > > > > > > we'll > > > > > > > > > > > > > > > > defer some exceptions that currently are > > thrown to > > > > the > > > > > > > > caller > > > > > > > > > > > > > (before the > > > > > > > > > > > > > > > > future is returned). In the new API, the send() > > > > > > wouldn't > > > > > > > > > throw > > > > > > > > > > > any > > > > > > > > > > > > > > > > exceptions, and instead the future would fail. > > I > > > > think > > > > > > > that > > > > > > > > > > might > > > > > > > > > > > > > mean > > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > a new method signature is required. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 13, 2021, 2:57 PM Nakamura < > > > > > > > > > > nakamura.mo...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Ryanne, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree we should add an additional > > constructor > > > > (or > > > > > > > else > > > > > > > > an > > > > > > > > > > > > > additional > > > > > > > > > > > > > > > > > overload in KafkaProducer#send, but the new > > > > > > constructor > > > > > > > > > would > > > > > > > > > > > be > > > > > > > > > > > > > easier > > > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > understand) if we're targeting the "user > > > > provides the > > > > > > > > > thread" > > > > > > > > > > > > > approach. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > From looking at the code, I think we can keep > > > > record > > > > > > > > > > > > serialization > > > > > > > > > > > > > on > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > user thread, if we consider that an important > > > > part of > > > > > > > the > > > > > > > > > > > > > semantics of > > > > > > > > > > > > > > > > this > > > > > > > > > > > > > > > > > method. It doesn't seem like serialization > > > > depends > > > > > > on > > > > > > > > > > knowing > > > > > > > > > > > > the > > > > > > > > > > > > > > > > cluster, > > > > > > > > > > > > > > > > > I think it's incidental that it comes after > > the > > > > first > > > > > > > > > > > "blocking" > > > > > > > > > > > > > > > segment > > > > > > > > > > > > > > > > in > > > > > > > > > > > > > > > > > the method. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan > > < > > > > > > > > > > > > > ryannedo...@gmail.com> > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Moses, I like the direction here. My > > > > thinking > > > > > > is > > > > > > > > > that a > > > > > > > > > > > > > single > > > > > > > > > > > > > > > > > > additional work queue, s.t. send() can > > enqueue > > > > and > > > > > > > > > return, > > > > > > > > > > > > seems > > > > > > > > > > > > > like > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > lightest touch. However, I don't think we > > can > > > > > > > trivially > > > > > > > > > > > process > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > queue > > > > > > > > > > > > > > > > > > in an internal thread pool without subtly > > > > changing > > > > > > > > > behavior > > > > > > > > > > > for > > > > > > > > > > > > > some > > > > > > > > > > > > > > > > > users. > > > > > > > > > > > > > > > > > > For example, users will often run send() in > > > > > > multiple > > > > > > > > > > threads > > > > > > > > > > > in > > > > > > > > > > > > > order > > > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > > serialize faster, but that wouldn't work > > quite > > > > the > > > > > > > same > > > > > > > > > if > > > > > > > > > > > > there > > > > > > > > > > > > > were > > > > > > > > > > > > > > > > an > > > > > > > > > > > > > > > > > > internal thread pool. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For this reason I'm thinking we need to > > make > > > > sure > > > > > > any > > > > > > > > > such > > > > > > > > > > > > > changes > > > > > > > > > > > > > > > are > > > > > > > > > > > > > > > > > > opt-in. Maybe a new constructor with an > > > > additional > > > > > > > > > > > > ThreadFactory > > > > > > > > > > > > > > > > > parameter. > > > > > > > > > > > > > > > > > > That would at least clearly indicate that > > work > > > > will > > > > > > > > > happen > > > > > > > > > > > > > > > off-thread, > > > > > > > > > > > > > > > > > and > > > > > > > > > > > > > > > > > > would require opt-in for the new behavior. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Under the hood, this ThreadFactory could be > > > > used to > > > > > > > > > create > > > > > > > > > > > the > > > > > > > > > > > > > worker > > > > > > > > > > > > > > > > > > thread that process queued sends, which > > could > > > > > > fan-out > > > > > > > > to > > > > > > > > > > > > > > > per-partition > > > > > > > > > > > > > > > > > > threads from there. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > So then you'd have two ways to send: the > > > > existing > > > > > > > way, > > > > > > > > > > where > > > > > > > > > > > > > serde > > > > > > > > > > > > > > > and > > > > > > > > > > > > > > > > > > interceptors and whatnot are executed on > > the > > > > > > calling > > > > > > > > > > thread, > > > > > > > > > > > > and > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > new > > > > > > > > > > > > > > > > > > way, which returns right away and uses an > > > > internal > > > > > > > > > > Executor. > > > > > > > > > > > As > > > > > > > > > > > > > you > > > > > > > > > > > > > > > > point > > > > > > > > > > > > > > > > > > out, the semantics would be identical in > > either > > > > > > case, > > > > > > > > and > > > > > > > > > > it > > > > > > > > > > > > > would be > > > > > > > > > > > > > > > > > very > > > > > > > > > > > > > > > > > > easy for clients to switch. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 13, 2021, 9:00 AM Nakamura < > > > > > > > > nny...@gmail.com > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Folks, > > > > > > > > > > > > > > > > > > > I just posted a new proposal > > > > > > > > > > > > > > > > > > > < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > in the wiki. I think we have an > > opportunity > > > > to > > > > > > > > improve > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > KafkaProducer#send user experience. It > > would > > > > > > > > certainly > > > > > > > > > > > make > > > > > > > > > > > > > our > > > > > > > > > > > > > > > > lives > > > > > > > > > > > > > > > > > > > easier. Please take a look! 
There are > > two > > > > > > > > subproblems > > > > > > > > > > > that > > > > > > > > > > > > I > > > > > > > > > > > > > > > could > > > > > > > > > > > > > > > > > use > > > > > > > > > > > > > > > > > > > guidance on, so I would appreciate > > feedback > > > > on > > > > > > both > > > > > > > > of > > > > > > > > > > > them. > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > Matthew de Detrich > > > > > > > > > > > > > > > > *Aiven Deutschland GmbH* > > > > > > > > > > > > > > > > Immanuelkirchstraße 26, 10405 Berlin > > > > > > > > > > > > > > > > Amtsgericht Charlottenburg, HRB 209739 B > > > > > > > > > > > > > > > > *m:* +491603708037 > > > > > > > > > > > > > > > > *w:* aiven.io *e:* matthew.dedetr...@aiven.io > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > Matthew de Detrich > > > > > > > > > > > > *Aiven Deutschland GmbH* > > > > > > > > > > > > Immanuelkirchstraße 26, 10405 Berlin > > > > > > > > > > > > Amtsgericht Charlottenburg, HRB 209739 B > > > > > > > > > > > > *m:* +491603708037 > > > > > > > > > > > > *w:* aiven.io *e:* matthew.dedetr...@aiven.io > > > > > > > > > > > > > > > > > > > > >