Hi Colin, > Sure, we organize buffers by broker currently. However, we could set some maximum buffer size for records that haven't been assigned to a broker yet. OK, I think we're probably aligned then. I think we were using slightly different terminology (queue vs buffer) but we were actually violently agreeing.
> In general the Kafka producer is supposed to be used from a client thread. That thread is responsible for calling poll periodically to get the results of any send() operations it performed. (It's possible to use the producer from multiple threads as well.) > > The main point I was making is that metadata fetches can and should be done in the same way as any other network I/O in the producer. Thanks! I think I don't quite understand, but I'll do some research myself and try to understand it better. Best, Moses On Wed, Jun 2, 2021 at 2:21 PM Colin McCabe <cmcc...@apache.org> wrote: > On Tue, Jun 1, 2021, at 12:22, Nakamura wrote: > > I think we're talking past each other a bit. I know about non-blocking > > I/O. The problem I'm facing is how to preserve the existing semantics > > without blocking. Right now callers assume their work is enqueued > in-order > > after `KafkaProducer#send` returns. We can't simply return a future that > > represents the metadata fetch, because of that assumption. We need to > > maintain order somehow. That is what all of the different queues we're > > proposing are intended to do. > > Hi Nakamura, > > I guess the point I was making was that there is no connection between > first-in, first-out semantics and blocking. Nothing about FIFO semantics > requires blocking. > > > > How are the ordering semantics of `KafkaProducer#send` related to the > > > metadata fetch? > > KafkaProducer#send currently enqueues after it has the metadata, and it > > passes the TopicPartition struct as part of the data when enqueueing. We > > can either update that data structure to be able to work with partial > > metadata, or we can add a new queue on top. I outline both potential > > approaches in the current KIP. > > > > > That is not related to the metadata fetch. Also, I already proposed a > > > solution (returning an error) if this is a concern. > > Unfortunately it is, because `KafkaProducer#send` conflates the two of > > them. That seems to be the central difficulty of preserving the > semantics > > here. > > Sure, we organize buffers by broker currently. However, we could set some > maximum buffer size for records that haven't been assigned to a broker yet. > > > > > > The same client thread that always has been responsible for checking > poll. > > Please pretend I've never contributed to Kafka before :). Which thread is > > that? > > In general the Kafka producer is supposed to be used from a client thread. > That thread is responsible for calling poll periodically to get the results > of any send() operations it performed. (It's possible to use the producer > from multiple threads as well.) > > The main point I was making is that metadata fetches can and should be > done in the same way as any other network I/O in the producer. > > best, > Colin > > > > > Best, > > Moses > > > > On Tue, Jun 1, 2021 at 3:12 PM Ryanne Dolan <ryannedo...@gmail.com> > wrote: > > > > > Colin, the issue for me isn't so much whether non-blocking I/O is used > or > > > not, but the fact that the caller observes a long time between calling > > > send() and receiving the returned future. This behavior can be > considered > > > "blocking" whether or not I/O is involved. > > > > > > > How are the ordering semantics of `KafkaProducer#send` related to the > > > metadata fetch? > > > > I already proposed a solution (returning an error) > > > > > > There is a subtle difference between failing immediately vs blocking > for > > > metadata, related to ordering in the face of retries. Say we set the > send > > > timeout to max-long (or something high enough that we rarely encounter > > > timeouts in practice), and set max inflight requests to 1. Today, we > can > > > reasonably assume that calling send() in sequence to a specific > partition > > > will result in the corresponding sequence landing on that partition, > > > regardless of how the caller handles retries. The caller might not > handle > > > retries at all. But if we can fail immediately (e.g. when the metadata > > > isn't yet ready), then the caller must handle retries carefully. > > > Specifically, the caller must retry each send() before proceeding to > the > > > next. This basically means that the caller must block on each send() in > > > order to maintain the proper sequence -- how else would the caller know > > > whether it will need to retry or not? > > > > > > In other words, failing immediately punts the problem to the caller to > > > handle, while the caller is less-equipped to deal with it. I don't > think we > > > should do that, at least not in the default case. > > > > > > I actually don't have any objections to this approach so long as it's > > > opt-in. It sounds like you are suggesting to fix the bug for everyone, > but > > > I don't think we can do that without subtly breaking things. > > > > > > Ryanne > > > > > > On Tue, Jun 1, 2021 at 12:31 PM Colin McCabe <cmcc...@apache.org> > wrote: > > > > > > > On Tue, Jun 1, 2021, at 07:00, Nakamura wrote: > > > > > Hi Colin, > > > > > > > > > > Sorry, I still don't follow. > > > > > > > > > > Right now `KafkaProducer#send` seems to trigger a metadata fetch. > > > Today, > > > > > we block on that before returning. Is your proposal that we move > the > > > > > metadata fetch out of `KafkaProducer#send` entirely? > > > > > > > > > > > > > KafkaProducer#send is supposed to initiate non-blocking I/O, but not > wait > > > > for it to complete. > > > > > > > > There's more information about non-blocking I/O in Java here: > > > > https://en.wikipedia.org/wiki/Non-blocking_I/O_%28Java%29 > > > > > > > > > > > > > > Even if the metadata fetch moves to be non-blocking, I think we > still > > > > need > > > > > to deal with the problems we've discussed before if the fetch > happens > > > in > > > > > the `KafkaProducer#send` method. How do we maintain the ordering > > > > semantics > > > > > of `KafkaProducer#send`? > > > > > > > > How are the ordering semantics of `KafkaProducer#send` related to the > > > > metadata fetch? > > > > > > > > > How do we prevent our buffer from filling up? > > > > > > > > That is not related to the metadata fetch. Also, I already proposed a > > > > solution (returning an error) if this is a concern. > > > > > > > > > Which thread is responsible for checking poll()? > > > > > > > > The same client thread that always has been responsible for checking > > > poll. > > > > > > > > > > > > > > The only approach I can see that would avoid this would be moving > the > > > > > metadata fetch to happen at a different time. But it's not clear > to me > > > > > when would be a more appropriate time to do the metadata fetch than > > > > > `KafkaProducer#send`. > > > > > > > > > > > > > It's not about moving the metadata fetch to happen at a different > time. > > > > It's about using non-blocking I/O, like we do for other network I/O. > (And > > > > actually, if you want to get really technical, we do this for the > > > metadata > > > > fetch too, it's just that we have a hack that loops to transform it > back > > > > into blocking I/O.) > > > > > > > > best, > > > > Colin > > > > > > > > > I think there's something I'm missing here. Would you mind > helping me > > > > > figure out what it is? > > > > > > > > > > Best, > > > > > Moses > > > > > > > > > > On Sun, May 30, 2021 at 5:35 PM Colin McCabe <cmcc...@apache.org> > > > wrote: > > > > > > > > > > > On Tue, May 25, 2021, at 11:26, Nakamura wrote: > > > > > > > Hey Colin, > > > > > > > > > > > > > > For the metadata case, what would fixing the bug look like? I > > > agree > > > > that > > > > > > > we should fix it, but I don't have a clear picture in my mind > of > > > what > > > > > > > fixing it should look like. Can you elaborate? > > > > > > > > > > > > > > > > > > > If the blocking metadata fetch bug were fixed, neither the > producer > > > nor > > > > > > the consumer would block while fetching metadata. A poll() call > would > > > > > > initiate a metadata fetch if needed, and a subsequent call to > poll() > > > > would > > > > > > handle the results if needed. Basically the same paradigm we use > for > > > > other > > > > > > network communication in the producer and consumer. > > > > > > > > > > > > best, > > > > > > Colin > > > > > > > > > > > > > Best, > > > > > > > Moses > > > > > > > > > > > > > > On Mon, May 24, 2021 at 1:54 PM Colin McCabe < > cmcc...@apache.org> > > > > wrote: > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > I agree that we should give users the option of having a > fully > > > > async > > > > > > API, > > > > > > > > but I don't think external thread pools or queues are the > right > > > > > > direction > > > > > > > > to go here. They add performance overheads and don't address > the > > > > root > > > > > > > > causes of the problem. > > > > > > > > > > > > > > > > There are basically two scenarios where we block, currently. > One > > > is > > > > > > when > > > > > > > > we are doing a metadata fetch. I think this is clearly a > bug, or > > > at > > > > > > least > > > > > > > > an implementation limitation. From the user's point of view, > the > > > > fact > > > > > > that > > > > > > > > we are doing a metadata fetch is an implementation detail > that > > > > really > > > > > > > > shouldn't be exposed like this. We have talked about fixing > this > > > > in the > > > > > > > > past. I think we just should spend the time to do it. > > > > > > > > > > > > > > > > The second scenario is where the client has produced too much > > > data > > > > in > > > > > > too > > > > > > > > little time. This could happen if there is a network glitch, > or > > > the > > > > > > server > > > > > > > > is slower than expected. In this case, the behavior is > > > intentional > > > > and > > > > > > not > > > > > > > > a bug. To understand this, think about what would happen if > we > > > > didn't > > > > > > > > block. We would start buffering more and more data in memory, > > > until > > > > > > finally > > > > > > > > the application died with an out of memory error. That would > be > > > > > > frustrating > > > > > > > > for users and wouldn't add to the usability of Kafka. > > > > > > > > > > > > > > > > We could potentially have an option to handle the > out-of-memory > > > > > > scenario > > > > > > > > differently by returning an error code immediately rather > than > > > > > > blocking. > > > > > > > > Applications would have to be rewritten to handle this > properly, > > > > but > > > > > > it is > > > > > > > > a possibility. I suspect that most of them wouldn't use > this, but > > > > we > > > > > > could > > > > > > > > offer it as a possibility for async purists (which might > include > > > > > > certain > > > > > > > > frameworks). The big problem the users would have to solve is > > > what > > > > to > > > > > > do > > > > > > > > with the record that they were unable to produce due to the > > > buffer > > > > full > > > > > > > > issue. > > > > > > > > > > > > > > > > best, > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 20, 2021, at 10:35, Nakamura wrote: > > > > > > > > > > > > > > > > > > > > My suggestion was just do this in multiple steps/phases, > > > > firstly > > > > > > let's > > > > > > > > fix > > > > > > > > > > the issue of send being misleadingly asynchronous (i.e. > > > > internally > > > > > > its > > > > > > > > > > blocking) and then later one we can make the various > > > > > > > > > > threadpools configurable with a sane default. > > > > > > > > > > > > > > > > > > I like that approach. I updated the "Which thread should be > > > > > > responsible > > > > > > > > for > > > > > > > > > waiting" part of KIP-739 to add your suggestion as my > > > recommended > > > > > > > > approach, > > > > > > > > > thank you! If no one else has major concerns about that > > > > approach, > > > > > > I'll > > > > > > > > > move the alternatives to "rejected alternatives". > > > > > > > > > > > > > > > > > > On Thu, May 20, 2021 at 7:26 AM Matthew de Detrich > > > > > > > > > <matthew.dedetr...@aiven.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > @ > > > > > > > > > > > > > > > > > > > > Nakamura > > > > > > > > > > On Wed, May 19, 2021 at 7:35 PM Nakamura < > nny...@gmail.com> > > > > wrote: > > > > > > > > > > > > > > > > > > > > > @Ryanne: > > > > > > > > > > > In my mind's eye I slightly prefer the throwing the > "cannot > > > > > > enqueue" > > > > > > > > > > > exception to satisfying the future immediately with the > > > > "cannot > > > > > > > > enqueue" > > > > > > > > > > > exception? But I agree, it would be worth doing more > > > > research. > > > > > > > > > > > > > > > > > > > > > > @Matthew: > > > > > > > > > > > > > > > > > > > > > > > 3. Using multiple thread pools is definitely > recommended > > > > for > > > > > > > > different > > > > > > > > > > > > types of tasks, for serialization which is CPU bound > you > > > > > > definitely > > > > > > > > > > would > > > > > > > > > > > > want to use a bounded thread pool that is fixed by > the > > > > number > > > > > > of > > > > > > > > CPU's > > > > > > > > > > > (or > > > > > > > > > > > > something along those lines). > > > > > > > > > > > > > > > > > > > https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c > > > > > > > > is > > > > > > > > > > a > > > > > > > > > > > > very good guide on this topic > > > > > > > > > > > I think this guide is good in general, but I would be > > > > hesitant to > > > > > > > > follow > > > > > > > > > > > its guidance re: offloading serialization without > > > > benchmarking > > > > > > it. > > > > > > > > My > > > > > > > > > > > understanding is that context-switches have gotten much > > > > cheaper, > > > > > > and > > > > > > > > that > > > > > > > > > > > gains from cache locality are small, but they're not > > > nothing. > > > > > > > > Especially > > > > > > > > > > > if the workload has a very small serialization cost, I > > > > wouldn't > > > > > > be > > > > > > > > > > shocked > > > > > > > > > > > if it made it slower. I feel pretty strongly that we > > > should > > > > do > > > > > > more > > > > > > > > > > > research here before unconditionally encouraging > > > > serialization > > > > > > in a > > > > > > > > > > > threadpool. If people think it's important to do it > here > > > > (eg if > > > > > > we > > > > > > > > think > > > > > > > > > > > it would mean another big API change) then we should > start > > > > > > thinking > > > > > > > > about > > > > > > > > > > > what benchmarking we can do to gain higher confidence > in > > > this > > > > > > kind of > > > > > > > > > > > change. However, I don't think it would change > semantics > > > as > > > > > > > > > > substantially > > > > > > > > > > > as we're proposing here, so I would vote for pushing > this > > > to > > > > a > > > > > > > > subsequent > > > > > > > > > > > KIP. > > > > > > > > > > > > > > > > > > > > > Of course, its all down to benchmarking, benchmarking and > > > > > > benchmarking. > > > > > > > > > > Ideally speaking you want to use all of the resources > > > > available to > > > > > > > > you, so > > > > > > > > > > if you have a bottleneck in serialization and you have > many > > > > cores > > > > > > free > > > > > > > > then > > > > > > > > > > using multiple cores may be more appropriate than a > single > > > > core. > > > > > > > > Typically > > > > > > > > > > I would expect that using a single thread to do > serialization > > > > is > > > > > > > > likely to > > > > > > > > > > be the most situation, I was just responding to an > earlier > > > > point > > > > > > that > > > > > > > > was > > > > > > > > > > made in regards to using ThreadPools for serialization > (note > > > > that > > > > > > you > > > > > > > > can > > > > > > > > > > also just use a ThreadPool that is pinned to a single > thread) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 4. Regarding providing the ability for users to > supply > > > > their > > > > > > own > > > > > > > > custom > > > > > > > > > > > > ThreadPool this is more of an ergonomics question > for the > > > > API. > > > > > > > > > > Especially > > > > > > > > > > > > when it gets to monitoring/tracing, giving the > ability > > > for > > > > > > users to > > > > > > > > > > > provide > > > > > > > > > > > > their own custom IO/CPU ThreadPools is ideal however > as > > > > stated > > > > > > > > doing so > > > > > > > > > > > > means a lot of boilerplatery changes to the API. > > > Typically > > > > > > > > speaking a > > > > > > > > > > lot > > > > > > > > > > > > of monitoring/tracing/diagnosing is done on > > > > > > > > > > ExecutionContext/ThreadPools > > > > > > > > > > > > (at least on a more rudimentary level) and hence > allowing > > > > > > users to > > > > > > > > > > supply > > > > > > > > > > > a > > > > > > > > > > > > global singleton ThreadPool for IO tasks and another > for > > > > CPU > > > > > > tasks > > > > > > > > > > makes > > > > > > > > > > > > their lives a lot easier. However due to the large > amount > > > > of > > > > > > > > changes to > > > > > > > > > > > the > > > > > > > > > > > > API, it may be more appropriate to just use internal > > > thread > > > > > > pools > > > > > > > > (for > > > > > > > > > > > now) > > > > > > > > > > > > since at least it's not any worse than what exists > > > > currently > > > > > > and > > > > > > > > this > > > > > > > > > > can > > > > > > > > > > > > be an improvement that is done later? > > > > > > > > > > > Is there an existing threadpool that you suggest we > reuse? > > > > Or > > > > > > are > > > > > > > > you > > > > > > > > > > > imagining that we make our own internal threadpool, and > > > then > > > > > > maybe > > > > > > > > expose > > > > > > > > > > > configuration flags to manipulate it? For what it's > > > worth, I > > > > > > like > > > > > > > > having > > > > > > > > > > > an internal threadpool (perhaps just FJP.commonpool) > and > > > then > > > > > > > > providing > > > > > > > > > > an > > > > > > > > > > > alternative to pass your own threadpool. That way > people > > > who > > > > > > want > > > > > > > > finer > > > > > > > > > > > control can get it, and everyone else can do OK with > the > > > > default. > > > > > > > > > > > > > > > > > > > > > Indeed that is what I am saying. The most ideal > situation is > > > > that > > > > > > > > there is > > > > > > > > > > a default internal threadpool that Kafka uses, however > users > > > of > > > > > > Kafka > > > > > > > > can > > > > > > > > > > configure there own threadpool. Having a singleton > ThreadPool > > > > for > > > > > > > > blocking > > > > > > > > > > IO, non blocking IO and CPU bound tasks which can be > plugged > > > in > > > > > > all of > > > > > > > > your > > > > > > > > > > libraries (including Kafka) makes resource management > much > > > > easier > > > > > > to > > > > > > > > do and > > > > > > > > > > also gives control of users to override specific > threadpools > > > > for > > > > > > > > > > exceptional cases (i.e. providing a Threadpool that is > pinned > > > > to a > > > > > > > > single > > > > > > > > > > core which tends to give the best latency results if > this is > > > > > > something > > > > > > > > that > > > > > > > > > > is critical for you). > > > > > > > > > > > > > > > > > > > > My suggestion was just do this in multiple steps/phases, > > > > firstly > > > > > > let's > > > > > > > > fix > > > > > > > > > > the issue of send being misleadingly asynchronous (i.e. > > > > internally > > > > > > its > > > > > > > > > > blocking) and then later one we can make the various > > > > > > > > > > threadpools configurable with a sane default. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, May 19, 2021 at 6:01 AM Matthew de Detrich > > > > > > > > > > > <matthew.dedetr...@aiven.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > > Here are my two cents here (note that I am only > seeing > > > > this on > > > > > > a > > > > > > > > > > surface > > > > > > > > > > > > level) > > > > > > > > > > > > > > > > > > > > > > > > 1. If we are going this road it makes sense to do > this > > > > > > "properly" > > > > > > > > (i.e. > > > > > > > > > > > > using queues as Ryan suggested). The reason I am > saying > > > > this > > > > > > is > > > > > > > > that > > > > > > > > > > it > > > > > > > > > > > > seems that the original goal of the KIP is for it to > be > > > > used in > > > > > > > > other > > > > > > > > > > > > asynchronous systems and from my personal > experience, you > > > > > > really do > > > > > > > > > > need > > > > > > > > > > > to > > > > > > > > > > > > make the implementation properly asynchronous > otherwise > > > > it's > > > > > > > > really not > > > > > > > > > > > > that useful. > > > > > > > > > > > > 2. Due to the previous point and what was said by > others, > > > > this > > > > > > is > > > > > > > > > > likely > > > > > > > > > > > > going to break some existing semantics (i.e. people > are > > > > > > currently > > > > > > > > > > relying > > > > > > > > > > > > on blocking semantics) so adding another > > > method's/interface > > > > > > plus > > > > > > > > > > > > deprecating the older one is more annoying but ideal. > > > > > > > > > > > > 3. Using multiple thread pools is definitely > recommended > > > > for > > > > > > > > different > > > > > > > > > > > > types of tasks, for serialization which is CPU bound > you > > > > > > definitely > > > > > > > > > > would > > > > > > > > > > > > want to use a bounded thread pool that is fixed by > the > > > > number > > > > > > of > > > > > > > > CPU's > > > > > > > > > > > (or > > > > > > > > > > > > something along those lines). > > > > > > > > > > > > > > > > > > > https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c > > > > > > > > is > > > > > > > > > > a > > > > > > > > > > > > very good guide on this topic > > > > > > > > > > > > 4. Regarding providing the ability for users to > supply > > > > their > > > > > > own > > > > > > > > custom > > > > > > > > > > > > ThreadPool this is more of an ergonomics question > for the > > > > API. > > > > > > > > > > Especially > > > > > > > > > > > > when it gets to monitoring/tracing, giving the > ability > > > for > > > > > > users to > > > > > > > > > > > provide > > > > > > > > > > > > their own custom IO/CPU ThreadPools is ideal however > as > > > > stated > > > > > > > > doing so > > > > > > > > > > > > means a lot of boilerplatery changes to the API. > > > Typically > > > > > > > > speaking a > > > > > > > > > > lot > > > > > > > > > > > > of monitoring/tracing/diagnosing is done on > > > > > > > > > > ExecutionContext/ThreadPools > > > > > > > > > > > > (at least on a more rudimentary level) and hence > allowing > > > > > > users to > > > > > > > > > > > supply a > > > > > > > > > > > > global singleton ThreadPool for IO tasks and another > for > > > > CPU > > > > > > tasks > > > > > > > > > > makes > > > > > > > > > > > > their lives a lot easier. However due to the large > amount > > > > of > > > > > > > > changes to > > > > > > > > > > > the > > > > > > > > > > > > API, it may be more appropriate to just use internal > > > thread > > > > > > pools > > > > > > > > (for > > > > > > > > > > > now) > > > > > > > > > > > > since at least it's not any worse than what exists > > > > currently > > > > > > and > > > > > > > > this > > > > > > > > > > can > > > > > > > > > > > > be an improvement that is done later? > > > > > > > > > > > > > > > > > > > > > > > > On Wed, May 19, 2021 at 2:56 AM Ryanne Dolan < > > > > > > > > ryannedo...@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > I was thinking the sender would typically wrap > send() > > > in > > > > a > > > > > > > > > > > backoff/retry > > > > > > > > > > > > > loop, or else ignore any failures and drop sends > on the > > > > floor > > > > > > > > > > > > > (fire-and-forget), and in both cases I think > failing > > > > > > immediately > > > > > > > > is > > > > > > > > > > > > better > > > > > > > > > > > > > than blocking for a new spot in the queue or > > > > asynchronously > > > > > > > > failing > > > > > > > > > > > > > somehow. > > > > > > > > > > > > > > > > > > > > > > > > > > I think a failed future is adequate for the > "explicit > > > > > > > > backpressure > > > > > > > > > > > > signal" > > > > > > > > > > > > > while avoiding any blocking anywhere. I think if > we try > > > > to > > > > > > > > > > > asynchronously > > > > > > > > > > > > > signal the caller of failure (either by > asynchronously > > > > > > failing > > > > > > > > the > > > > > > > > > > > future > > > > > > > > > > > > > or invoking a callback off-thread or something) > we'd > > > > force > > > > > > the > > > > > > > > caller > > > > > > > > > > > to > > > > > > > > > > > > > either block or poll waiting for that signal, which > > > > somewhat > > > > > > > > defeats > > > > > > > > > > > the > > > > > > > > > > > > > purpose we're after. And of course blocking for a > spot > > > > in the > > > > > > > > queue > > > > > > > > > > > > > definitely defeats the purpose (tho perhaps > ameliorates > > > > the > > > > > > > > problem > > > > > > > > > > > > some). > > > > > > > > > > > > > > > > > > > > > > > > > > Throwing an exception to the caller directly (not > via > > > the > > > > > > > > future) is > > > > > > > > > > > > > another option with precedent in Kafka clients, > tho it > > > > > > doesn't > > > > > > > > seem > > > > > > > > > > as > > > > > > > > > > > > > ergonomic to me. > > > > > > > > > > > > > > > > > > > > > > > > > > It would be interesting to analyze some existing > usage > > > > and > > > > > > > > determine > > > > > > > > > > > how > > > > > > > > > > > > > difficult it would be to convert it to the various > > > > proposed > > > > > > APIs. > > > > > > > > > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, May 18, 2021, 3:27 PM Nakamura < > > > nny...@gmail.com > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Ryanne, > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hmm, that's an interesting idea. Basically it > would > > > > mean > > > > > > that > > > > > > > > > > after > > > > > > > > > > > > > > calling send, you would also have to check > whether > > > the > > > > > > returned > > > > > > > > > > > future > > > > > > > > > > > > > had > > > > > > > > > > > > > > failed with a specific exception. I would be > open to > > > > it, > > > > > > > > although > > > > > > > > > > I > > > > > > > > > > > > > think > > > > > > > > > > > > > > it might be slightly more surprising, since > right now > > > > the > > > > > > > > paradigm > > > > > > > > > > is > > > > > > > > > > > > > > "enqueue synchronously, the future represents > whether > > > > we > > > > > > > > succeeded > > > > > > > > > > in > > > > > > > > > > > > > > sending or not" and the new one would be "enqueue > > > > > > > > synchronously, > > > > > > > > > > the > > > > > > > > > > > > > future > > > > > > > > > > > > > > either represents whether we succeeded in > enqueueing > > > or > > > > > > not (in > > > > > > > > > > which > > > > > > > > > > > > > case > > > > > > > > > > > > > > it will be failed immediately if it failed to > > > enqueue) > > > > or > > > > > > > > whether > > > > > > > > > > we > > > > > > > > > > > > > > succeeded in sending or not". > > > > > > > > > > > > > > > > > > > > > > > > > > > > But you're right, it should be on the table, > thank > > > you > > > > for > > > > > > > > > > suggesting > > > > > > > > > > > > it! > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan < > > > > > > > > > > ryannedo...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Moses, in the case of a full queue, could we > just > > > > return > > > > > > a > > > > > > > > failed > > > > > > > > > > > > > future > > > > > > > > > > > > > > > immediately? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, May 18, 2021, 10:39 AM Nakamura < > > > > > > nny...@gmail.com> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Alexandre, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for bringing this up, I think I could > use > > > > some > > > > > > > > feedback > > > > > > > > > > in > > > > > > > > > > > > > this > > > > > > > > > > > > > > > > area. There are two mechanisms here, one for > > > > slowing > > > > > > down > > > > > > > > when > > > > > > > > > > > we > > > > > > > > > > > > > > don't > > > > > > > > > > > > > > > > have the relevant metadata, and the other for > > > > slowing > > > > > > down > > > > > > > > > > when a > > > > > > > > > > > > > queue > > > > > > > > > > > > > > > has > > > > > > > > > > > > > > > > filled up. Although the first one applies > > > > backpressure > > > > > > > > > > somewhat > > > > > > > > > > > > > > > > inadvertently, we could still get in trouble > if > > > > we're > > > > > > not > > > > > > > > > > > providing > > > > > > > > > > > > > > > > information to the mechanism that monitors > > > whether > > > > > > we're > > > > > > > > > > queueing > > > > > > > > > > > > too > > > > > > > > > > > > > > > > much. As for the second one, that is a > classic > > > > > > > > backpressure > > > > > > > > > > use > > > > > > > > > > > > > case, > > > > > > > > > > > > > > so > > > > > > > > > > > > > > > > it's definitely important that we don't drop > that > > > > > > ability. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Right now backpressure is applied by > blocking, > > > > which > > > > > > is a > > > > > > > > > > natural > > > > > > > > > > > > way > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > apply backpressure in synchronous systems, > but > > > can > > > > > > lead to > > > > > > > > > > > > > unnecessary > > > > > > > > > > > > > > > > slowdowns in asynchronous systems. In my > > > opinion, > > > > the > > > > > > > > safest > > > > > > > > > > way > > > > > > > > > > > > to > > > > > > > > > > > > > > > apply > > > > > > > > > > > > > > > > backpressure in an asynchronous model is to > have > > > an > > > > > > > > explicit > > > > > > > > > > > > > > backpressure > > > > > > > > > > > > > > > > signal. A good example would be returning an > > > > > > exception, > > > > > > > > and > > > > > > > > > > > > > providing > > > > > > > > > > > > > > an > > > > > > > > > > > > > > > > optional hook to add a callback onto so that > you > > > > can be > > > > > > > > > > notified > > > > > > > > > > > > when > > > > > > > > > > > > > > > it's > > > > > > > > > > > > > > > > ready to accept more messages. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > However, this would be a really big change > to how > > > > > > users use > > > > > > > > > > > > > > > > KafkaProducer#send, so I don't know how much > > > > appetite > > > > > > we > > > > > > > > have > > > > > > > > > > for > > > > > > > > > > > > > > making > > > > > > > > > > > > > > > > that kind of change. Maybe it would be > simpler > > > to > > > > > > remove > > > > > > > > the > > > > > > > > > > > > "don't > > > > > > > > > > > > > > > block > > > > > > > > > > > > > > > > when the per-topic queue is full" from the > scope > > > of > > > > > > this > > > > > > > > KIP, > > > > > > > > > > and > > > > > > > > > > > > > only > > > > > > > > > > > > > > > > focus on when metadata is available? The > > > downside > > > > is > > > > > > that > > > > > > > > we > > > > > > > > > > > will > > > > > > > > > > > > > > > probably > > > > > > > > > > > > > > > > want to change the API again later to fix > this, > > > so > > > > it > > > > > > > > might be > > > > > > > > > > > > better > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > just rip the bandaid off now. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > One slightly nasty thing here is that because > > > > queueing > > > > > > > > order is > > > > > > > > > > > > > > > important, > > > > > > > > > > > > > > > > if we want to use exceptions, we will want > to be > > > > able > > > > > > to > > > > > > > > signal > > > > > > > > > > > the > > > > > > > > > > > > > > > failure > > > > > > > > > > > > > > > > to enqueue to the caller in such a way that > they > > > > can > > > > > > still > > > > > > > > > > > enforce > > > > > > > > > > > > > > > message > > > > > > > > > > > > > > > > order if they want. So we can't embed the > > > failure > > > > > > > > directly in > > > > > > > > > > > the > > > > > > > > > > > > > > > returned > > > > > > > > > > > > > > > > future, we should either return two futures > > > > (nested, > > > > > > or as > > > > > > > > a > > > > > > > > > > > tuple) > > > > > > > > > > > > > or > > > > > > > > > > > > > > > else > > > > > > > > > > > > > > > > throw an exception to explain a backpressure. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > So there are a few things we should work out > > > here: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. Should we keep the "too many bytes > enqueued" > > > > part of > > > > > > > > this in > > > > > > > > > > > > > scope? > > > > > > > > > > > > > > > (I > > > > > > > > > > > > > > > > would say yes, so that we can minimize churn > in > > > > this > > > > > > API) > > > > > > > > > > > > > > > > 2. How should we signal backpressure so that > it's > > > > > > > > appropriate > > > > > > > > > > for > > > > > > > > > > > > > > > > asynchronous systems? (I would say that we > > > should > > > > > > throw an > > > > > > > > > > > > > exception. > > > > > > > > > > > > > > > If > > > > > > > > > > > > > > > > we choose this and we want to pursue the > queueing > > > > > > path, we > > > > > > > > > > would > > > > > > > > > > > > > *not* > > > > > > > > > > > > > > > want > > > > > > > > > > > > > > > > to enqueue messages that would push us over > the > > > > limit, > > > > > > and > > > > > > > > > > would > > > > > > > > > > > > only > > > > > > > > > > > > > > > want > > > > > > > > > > > > > > > > to enqueue messages when we're waiting for > > > > metadata, > > > > > > and we > > > > > > > > > > would > > > > > > > > > > > > > want > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > keep track of the total number of bytes for > those > > > > > > > > messages). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > What do you think? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sun, May 16, 2021 at 6:16 AM Alexandre > > > Dupriez < > > > > > > > > > > > > > > > > alexandre.dupr...@gmail.com> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hello Nakamura, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for proposing this change. I can > see how > > > > the > > > > > > > > blocking > > > > > > > > > > > > > > behaviour > > > > > > > > > > > > > > > > > can be a problem when integrating with > reactive > > > > > > > > frameworks > > > > > > > > > > such > > > > > > > > > > > > as > > > > > > > > > > > > > > > > > Akka. One of the questions I would have is > how > > > > you > > > > > > would > > > > > > > > > > handle > > > > > > > > > > > > > back > > > > > > > > > > > > > > > > > pressure and avoid memory exhaustion when > the > > > > > > producer's > > > > > > > > > > buffer > > > > > > > > > > > > is > > > > > > > > > > > > > > > > > full and tasks would start to accumulate > in the > > > > > > > > out-of-band > > > > > > > > > > > queue > > > > > > > > > > > > > or > > > > > > > > > > > > > > > > > thread pool introduced with this KIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > Alexandre > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Le ven. 14 mai 2021 à 15:55, Ryanne Dolan < > > > > > > > > > > > ryannedo...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > > > a > > > > > > > > > > > > > > > > écrit > > > > > > > > > > > > > > > > > : > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Makes sense! > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, May 14, 2021, 9:39 AM Nakamura < > > > > > > > > nny...@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Ryanne, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I see what you're saying about serde > > > > blocking, > > > > > > but I > > > > > > > > > > think > > > > > > > > > > > we > > > > > > > > > > > > > > > should > > > > > > > > > > > > > > > > > > > consider it out of scope for this > patch. > > > > Right > > > > > > now > > > > > > > > we've > > > > > > > > > > > > > nailed > > > > > > > > > > > > > > > > down a > > > > > > > > > > > > > > > > > > > couple of use cases where we can > > > > unambiguously > > > > > > say, > > > > > > > > "I > > > > > > > > > > can > > > > > > > > > > > > make > > > > > > > > > > > > > > > > > progress > > > > > > > > > > > > > > > > > > > now" or "I cannot make progress now", > which > > > > > > makes it > > > > > > > > > > > possible > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > offload to > > > > > > > > > > > > > > > > > > > a different thread only if we are > unable to > > > > make > > > > > > > > > > progress. > > > > > > > > > > > > > > > Extending > > > > > > > > > > > > > > > > > this > > > > > > > > > > > > > > > > > > > to CPU work like serde would mean > always > > > > > > offloading, > > > > > > > > > > which > > > > > > > > > > > > > would > > > > > > > > > > > > > > > be a > > > > > > > > > > > > > > > > > > > really big performance change. It > might be > > > > worth > > > > > > > > > > exploring > > > > > > > > > > > > > > anyway, > > > > > > > > > > > > > > > > > but I'd > > > > > > > > > > > > > > > > > > > rather keep this patch focused on > improving > > > > > > > > ergonomics, > > > > > > > > > > > > rather > > > > > > > > > > > > > > than > > > > > > > > > > > > > > > > > > > muddying the waters with evaluating > > > > performance > > > > > > very > > > > > > > > > > > deeply. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I think if we really do want to support > > > > serde or > > > > > > > > > > > interceptors > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > do > > > > > > > > > > > > > > > > > IO on > > > > > > > > > > > > > > > > > > > the send path (which seems like an > > > > anti-pattern > > > > > > to > > > > > > > > me), > > > > > > > > > > we > > > > > > > > > > > > > should > > > > > > > > > > > > > > > > > consider > > > > > > > > > > > > > > > > > > > making that a separate SIP, and > probably > > > also > > > > > > > > consider > > > > > > > > > > > > changing > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > API to > > > > > > > > > > > > > > > > > > > use Futures (or CompletionStages). > But I > > > > would > > > > > > > > rather > > > > > > > > > > > avoid > > > > > > > > > > > > > > scope > > > > > > > > > > > > > > > > > creep, > > > > > > > > > > > > > > > > > > > so that we have a better chance of > fixing > > > > this > > > > > > part > > > > > > > > of > > > > > > > > > > the > > > > > > > > > > > > > > problem. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes, I think some exceptions will move > to > > > > being > > > > > > async > > > > > > > > > > > instead > > > > > > > > > > > > > of > > > > > > > > > > > > > > > > sync. > > > > > > > > > > > > > > > > > > > They'll still be surfaced in the > Future, so > > > > I'm > > > > > > not > > > > > > > > so > > > > > > > > > > > > > confident > > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > it > > > > > > > > > > > > > > > > > > > would be that big a shock to users > though. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 13, 2021 at 7:44 PM Ryanne > > > Dolan > > > > < > > > > > > > > > > > > > > > ryannedo...@gmail.com> > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > re serialization, my concern is that > > > > > > serialization > > > > > > > > > > often > > > > > > > > > > > > > > accounts > > > > > > > > > > > > > > > > > for a > > > > > > > > > > > > > > > > > > > lot > > > > > > > > > > > > > > > > > > > > of the cycles spent before returning > the > > > > > > future. > > > > > > > > It's > > > > > > > > > > not > > > > > > > > > > > > > > > blocking > > > > > > > > > > > > > > > > > per > > > > > > > > > > > > > > > > > > > se, > > > > > > > > > > > > > > > > > > > > but it's the same effect from the > > > caller's > > > > > > > > perspective. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Moreover, serde impls often block > > > > themselves, > > > > > > e.g. > > > > > > > > when > > > > > > > > > > > > > > fetching > > > > > > > > > > > > > > > > > schemas > > > > > > > > > > > > > > > > > > > > from a registry. I suppose it's also > > > > possible > > > > > > to > > > > > > > > block > > > > > > > > > > in > > > > > > > > > > > > > > > > > Interceptors > > > > > > > > > > > > > > > > > > > > (e.g. writing audit events or > metrics), > > > > which > > > > > > > > happens > > > > > > > > > > > > before > > > > > > > > > > > > > > > serdes > > > > > > > > > > > > > > > > > iiuc. > > > > > > > > > > > > > > > > > > > > So any blocking in either of those > > > plugins > > > > > > would > > > > > > > > block > > > > > > > > > > > the > > > > > > > > > > > > > send > > > > > > > > > > > > > > > > > unless we > > > > > > > > > > > > > > > > > > > > queue first. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > So I think we want to queue first > and do > > > > > > everything > > > > > > > > > > > > > off-thread > > > > > > > > > > > > > > > when > > > > > > > > > > > > > > > > > using > > > > > > > > > > > > > > > > > > > > the new API, whatever that looks > like. I > > > > just > > > > > > want > > > > > > > > to > > > > > > > > > > > make > > > > > > > > > > > > > sure > > > > > > > > > > > > > > > we > > > > > > > > > > > > > > > > > don't > > > > > > > > > > > > > > > > > > > do > > > > > > > > > > > > > > > > > > > > that for clients that wouldn't > expect it. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Another consideration is exception > > > > handling. > > > > > > If we > > > > > > > > > > queue > > > > > > > > > > > > > right > > > > > > > > > > > > > > > > away, > > > > > > > > > > > > > > > > > > > we'll > > > > > > > > > > > > > > > > > > > > defer some exceptions that currently > are > > > > > > thrown to > > > > > > > > the > > > > > > > > > > > > caller > > > > > > > > > > > > > > > > > (before the > > > > > > > > > > > > > > > > > > > > future is returned). In the new API, > the > > > > send() > > > > > > > > > > wouldn't > > > > > > > > > > > > > throw > > > > > > > > > > > > > > > any > > > > > > > > > > > > > > > > > > > > exceptions, and instead the future > would > > > > fail. > > > > > > I > > > > > > > > think > > > > > > > > > > > that > > > > > > > > > > > > > > might > > > > > > > > > > > > > > > > > mean > > > > > > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > > > > a new method signature is required. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 13, 2021, 2:57 PM > Nakamura < > > > > > > > > > > > > > > nakamura.mo...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Ryanne, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree we should add an additional > > > > > > constructor > > > > > > > > (or > > > > > > > > > > > else > > > > > > > > > > > > an > > > > > > > > > > > > > > > > > additional > > > > > > > > > > > > > > > > > > > > > overload in KafkaProducer#send, > but the > > > > new > > > > > > > > > > constructor > > > > > > > > > > > > > would > > > > > > > > > > > > > > > be > > > > > > > > > > > > > > > > > easier > > > > > > > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > > > > > understand) if we're targeting the > > > "user > > > > > > > > provides the > > > > > > > > > > > > > thread" > > > > > > > > > > > > > > > > > approach. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > From looking at the code, I think > we > > > can > > > > keep > > > > > > > > record > > > > > > > > > > > > > > > > serialization > > > > > > > > > > > > > > > > > on > > > > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > > user thread, if we consider that an > > > > important > > > > > > > > part of > > > > > > > > > > > the > > > > > > > > > > > > > > > > > semantics of > > > > > > > > > > > > > > > > > > > > this > > > > > > > > > > > > > > > > > > > > > method. It doesn't seem like > > > > serialization > > > > > > > > depends > > > > > > > > > > on > > > > > > > > > > > > > > knowing > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > cluster, > > > > > > > > > > > > > > > > > > > > > I think it's incidental that it > comes > > > > after > > > > > > the > > > > > > > > first > > > > > > > > > > > > > > > "blocking" > > > > > > > > > > > > > > > > > > > segment > > > > > > > > > > > > > > > > > > > > in > > > > > > > > > > > > > > > > > > > > > the method. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 13, 2021 at 2:38 PM > Ryanne > > > > Dolan > > > > > > < > > > > > > > > > > > > > > > > > ryannedo...@gmail.com> > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Moses, I like the direction > here. > > > > My > > > > > > > > thinking > > > > > > > > > > is > > > > > > > > > > > > > that a > > > > > > > > > > > > > > > > > single > > > > > > > > > > > > > > > > > > > > > > additional work queue, s.t. > send() > > > can > > > > > > enqueue > > > > > > > > and > > > > > > > > > > > > > return, > > > > > > > > > > > > > > > > seems > > > > > > > > > > > > > > > > > like > > > > > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > > > lightest touch. However, I don't > > > think > > > > we > > > > > > can > > > > > > > > > > > trivially > > > > > > > > > > > > > > > process > > > > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > > > > > queue > > > > > > > > > > > > > > > > > > > > > > in an internal thread pool > without > > > > subtly > > > > > > > > changing > > > > > > > > > > > > > behavior > > > > > > > > > > > > > > > for > > > > > > > > > > > > > > > > > some > > > > > > > > > > > > > > > > > > > > > users. > > > > > > > > > > > > > > > > > > > > > > For example, users will often run > > > > send() in > > > > > > > > > > multiple > > > > > > > > > > > > > > threads > > > > > > > > > > > > > > > in > > > > > > > > > > > > > > > > > order > > > > > > > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > > > > > > serialize faster, but that > wouldn't > > > > work > > > > > > quite > > > > > > > > the > > > > > > > > > > > same > > > > > > > > > > > > > if > > > > > > > > > > > > > > > > there > > > > > > > > > > > > > > > > > were > > > > > > > > > > > > > > > > > > > > an > > > > > > > > > > > > > > > > > > > > > > internal thread pool. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For this reason I'm thinking we > need > > > to > > > > > > make > > > > > > > > sure > > > > > > > > > > any > > > > > > > > > > > > > such > > > > > > > > > > > > > > > > > changes > > > > > > > > > > > > > > > > > > > are > > > > > > > > > > > > > > > > > > > > > > opt-in. Maybe a new constructor > with > > > an > > > > > > > > additional > > > > > > > > > > > > > > > > ThreadFactory > > > > > > > > > > > > > > > > > > > > > parameter. > > > > > > > > > > > > > > > > > > > > > > That would at least clearly > indicate > > > > that > > > > > > work > > > > > > > > will > > > > > > > > > > > > > happen > > > > > > > > > > > > > > > > > > > off-thread, > > > > > > > > > > > > > > > > > > > > > and > > > > > > > > > > > > > > > > > > > > > > would require opt-in for the new > > > > behavior. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Under the hood, this > ThreadFactory > > > > could be > > > > > > > > used to > > > > > > > > > > > > > create > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > worker > > > > > > > > > > > > > > > > > > > > > > thread that process queued sends, > > > which > > > > > > could > > > > > > > > > > fan-out > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > > > per-partition > > > > > > > > > > > > > > > > > > > > > > threads from there. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > So then you'd have two ways to > send: > > > > the > > > > > > > > existing > > > > > > > > > > > way, > > > > > > > > > > > > > > where > > > > > > > > > > > > > > > > > serde > > > > > > > > > > > > > > > > > > > and > > > > > > > > > > > > > > > > > > > > > > interceptors and whatnot are > executed > > > > on > > > > > > the > > > > > > > > > > calling > > > > > > > > > > > > > > thread, > > > > > > > > > > > > > > > > and > > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > new > > > > > > > > > > > > > > > > > > > > > > way, which returns right away and > > > uses > > > > an > > > > > > > > internal > > > > > > > > > > > > > > Executor. > > > > > > > > > > > > > > > As > > > > > > > > > > > > > > > > > you > > > > > > > > > > > > > > > > > > > > point > > > > > > > > > > > > > > > > > > > > > > out, the semantics would be > identical > > > > in > > > > > > either > > > > > > > > > > case, > > > > > > > > > > > > and > > > > > > > > > > > > > > it > > > > > > > > > > > > > > > > > would be > > > > > > > > > > > > > > > > > > > > > very > > > > > > > > > > > > > > > > > > > > > > easy for clients to switch. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 13, 2021, 9:00 AM > > > Nakamura > > > > < > > > > > > > > > > > > nny...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Folks, > > > > > > > > > > > > > > > > > > > > > > > I just posted a new proposal > > > > > > > > > > > > > > > > > > > > > > > < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > in the wiki. I think we have > an > > > > > > opportunity > > > > > > > > to > > > > > > > > > > > > improve > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > > > > KafkaProducer#send user > experience. > > > > It > > > > > > would > > > > > > > > > > > > certainly > > > > > > > > > > > > > > > make > > > > > > > > > > > > > > > > > our > > > > > > > > > > > > > > > > > > > > lives > > > > > > > > > > > > > > > > > > > > > > > easier. Please take a look! > There > > > > are > > > > > > two > > > > > > > > > > > > subproblems > > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > I > > > > > > > > > > > > > > > > > > > could > > > > > > > > > > > > > > > > > > > > > use > > > > > > > > > > > > > > > > > > > > > > > guidance on, so I would > appreciate > > > > > > feedback > > > > > > > > on > > > > > > > > > > both > > > > > > > > > > > > of > > > > > > > > > > > > > > > them. > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > > > > > > > Moses > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > > > > > > > Matthew de Detrich > > > > > > > > > > > > > > > > > > > > > > > > *Aiven Deutschland GmbH* > > > > > > > > > > > > > > > > > > > > > > > > Immanuelkirchstraße 26, 10405 Berlin > > > > > > > > > > > > > > > > > > > > > > > > Amtsgericht Charlottenburg, HRB 209739 B > > > > > > > > > > > > > > > > > > > > > > > > *m:* +491603708037 > > > > > > > > > > > > > > > > > > > > > > > > *w:* aiven.io *e:* matthew.dedetr...@aiven.io > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > > > Matthew de Detrich > > > > > > > > > > > > > > > > > > > > *Aiven Deutschland GmbH* > > > > > > > > > > > > > > > > > > > > Immanuelkirchstraße 26, 10405 Berlin > > > > > > > > > > > > > > > > > > > > Amtsgericht Charlottenburg, HRB 209739 B > > > > > > > > > > > > > > > > > > > > *m:* +491603708037 > > > > > > > > > > > > > > > > > > > > *w:* aiven.io *e:* matthew.dedetr...@aiven.io > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >