Hi Erik,

Thanks for the KIP!
I empathize with your frustration over the radio silence. It gets like that sometimes, and I apologize for my lack of feedback. I'd personally like to see this lively exchange move over to the DISCUSS thread you'd created before.

Thanks,
Kirk

> On Jul 14, 2023, at 1:33 AM, Erik van Oosten <e.vanoos...@grons.nl.INVALID> wrote:
>
> Hi Colin,
>
> The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken.
>
> Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user-facing API.
>
> > ... KIP-945 but haven't posted a DISCUSS thread yet
>
> There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet.
>
> > I really don't want to be debugging complex interactions between Java thread-local variables and green threads.
>
> In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing.
>
> Regards,
> Erik.
>
> On 14-07-2023 at 00:36, Colin McCabe wrote:
>> Hi Philip & Erik,
>>
>> Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads.
>>
>> It also generally helps to have some use cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945.
>> By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for review yet ;)
>>
>> best,
>> Colin
>>
>>
>> On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote:
>>> Hey Erik, another thing I want to add to my comment: we are in the process of rewriting the KafkaConsumer, and I think your proposal would work in the new consumer because we are going to separate the user thread and the background thread. Here is the 1-pager, and we are in the process of converting this into KIP-945.
>>>
>>> Thanks,
>>> P
>>>
>>> On Tue, Jul 11, 2023 at 10:33 AM Philip Nee <philip...@gmail.com> wrote:
>>>
>>>> Hey Erik,
>>>>
>>>> Sorry for holding up this email for a few days, since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I have probably missed something; therefore, I think this KIP needs to cover different use cases to demonstrate that it doesn't cause any unsafe access. I think this can be demonstrated via diagrams and some code in the KIP.
>>>>
>>>> Thanks,
>>>> P
>>>>
>>>> On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten <e.vanoos...@grons.nl.invalid> wrote:
>>>>
>>>>> Hello Colin,
>>>>>
>>>>> >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there.
>>>>>
>>>>> > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base.
>>>>>
>>>>> I was imprecise there. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable.
>>>>> It then needs to pass that access key to another thread, which can then set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant, since there is no need to pass the access key back when the other thread is done.
>>>>>
>>>>> But now that I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly, this should be enough to pass read and write barriers.
>>>>>
>>>>> >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work.
>>>>> > I'm not sure what you're referring to. Can you expand on this?
>>>>>
>>>>> Any invocation of the consumer (e.g. the poll method) is not from a thread managed by the consumer. This is what I was assuming you meant by the term 'random thread'.
>>>>>
>>>>> > Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing onto one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one.
>>>>>
>>>>> There are several ways to deal with this. All async runtimes I know (Akka, Zio, Cats-effects) support this by letting you mark a task as blocking. The runtime will then either schedule it on another thread pool, or it will grow the thread pool to accommodate it. In either case, 'the other 9 green threads' will simply be scheduled on another real thread. In addition, some of these runtimes detect long-running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic, though.
>>>>>
>>>>> > I don't see why this has to be "inherently multi-threaded." Why can't
Why can't >>>>> we have the other threads report back what messages they've processed to >>>>> the worker thread. Then it will be able to handle these callbacks >>>>> without involving the other threads. >>>>> >>>>> Please consider the context which is that we are running inside the >>>>> callback of the rebalance listener. The only way to execute something >>>>> and also have a timeout on it is to run the something on another thread. >>>>> >>>>> Kind regards, >>>>> Erik. >>>>> >>>>> >>>>> Op 08-07-2023 om 19:17 schreef Colin McCabe: >>>>>> On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote: >>>>>>> Hi Colin, >>>>>>> >>>>>>> Thanks for your thoughts and taking the time to reply. >>>>>>> >>>>>>> Let me take away your concerns. None of your worries are an issue with >>>>>>> the algorithm described in KIP-944. Here it goes: >>>>>>> >>>>>>> > It's not clear ot me that it's safe to access the Kafka consumer or >>>>>>>> producer concurrently from different threads. >>>>>>> Concurrent access is /not/ a design goal of KIP-944. In fact, it goes >>>>>>> through great lengths to make sure that this cannot happen. >>>>>>> >>>>>>> *The only design goal is to allow callbacks to call the consumer from >>>>>>> another thread.* >>>>>>> >>>>>>> To make sure there are no more misunderstandings about this, I have >>>>>>> added this goal to the KIP. >>>>>>> >>>>>> Hi Erik, >>>>>> >>>>>> Sorry, I spoke imprecisely. My concern is not concurrent access, but >>>>> multithreaded access in general. Basically cache line visibility issues. >>>>>>> > This is true even if the accesses happen at different times, >>>>> because >>>>>>>> modern CPUs require memory barriers to guarantee inter-thread >>>>> visibilty >>>>>>>> of loads and stores. >>>>>>> In KIP-944, the callback thread can only delegate to another thread >>>>>>> after reading from and writing to a threadlocal variable, providing the >>>>>>> barriers right there. 
>>>>>>>
>>>>>> I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base.
>>>>>>> > I know that there are at least a few locks in the consumer code now, due to our need to send heartbeats from a worker thread. I don't think those would be sufficient to protect a client that is making calls from random threads.
>>>>>>> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work.
>>>>>>>
>>>>>> I'm not sure what you're referring to. Can you expand on this?
>>>>>>
>>>>>>> > There has been some discussion of moving to a more traditional model where people make calls to the client and the client passes the given data to a single background worker thread. This would avoid a lot of the footguns of the current model and probably better reflect how people actually use the client.
>>>>>>> That is awesome. However, I'd rather not wait for that.
>>>>>>>
>>>>>>> > Another issue is that neither the producer nor the consumer is fully nonblocking. There are some corner cases where we do in fact block. From memory, the producer blocks in some "buffer full" cases, and the consumer blocks sometimes when fetching metadata.
>>>>>>> I am aware of that. This is not an issue; all async runtimes can cooperate with blocking code.
>>>>>>>
>>>>>> Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing onto one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right?
>>>>>> I guess it's "just" a performance problem, but it still seems like it could be a serious one.
>>>>>>> > I suspect it would be more appropriate for Kotlin coroutines, Zio coroutines and so on to adopt this "pass messages to and from a background worker thread" model than to try to re-engineer the Kafka client to work from random threads.
>>>>>>> In both zio-kafka and fs2-kafka this is already the approach we are taking.
>>>>>>> Unfortunately, the Kafka consumer forces us to perform some work in callbacks:
>>>>>>>
>>>>>>> * commit completed callback: register that the callback is complete;
>>>>>>> * partition revoked callback: in this callback we need to submit commits for everything consumed and processed so far, using timeouts if processing takes too long. In an async runtime, this is an inherently multi-threaded process. In particular, we cannot do timeouts without involving multiple threads.
>>>>>>>
>>>>>> I don't see why this has to be "inherently multi-threaded." Why can't we have the other threads report back what messages they've processed to the worker thread? Then it will be able to handle these callbacks without involving the other threads.
>>>>>> regards,
>>>>>> Colin
>>>>>>
>>>>>>> I have extended the KIP's motivation to explain the major use case.
>>>>>>>
>>>>>>> Please read KIP-944 again. Even though the description is extensive (this callback-from-callback stuff is tricky), you will find that my goals are modest.
>>>>>>>
>>>>>>> Also, the implementation is just a few lines. Once you understand the idea, it should not be much work to follow it.
>>>>>>>
>>>>>>> Kind regards,
>>>>>>> Erik.
>>>>>>>
>>>>>>>
>>>>>>> On 07-07-2023 at 19:57, Colin McCabe wrote:
>>>>>>>> Hi Erik,
>>>>>>>>
>>>>>>>> It's not clear to me that it's safe to access the Kafka consumer or producer concurrently from different threads.
>>>>>>>> There are data structures that aren't protected by locks, so I wouldn't necessarily expect accessing and mutating them in a concurrent way to work. This is true even if the accesses happen at different times, because modern CPUs require memory barriers to guarantee inter-thread visibility of loads and stores.
>>>>>>>> I am writing this without doing a detailed dive into the code (I haven't been into the consumer / producer code in a bit). Someone who has worked more on the consumer recently might be able to give specific examples of things that wouldn't work.
>>>>>>>> I know that there are at least a few locks in the consumer code now, due to our need to send heartbeats from a worker thread. I don't think those would be sufficient to protect a client that is making calls from random threads.
>>>>>>>> There has been some discussion of moving to a more traditional model where people make calls to the client and the client passes the given data to a single background worker thread. This would avoid a lot of the footguns of the current model and probably better reflect how people actually use the client.
>>>>>>>> Another issue is that neither the producer nor the consumer is fully nonblocking. There are some corner cases where we do in fact block. From memory, the producer blocks in some "buffer full" cases, and the consumer blocks sometimes when fetching metadata.
>>>>>>>> I suspect it would be more appropriate for Kotlin coroutines, Zio coroutines and so on to adopt this "pass messages to and from a background worker thread" model than to try to re-engineer the Kafka client to work from random threads.
>>>>>>>> There is actually some good advice about how to handle multiple threads in the Javadoc at the top of KafkaConsumer.java itself. Check the sections "One Consumer Per Thread" and "Decouple Consumption and Processing."
>>>>>>>> What I'm recommending here is essentially the latter.
>>>>>>>> I do understand that it's frustrating not to get a quick response. However, overall I think this one needs a lot more discussion before getting anywhere near a vote. I will leave a -1 just as a procedural step. Maybe some of the people working in the client area can also chime in.
>>>>>>>> best,
>>>>>>>> Colin
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
>>>>>>>>> Dear PMCs,
>>>>>>>>>
>>>>>>>>> So far there have been 0 responses to KIP-944. I understand this may not be something that keeps you busy, but this KIP is important to people that use async runtimes like Zio, Cats and Kotlin.
>>>>>>>>>
>>>>>>>>> Is there anything you need to come to a decision?
>>>>>>>>>
>>>>>>>>> Kind regards,
>>>>>>>>> Erik.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 05-07-2023 at 11:38, Erik van Oosten wrote:
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I'd like to call a vote on KIP-944 Support async runtimes in consumer. It has been 'under discussion' for 7 days now. 'Under discussion' is in quotes, because there have been 0 comments so far. I hope the KIP is clear!
>>>>>>>>>>
>>>>>>>>>> KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
>>>>>>>>>>
>>>>>>>>>> Kind regards,
>>>>>>>>>> Erik.
>>>>>>>>>>
>
> --
> Erik van Oosten
> e.vanoos...@grons.nl
> https://day-to-day-stuff.blogspot.com
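Colin's alternative can also be sketched with the standard library alone. In this model, processing threads report back what they have processed, and the thread that owns the consumer handles the callbacks by itself (the "Decouple Consumption and Processing" approach). This is an illustrative assumption, not Kafka client code. `ReportBackSketch`, `reportProcessed` and `drainAndCommit` are hypothetical. The `committed` map stands in for the offsets a real consumer thread would pass to `commitSync`, for example from `ConsumerRebalanceListener.onPartitionsRevoked`. A `ConcurrentLinkedQueue` carries the reports, and its documented memory-consistency guarantee makes each worker's writes visible to the thread that removes the element.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "report back to the consumer thread"; not Kafka client code.
class ReportBackSketch {
    // Hypothetical progress report: a partition and the next offset to consume.
    static final class Processed {
        final String partition;
        final long nextOffset;

        Processed(String partition, long nextOffset) {
            this.partition = partition;
            this.nextOffset = nextOffset;
        }
    }

    // Thread-safe hand-off from processing threads to the consumer thread.
    private final ConcurrentLinkedQueue<Processed> reports = new ConcurrentLinkedQueue<>();
    // Stand-in for offsets a real consumer would commit; only the consumer thread touches it.
    private final Map<String, Long> committed = new HashMap<>();

    // Called by processing threads after they finish the record at `offset`.
    void reportProcessed(String partition, long offset) {
        reports.add(new Processed(partition, offset + 1));
    }

    // Called on the consumer thread (e.g. inside a rebalance callback): drain all
    // reports and keep the highest next offset per partition, without calling other threads.
    Map<String, Long> drainAndCommit() {
        Processed p;
        while ((p = reports.poll()) != null) {
            committed.merge(p.partition, p.nextOffset, Long::max);
        }
        return new HashMap<>(committed);
    }

    // Demo: two workers process offsets 0..9, alternating between two partitions.
    static Map<String, Long> demo() {
        ReportBackSketch sketch = new ReportBackSketch();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        for (long offset = 0; offset < 10; offset++) {
            final long o = offset;
            workers.submit(() -> sketch.reportProcessed(o % 2 == 0 ? "t-0" : "t-1", o));
        }
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.drainAndCommit();
    }
}
```

`ReportBackSketch.demo()` returns a map with `t-0` mapped to 9 and `t-1` mapped to 10, the next offsets to consume. The design keeps every call into the (stand-in) consumer on one thread. Waiting with a deadline for in-flight work to report in, which Erik raises for the revoke callback, would need an additional step not shown here.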