Hi Erik,

I'm glad that this alternative is agreeable to you! Just to be clear though, it does still technically require a KIP since it's still a change to the public API (just a much smaller one). And if we do decide to pursue this approach, we'll have to define the expected contract for this method so that it can be maintained correctly and documented for developers who may otherwise be confused and misuse the method.

Cheers,

Chris

On Sat, Jul 22, 2023 at 12:02 PM Erik van Oosten
<e.vanoos...@grons.nl.invalid> wrote:

> Hi all,
>
> I have created https://github.com/apache/kafka/pull/14071 to implement
> Chris' idea.
>
> Kind regards,
> Erik.
>
>
> On 22-07-2023 at 16:39, Chris Egerton wrote:
> > Hi Erik,
> >
> > I don't think Matthias is bringing FUD to the discussion. Many of the
> > people who maintain Kafka are familiar with Kafka client internals and
> > the Java programming language, but not necessarily other JVM languages
> > or asynchronous runtimes. I think it's reasonable to ask for a code
> > snippet or two that demonstrates what you'd like to do with the
> > consumer today that you can't because of restrictions around
> > concurrent access, and this is not already addressed in the KIP.
> > Linking to a docs page on Kotlin coroutines is helpful but still
> > requires reviewers to gather a lot of context on their own that could
> > more easily be provided in the KIP, and although the description of
> > KAFKA-7143 is more detailed, I find it a little hard to follow as
> > someone who isn't already familiar with the environment the user is
> > working in.
> >
> > It's also worth mentioning that what's proposed in the KIP is only
> > blocked by the private access modifier on the KafkaConsumer::acquire
> > and KafkaConsumer::release methods. If we upgraded the visibility of
> > these methods from private to protected, it would be possible for
> > subclasses to implement the proposal in KIP-944, without any KIPs or
> > other changes to the official Java clients library.
> >
> > Best,
> >
> > Chris
> >
> > On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten
> > <e.vanoos...@grons.nl.invalid> wrote:
> >
> >> Hi Matthias,
> >>
> >> I am getting a bit frustrated here. All the concerns and questions I
> >> have seen so far are addressed in KIP-944.
> >>
> >> Please let me know if they are not clear enough, but please do not come
> >> with FUD.
> >>
> >> Kind regards,
> >> Erik.
> >>
> >>
> >> On 21-07-2023 at 21:13, Matthias J. Sax wrote:
> >>> I am not a clients (or threading) expert, but I tend to agree with
> >>> Colin's concerns.
> >>>
> >>> In particular, it would be nice to see an example of how you intend
> >>> to use the API (I am not familiar with Kotlin or its coroutines), to
> >>> better understand what this change helps to solve to begin with.
> >>>
> >>> Opening up the consumer sounds potentially dangerous and we should
> >>> weigh opportunity and risk before making a decision. So far, I see
> >>> risks but do not understand the opportunity you are after.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 7/14/23 11:43 AM, Kirk True wrote:
> >>>> Hi Erik,
> >>>>
> >>>> Thanks for the KIP!
> >>>>
> >>>> I empathize with your frustration over the radio silence. It gets
> >>>> like that sometimes, and I apologize for my lack of feedback.
> >>>>
> >>>> I’d personally like to see this lively exchange move over to the
> >>>> DISCUSS thread you’d created before.
> >>>>
> >>>> Thanks,
> >>>> Kirk
> >>>>
> >>>>> On Jul 14, 2023, at 1:33 AM, Erik van Oosten
> >>>>> <e.vanoos...@grons.nl.INVALID> wrote:
> >>>>>
> >>>>> Hi Colin,
> >>>>>
> >>>>> The way I understood Philip's message is that KIP-944 also plays nice
> >>>>> with KIP-945. But I might be mistaken.
> >>>>>
> >>>>> Regardless, KIP-945 does /not/ resolve the underlying problem (the
> >>>>> need for nested consumer invocations) because it has the explicit
> >>>>> goal of not changing the user facing API.
> >>>>>
> >>>>>> ... KIP-945 but haven't posted a DISCUSS thread yet
> >>>>> There is a thread called 'KafkaConsumer refactor proposal', but
> >>>>> indeed no official discussion yet.
> >>>>>
> >>>>>> I really don't want to be debugging complex interactions between
> >>>>>> Java thread-local variables and green threads.
> >>>>> In that email thread, I proposed an API change in which callbacks
> >>>>> are no longer needed.
> >>>>> The proposal completely removes the need for
> >>>>> such complex interactions. In addition, it gives clients the ability
> >>>>> to process at full speed even while a cooperative rebalance is
> >>>>> ongoing.
> >>>>>
> >>>>> Regards,
> >>>>> Erik.
> >>>>>
> >>>>> On 14-07-2023 at 00:36, Colin McCabe wrote:
> >>>>>> Hi Philip & Erik,
> >>>>>>
> >>>>>> Hmm... if we agree that KIP-945 addresses this use case, I think it
> >>>>>> would be better to just focus on that KIP. Fundamentally it's a
> >>>>>> better and cleaner model than a complex scheme involving
> >>>>>> thread-local variables. I really don't want to be debugging complex
> >>>>>> interactions between Java thread-local variables and green threads.
> >>>>>>
> >>>>>> It also generally helps to have some use-cases in mind when writing
> >>>>>> these things. If we get feedback about what would be useful for
> >>>>>> async runtimes, that would probably help improve and focus KIP-945.
> >>>>>> By the way, I can see you have a draft on the wiki for KIP-945 but
> >>>>>> haven't posted a DISCUSS thread yet, so I assume it's not ready for
> >>>>>> review yet ;)
> >>>>>>
> >>>>>> best,
> >>>>>> Colin
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote:
> >>>>>>> Hey Erik - Another thing I want to add to my comment: we are in
> >>>>>>> the process of rewriting the KafkaConsumer, and I think your
> >>>>>>> proposal would work in the new consumer because we are going to
> >>>>>>> separate the user thread and the background thread. Here is the
> >>>>>>> 1-pager, and we are in the process of converting this into KIP-945.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> P
> >>>>>>>
> >>>>>>> On Tue, Jul 11, 2023 at 10:33 AM Philip Nee <philip...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hey Erik,
> >>>>>>>>
> >>>>>>>> Sorry for holding up this email for a few days since Colin's
> >>>>>>>> response includes some of my concerns.
> >>>>>>>> I'm in favor of this KIP, and I think your
> >>>>>>>> approach seems safe. Of course, I have probably missed something,
> >>>>>>>> so I think this KIP needs to cover different use cases to
> >>>>>>>> demonstrate it doesn't cause any unsafe access. I think this can
> >>>>>>>> be demonstrated via diagrams and some code in the KIP.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> P
> >>>>>>>>
> >>>>>>>> On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
> >>>>>>>> <e.vanoos...@grons.nl.invalid> wrote:
> >>>>>>>>
> >>>>>>>>> Hello Colin,
> >>>>>>>>>
> >>>>>>>>> >> In KIP-944, the callback thread can only delegate to another
> >>>>>>>>> >> thread after reading from and writing to a threadlocal
> >>>>>>>>> >> variable, providing the barriers right there.
> >>>>>>>>>
> >>>>>>>>> > I don't see any documentation that accessing thread local
> >>>>>>>>> > variables provides a total store or load barrier. Do you have
> >>>>>>>>> > such documentation? It seems like if this were the case, we
> >>>>>>>>> > could eliminate volatile variables from most of the code base.
> >>>>>>>>>
> >>>>>>>>> Now I was imprecise. The thread-locals are only somewhat
> >>>>>>>>> involved. In the KIP proposal the callback thread reads an access
> >>>>>>>>> key from a thread-local variable. It then needs to pass that
> >>>>>>>>> access key to another thread, which then can set it on its own
> >>>>>>>>> thread-local variable. The act of passing a value from one thread
> >>>>>>>>> to another implies that a memory barrier needs to be passed.
> >>>>>>>>> However, this is all not so relevant since there is no need to
> >>>>>>>>> pass the access key back when the other thread is done.
> >>>>>>>>>
> >>>>>>>>> But now that I think about it a bit more, the locking mechanism
> >>>>>>>>> runs in a synchronized block.
> >>>>>>>>> If I remember correctly this should be enough to
> >>>>>>>>> pass read and write barriers.
> >>>>>>>>>
> >>>>>>>>> >> In the current implementation the consumer is also invoked
> >>>>>>>>> >> from random threads. If it works now, it should continue to
> >>>>>>>>> >> work.
> >>>>>>>>> > I'm not sure what you're referring to. Can you expand on this?
> >>>>>>>>>
> >>>>>>>>> Any invocation of the consumer (e.g. method poll) is not from a
> >>>>>>>>> thread managed by the consumer. This is what I was assuming you
> >>>>>>>>> meant with the term 'random thread'.
> >>>>>>>>>
> >>>>>>>>> > Hmm, not sure what you mean by "cooperate with blocking code."
> >>>>>>>>> > If you have 10 green threads you're multiplexing on to one CPU
> >>>>>>>>> > thread, and that CPU thread gets blocked because of what one
> >>>>>>>>> > green thread is doing, the other 9 green threads are blocked
> >>>>>>>>> > too, right? I guess it's "just" a performance problem, but it
> >>>>>>>>> > still seems like it could be a serious one.
> >>>>>>>>>
> >>>>>>>>> There are several ways to deal with this. All async runtimes I
> >>>>>>>>> know (Akka, Zio, Cats-effects) support this by letting you mark a
> >>>>>>>>> task as blocking. The runtime will then either schedule it to
> >>>>>>>>> another thread-pool, or it will grow the thread-pool to
> >>>>>>>>> accommodate. In any case 'the other 9 green threads' will simply
> >>>>>>>>> be scheduled to another real thread. In addition, some of these
> >>>>>>>>> runtimes detect long running tasks and will reschedule waiting
> >>>>>>>>> tasks to another thread. This is all a bit off topic though.
> >>>>>>>>>
> >>>>>>>>> > I don't see why this has to be "inherently multi-threaded."
> >>>>>>>>> > Why can't we have the other threads report back what messages
> >>>>>>>>> > they've processed to the worker thread?
> >>>>>>>>> > Then it will be able to handle these callbacks
> >>>>>>>>> > without involving the other threads.
> >>>>>>>>>
> >>>>>>>>> Please consider the context, which is that we are running inside
> >>>>>>>>> the callback of the rebalance listener. The only way to execute
> >>>>>>>>> something and also have a timeout on it is to run that something
> >>>>>>>>> on another thread.
> >>>>>>>>>
> >>>>>>>>> Kind regards,
> >>>>>>>>> Erik.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 08-07-2023 at 19:17, Colin McCabe wrote:
> >>>>>>>>>> On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
> >>>>>>>>>>> Hi Colin,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your thoughts and taking the time to reply.
> >>>>>>>>>>>
> >>>>>>>>>>> Let me take away your concerns. None of your worries are an
> >>>>>>>>>>> issue with the algorithm described in KIP-944. Here it goes:
> >>>>>>>>>>>
> >>>>>>>>>>>> It's not clear to me that it's safe to access the Kafka
> >>>>>>>>>>>> consumer or producer concurrently from different threads.
> >>>>>>>>>>> Concurrent access is /not/ a design goal of KIP-944. In fact,
> >>>>>>>>>>> it goes to great lengths to make sure that this cannot happen.
> >>>>>>>>>>>
> >>>>>>>>>>> *The only design goal is to allow callbacks to call the
> >>>>>>>>>>> consumer from another thread.*
> >>>>>>>>>>>
> >>>>>>>>>>> To make sure there are no more misunderstandings about this, I
> >>>>>>>>>>> have added this goal to the KIP.
> >>>>>>>>>>>
> >>>>>>>>>> Hi Erik,
> >>>>>>>>>>
> >>>>>>>>>> Sorry, I spoke imprecisely. My concern is not concurrent
> >>>>>>>>>> access, but multithreaded access in general. Basically cache
> >>>>>>>>>> line visibility issues.
> >>>>>>>>>>>> This is true even if the accesses happen at different times,
> >>>>>>>>>>>> because modern CPUs require memory barriers to guarantee
> >>>>>>>>>>>> inter-thread visibility of loads and stores.
> >>>>>>>>>>> In KIP-944, the callback thread can only delegate to another
> >>>>>>>>>>> thread after reading from and writing to a threadlocal
> >>>>>>>>>>> variable, providing the barriers right there.
> >>>>>>>>>>>
> >>>>>>>>>> I don't see any documentation that accessing thread local
> >>>>>>>>>> variables provides a total store or load barrier. Do you have
> >>>>>>>>>> such documentation? It seems like if this were the case, we
> >>>>>>>>>> could eliminate volatile variables from most of the code base.
> >>>>>>>>>>>> I know that there are at least a few locks in the consumer
> >>>>>>>>>>>> code now, due to our need to send heartbeats from a worker
> >>>>>>>>>>>> thread. I don't think those would be sufficient to protect a
> >>>>>>>>>>>> client that is making calls from random threads.
> >>>>>>>>>>> In the current implementation the consumer is also invoked
> >>>>>>>>>>> from random threads. If it works now, it should continue to
> >>>>>>>>>>> work.
> >>>>>>>>>>>
> >>>>>>>>>> I'm not sure what you're referring to. Can you expand on this?
> >>>>>>>>>>
> >>>>>>>>>>>> There has been some discussion of moving to a more
> >>>>>>>>>>>> traditional model where people make calls to the client and
> >>>>>>>>>>>> the client passes the given data to a single background
> >>>>>>>>>>>> worker thread. This would avoid a lot of the footguns of the
> >>>>>>>>>>>> current model and probably better reflect how people actually
> >>>>>>>>>>>> use the client.
> >>>>>>>>>>> That is awesome. However, I'd rather not wait for that.
> >>>>>>>>>>>
> >>>>>>>>>>>> Another issue is that neither the producer nor the consumer
> >>>>>>>>>>>> is fully nonblocking. There are some corner cases where we do
> >>>>>>>>>>>> in fact block.
> >>>>>>>>>>>> From memory, the producer blocks in some "buffer full" cases,
> >>>>>>>>>>>> and the consumer blocks sometimes when fetching metadata.
> >>>>>>>>>>> I am aware of that. This is not an issue; all async runtimes
> >>>>>>>>>>> can cooperate with blocking code.
> >>>>>>>>>>>
> >>>>>>>>>> Hmm, not sure what you mean by "cooperate with blocking code."
> >>>>>>>>>> If you have 10 green threads you're multiplexing on to one CPU
> >>>>>>>>>> thread, and that CPU thread gets blocked because of what one
> >>>>>>>>>> green thread is doing, the other 9 green threads are blocked
> >>>>>>>>>> too, right? I guess it's "just" a performance problem, but it
> >>>>>>>>>> still seems like it could be a serious one.
> >>>>>>>>>>>> I suspect it would be more appropriate for Kotlin coroutines,
> >>>>>>>>>>>> Zio coroutines and so on to adopt this "pass messages to and
> >>>>>>>>>>>> from a background worker thread" model than to try to
> >>>>>>>>>>>> re-engineer the Kafka client to work from random threads.
> >>>>>>>>>>> In both zio-kafka and fs2-kafka this is already the approach
> >>>>>>>>>>> we are taking.
> >>>>>>>>>>> Unfortunately, the Kafka consumer forces us to perform some
> >>>>>>>>>>> work in callbacks:
> >>>>>>>>>>>
> >>>>>>>>>>>   * commit completed callback: register that the callback is
> >>>>>>>>>>>     complete,
> >>>>>>>>>>>   * partition revoked callback: in this callback we need to
> >>>>>>>>>>>     submit commits for everything consumed and processed so
> >>>>>>>>>>>     far, using timeouts if processing takes too long. In an
> >>>>>>>>>>>     async runtime, this is an inherently multi-threaded
> >>>>>>>>>>>     process. In particular, we cannot do timeouts without
> >>>>>>>>>>>     involving multiple threads.
> >>>>>>>>>>>
> >>>>>>>>>> I don't see why this has to be "inherently multi-threaded."
> >>>>>>>>>> Why can't we have the other threads report back what messages
> >>>>>>>>>> they've processed to the worker thread? Then it will be able to
> >>>>>>>>>> handle these callbacks without involving the other threads.
> >>>>>>>>>> regards,
> >>>>>>>>>> Colin
> >>>>>>>>>>
> >>>>>>>>>>> I have extended the KIP's motivation to explain the major use
> >>>>>>>>>>> case.
> >>>>>>>>>>>
> >>>>>>>>>>> Please read KIP-944 again. Even though the description is
> >>>>>>>>>>> extensive (this callback from callback stuff is tricky), you
> >>>>>>>>>>> will find that my goals are modest.
> >>>>>>>>>>>
> >>>>>>>>>>> Also the implementation is just a few lines. With
> >>>>>>>>>>> understanding of the idea it should not be a lot of work to
> >>>>>>>>>>> follow it.
> >>>>>>>>>>>
> >>>>>>>>>>> Kind regards,
> >>>>>>>>>>> Erik.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 07-07-2023 at 19:57, Colin McCabe wrote:
> >>>>>>>>>>>> Hi Erik,
> >>>>>>>>>>>>
> >>>>>>>>>>>> It's not clear to me that it's safe to access the Kafka
> >>>>>>>>>>>> consumer or producer concurrently from different threads.
> >>>>>>>>>>>> There are data structures that aren't protected by locks, so
> >>>>>>>>>>>> I wouldn't necessarily expect accessing and mutating them in
> >>>>>>>>>>>> a concurrent way to work. This is true even if the accesses
> >>>>>>>>>>>> happen at different times, because modern CPUs require memory
> >>>>>>>>>>>> barriers to guarantee inter-thread visibility of loads and
> >>>>>>>>>>>> stores.
> >>>>>>>>>>>> I am writing this without doing a detailed dive into the code
> >>>>>>>>>>>> (I haven't been into the consumer / producer code in a bit.)
> >>>>>>>>>>>> Someone who has worked more on the consumer recently might be
> >>>>>>>>>>>> able to give specific examples of things that wouldn't work.
> >>>>>>>>>>>> I know that there are at least a few locks in the consumer
> >>>>>>>>>>>> code now, due to our need to send heartbeats from a worker
> >>>>>>>>>>>> thread. I don't think those would be sufficient to protect a
> >>>>>>>>>>>> client that is making calls from random threads.
> >>>>>>>>>>>> There has been some discussion of moving to a more
> >>>>>>>>>>>> traditional model where people make calls to the client and
> >>>>>>>>>>>> the client passes the given data to a single background
> >>>>>>>>>>>> worker thread. This would avoid a lot of the footguns of the
> >>>>>>>>>>>> current model and probably better reflect how people actually
> >>>>>>>>>>>> use the client.
> >>>>>>>>>>>> Another issue is that neither the producer nor the consumer
> >>>>>>>>>>>> is fully nonblocking. There are some corner cases where we do
> >>>>>>>>>>>> in fact block. From memory, the producer blocks in some
> >>>>>>>>>>>> "buffer full" cases, and the consumer blocks sometimes when
> >>>>>>>>>>>> fetching metadata.
> >>>>>>>>>>>> I suspect it would be more appropriate for Kotlin coroutines,
> >>>>>>>>>>>> Zio coroutines and so on to adopt this "pass messages to and
> >>>>>>>>>>>> from a background worker thread" model than to try to
> >>>>>>>>>>>> re-engineer the Kafka client to work from random threads.
> >>>>>>>>>>>> There is actually some good advice about how to handle
> >>>>>>>>>>>> multiple threads in the KafkaConsumer.java header file
> >>>>>>>>>>>> itself. Check the sections "One Consumer Per Thread" and
> >>>>>>>>>>>> "Decouple Consumption and Processing." What I'm recommending
> >>>>>>>>>>>> here is essentially the latter.
> >>>>>>>>>>>> I do understand that it's frustrating to not get a quick
> >>>>>>>>>>>> response. However, overall I think this one needs a lot more
> >>>>>>>>>>>> discussion before getting anywhere near a vote.
> >>>>>>>>>>>> I will leave a -1 just as a procedural step. Maybe some of
> >>>>>>>>>>>> the people working in the client area can also chime in.
> >>>>>>>>>>>> best,
> >>>>>>>>>>>> Colin
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
> >>>>>>>>>>>>> Dear PMCs,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> So far there have been 0 responses to KIP-944. I understand
> >>>>>>>>>>>>> this may not be something that keeps you busy, but this KIP
> >>>>>>>>>>>>> is important to people that use async runtimes like Zio,
> >>>>>>>>>>>>> Cats and Kotlin.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Is there anything you need to come to a decision?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>> Erik.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 05-07-2023 at 11:38, Erik van Oosten wrote:
> >>>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'd like to call a vote on KIP-944 Support async runtimes
> >>>>>>>>>>>>>> in consumer. It has been 'under discussion' for 7 days
> >>>>>>>>>>>>>> now. 'Under discussion' between quotes, because there were
> >>>>>>>>>>>>>> 0 comments so far. I hope the KIP is clear!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>>> Erik.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >> --
> >> Erik van Oosten
> >> e.vanoos...@grons.nl
> >> https://day-to-day-stuff.blogspot.com
> >>
> >>
> --
> Erik van Oosten
> e.vanoos...@grons.nl
> https://day-to-day-stuff.blogspot.com
>
>
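
A note on the timeout argument quoted in this thread ("the only way to execute something and also have a timeout on it is to run the something on another thread"): the following is a minimal sketch in plain Java of what that looks like, using only java.util.concurrent. It does not touch the Kafka client at all, and the names TimeoutSketch and runWithTimeout are hypothetical, made up for this illustration; they are not part of KIP-944 or of any Kafka API.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; not taken from KIP-944 or the Kafka code base.
public class TimeoutSketch {

    /**
     * Runs the task on a helper thread while the calling thread waits at most
     * timeoutMillis for the result. Returns Optional.empty() if the task timed
     * out, failed, or the caller was interrupted while waiting.
     */
    public static <T> Optional<T> runWithTimeout(Callable<T> task, long timeoutMillis) {
        ExecutorService helper = Executors.newSingleThreadExecutor();
        Future<T> future = helper.submit(task);
        try {
            // The caller blocks here; the work itself happens on the helper thread.
            return Optional.ofNullable(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            future.cancel(true); // ask the helper thread to stop via interruption
            return Optional.empty();
        } catch (ExecutionException e) {
            return Optional.empty(); // the task itself threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return Optional.empty();
        } finally {
            helper.shutdownNow(); // do not leave the helper thread running
        }
    }

    public static void main(String[] args) {
        Optional<String> fast = runWithTimeout(() -> "done", 1000);
        Optional<String> slow = runWithTimeout(() -> {
            Thread.sleep(5000); // stands in for processing that takes too long
            return "too late";
        }, 100);
        System.out.println("fast finished in time: " + fast.isPresent());
        System.out.println("slow finished in time: " + slow.isPresent());
    }
}
```

The point of the sketch is only that the waiting thread and the working thread are necessarily different threads. If the work needs to call the consumer from inside a rebalance callback, that call comes from a thread other than the one the consumer is currently bound to, which is the situation KIP-944 discusses.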