Hey Erik,

Sorry for sitting on this email for a few days; Colin's response covers some of my concerns. I'm in favor of this KIP, and your approach seems safe. Of course, I may have missed something, so I think the KIP needs to cover the different use cases and demonstrate that it doesn't cause any unsafe access. This can be demonstrated via diagrams and some code in the KIP.
Thanks,
P

On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten <e.vanoos...@grons.nl.invalid> wrote:

> Hello Colin,
>
> >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there.
>
> > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base.
>
> Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which can then set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done.
>
> But now that I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly this should be enough to pass read and write barriers.
>
> >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work.
>
> > I'm not sure what you're referring to. Can you expand on this?
>
> Any invocation of the consumer (e.g. the poll method) is not from a thread managed by the consumer. This is what I was assuming you meant with the term 'random thread'.
>
> > Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one.
>
> There are several ways to deal with this. All async runtimes I know (Akka, ZIO, Cats Effect) support this by letting you mark a task as blocking. The runtime will then either schedule it to another thread pool, or it will grow the thread pool to accommodate. In any case 'the other 9 green threads' will simply be scheduled to another real thread. In addition, some of these runtimes detect long-running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic though.
>
> > I don't see why this has to be "inherently multi-threaded." Why can't we have the other threads report back what messages they've processed to the worker thread? Then it will be able to handle these callbacks without involving the other threads.
>
> Please consider the context, which is that we are running inside the callback of the rebalance listener. The only way to execute something and also have a timeout on it is to run that something on another thread.
>
> Kind regards,
> Erik.
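To make the hand-off described above a bit more concrete, here is a minimal, hypothetical Java sketch. It is not the KIP-944 implementation; names such as acquireCallbackAccess and adoptAccess are made up for illustration. It only shows the shape of the idea: the callback thread stores an access key in a thread-local and hands it to a worker thread, which installs it in its own thread-local inside a synchronized block. The synchronized block (and the submit/get pair) supplies the happens-before edges mentioned above.

    import java.util.concurrent.*;

    // Illustrative only; not the actual KIP-944 code or API.
    public class AccessKeyHandoffSketch {

        // Each thread that is currently allowed to call the consumer holds the key here.
        private static final ThreadLocal<Long> accessKey = new ThreadLocal<>();
        private static long currentKey = 0;            // guarded by 'lock'
        private static final Object lock = new Object();

        // Called by the thread that is inside the rebalance/commit callback.
        static long acquireCallbackAccess() {
            synchronized (lock) {                      // synchronized provides the memory barriers
                currentKey++;
                accessKey.set(currentKey);
                return currentKey;
            }
        }

        // Called by the worker thread the callback delegates to.
        static void adoptAccess(long key) {
            synchronized (lock) {
                if (key != currentKey) {
                    throw new IllegalStateException("stale access key");
                }
                accessKey.set(key);                    // this thread may now call the consumer
            }
        }

        public static void main(String[] args) throws Exception {
            long key = acquireCallbackAccess();        // on the callback thread
            ExecutorService worker = Executors.newSingleThreadExecutor();
            // Hand the key to another thread and wait for it, with a timeout.
            worker.submit(() -> {
                adoptAccess(key);
                // ... call consumer methods here ...
            }).get(5, TimeUnit.SECONDS);
            worker.shutdown();
        }
    }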
> Op 08-07-2023 om 19:17 schreef Colin McCabe:
> > On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
> >> Hi Colin,
> >>
> >> Thanks for your thoughts and taking the time to reply.
> >>
> >> Let me take away your concerns. None of your worries are an issue with the algorithm described in KIP-944. Here it goes:
> >>
> >>> It's not clear to me that it's safe to access the Kafka consumer or producer concurrently from different threads.
> >>
> >> Concurrent access is /not/ a design goal of KIP-944. In fact, it goes to great lengths to make sure that this cannot happen.
> >>
> >> *The only design goal is to allow callbacks to call the consumer from another thread.*
> >>
> >> To make sure there are no more misunderstandings about this, I have added this goal to the KIP.
> >
> > Hi Erik,
> >
> > Sorry, I spoke imprecisely. My concern is not concurrent access, but multithreaded access in general. Basically cache line visibility issues.
> >
> >>> This is true even if the accesses happen at different times, because modern CPUs require memory barriers to guarantee inter-thread visibility of loads and stores.
> >>
> >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there.
> >
> > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base.
> >
> >>> I know that there are at least a few locks in the consumer code now, due to our need to send heartbeats from a worker thread. I don't think those would be sufficient to protect a client that is making calls from random threads.
> >>
> >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work.
> >
> > I'm not sure what you're referring to. Can you expand on this?
> >
> >>> There has been some discussion of moving to a more traditional model where people make calls to the client and the client passes the given data to a single background worker thread. This would avoid a lot of the footguns of the current model and probably better reflect how people actually use the client.
> >>
> >> That is awesome. However, I'd rather not wait for that.
> >>
> >>> Another issue is that neither the producer nor the consumer is fully nonblocking. There are some corner cases where we do in fact block. From memory, the producer blocks in some "buffer full" cases, and the consumer blocks sometimes when fetching metadata.
> >>
> >> I am aware of that. This is not an issue; all async runtimes can cooperate with blocking code.
> >
> > Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one.
> >
> >>> I suspect it would be more appropriate for Kotlin coroutines, ZIO coroutines and so on to adopt this "pass messages to and from a background worker thread" model than to try to re-engineer the Kafka client to work from random threads.
> >>
> >> In both zio-kafka and fs2-kafka this is already the approach we are taking.
> >>
> >> Unfortunately, the Kafka consumer forces us to perform some work in callbacks:
> >>
> >> * commit completed callback: register that the callback is complete,
> >> * partition revoked callback: in this callback we need to submit commits for everything consumed and processed so far, using timeouts if processing takes too long. In an async runtime, this is an inherently multi-threaded process. In particular, we cannot do timeouts without involving multiple threads.
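For readers less familiar with this callback, the "partition revoked" case above looks roughly like the following in plain Java. This is a simplified, hypothetical sketch (pendingOffsets and processingDone stand in for the async runtime's bookkeeping); in zio-kafka and fs2-kafka the waiting and committing are expressed as tasks that run on the runtime's own threads, which is exactly where the question of calling the consumer from another thread comes from.

    import java.time.Duration;
    import java.util.*;
    import java.util.concurrent.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative only; not zio-kafka's actual implementation.
    public class RevokeCommitSketch implements ConsumerRebalanceListener {

        private final KafkaConsumer<String, String> consumer;
        private final Map<TopicPartition, OffsetAndMetadata> pendingOffsets;
        private final CompletableFuture<Void> processingDone;   // completed by other threads

        RevokeCommitSketch(KafkaConsumer<String, String> consumer,
                           Map<TopicPartition, OffsetAndMetadata> pendingOffsets,
                           CompletableFuture<Void> processingDone) {
            this.consumer = consumer;
            this.pendingOffsets = pendingOffsets;
            this.processingDone = processingDone;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            try {
                // Wait (bounded) for the runtime's threads to finish in-flight records.
                processingDone.get(5, TimeUnit.SECONDS);
            } catch (TimeoutException | InterruptedException | ExecutionException e) {
                // Give up waiting; commit whatever has been processed so far.
            }
            // Commit from inside the callback, itself bounded by a timeout.
            consumer.commitSync(pendingOffsets, Duration.ofSeconds(5));
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
    }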
> > I don't see why this has to be "inherently multi-threaded." Why can't we have the other threads report back what messages they've processed to the worker thread? Then it will be able to handle these callbacks without involving the other threads.
> >
> > regards,
> > Colin
> >
> >> I have extended the KIP's motivation to explain the major use case.
> >>
> >> Please read KIP-944 again. Even though the description is extensive (this callback-from-callback stuff is tricky), you will find that my goals are modest.
> >>
> >> Also, the implementation is just a few lines. With understanding of the idea it should not be a lot of work to follow it.
> >>
> >> Kind regards,
> >> Erik.
> >>
> >> Op 07-07-2023 om 19:57 schreef Colin McCabe:
> >>> Hi Erik,
> >>>
> >>> It's not clear to me that it's safe to access the Kafka consumer or producer concurrently from different threads. There are data structures that aren't protected by locks, so I wouldn't necessarily expect accessing and mutating them in a concurrent way to work. This is true even if the accesses happen at different times, because modern CPUs require memory barriers to guarantee inter-thread visibility of loads and stores.
> >>>
> >>> I am writing this without doing a detailed dive into the code (I haven't been into the consumer / producer code in a bit). Someone who has worked more on the consumer recently might be able to give specific examples of things that wouldn't work.
> >>>
> >>> I know that there are at least a few locks in the consumer code now, due to our need to send heartbeats from a worker thread. I don't think those would be sufficient to protect a client that is making calls from random threads.
> >>>
> >>> There has been some discussion of moving to a more traditional model where people make calls to the client and the client passes the given data to a single background worker thread. This would avoid a lot of the footguns of the current model and probably better reflect how people actually use the client.
> >>>
> >>> Another issue is that neither the producer nor the consumer is fully nonblocking. There are some corner cases where we do in fact block. From memory, the producer blocks in some "buffer full" cases, and the consumer blocks sometimes when fetching metadata.
> >>>
> >>> I suspect it would be more appropriate for Kotlin coroutines, ZIO coroutines and so on to adopt this "pass messages to and from a background worker thread" model than to try to re-engineer the Kafka client to work from random threads.
> >>>
> >>> There is actually some good advice about how to handle multiple threads in the KafkaConsumer.java header file itself. Check the sections "One Consumer Per Thread" and "Decouple Consumption and Processing." What I'm recommending here is essentially the latter.
> >>>
> >>> I do understand that it's frustrating to not get a quick response. However, overall I think this one needs a lot more discussion before getting anywhere near a vote. I will leave a -1 just as a procedural step. Maybe some of the people working in the client area can also chime in.
> >>>
> >>> best,
> >>> Colin
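For reference, the "Decouple Consumption and Processing" pattern Colin points to in the KafkaConsumer javadoc looks roughly like this. It is a simplified sketch (offset bookkeeping and error handling are deliberately glossed over): only one thread ever calls the consumer, worker threads only process records and report offsets back through a queue.

    import java.time.Duration;
    import java.util.*;
    import java.util.concurrent.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    // Rough sketch: the polling thread owns the consumer; workers never touch it.
    public class DecoupledConsumerSketch {

        public static void runLoop(KafkaConsumer<String, String> consumer,
                                   ExecutorService workers) {
            BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> processed =
                    new LinkedBlockingQueue<>();

            while (true) {   // loop forever for the sake of the sketch
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    workers.submit(() -> {
                        // ... process the record on a worker thread ...
                        processed.add(Map.of(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)));
                    });
                }

                // Drain what the workers have reported and commit from the consumer thread.
                Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
                Map<TopicPartition, OffsetAndMetadata> done;
                while ((done = processed.poll()) != null) {
                    toCommit.putAll(done);
                }
                if (!toCommit.isEmpty()) {
                    consumer.commitSync(toCommit);
                }
            }
        }
    }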
> >>> On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
> >>>> Dear PMCs,
> >>>>
> >>>> So far there have been 0 responses to KIP-944. I understand this may not be something that keeps you busy, but this KIP is important to people that use async runtimes like ZIO, Cats and Kotlin.
> >>>>
> >>>> Is there anything you need to come to a decision?
> >>>>
> >>>> Kind regards,
> >>>> Erik.
> >>>>
> >>>> Op 05-07-2023 om 11:38 schreef Erik van Oosten:
> >>>>> Hello all,
> >>>>>
> >>>>> I'd like to call a vote on KIP-944 Support async runtimes in consumer. It has been 'under discussion' for 7 days now. 'Under discussion' between quotes, because there were 0 comments so far. I hope the KIP is clear!
> >>>>>
> >>>>> KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
> >>>>>
> >>>>> Kind regards,
> >>>>> Erik.
> >>
> >> --
> >> Erik van Oosten
> >> e.vanoos...@grons.nl
> >> https://day-to-day-stuff.blogspot.com
>
> --
> Erik van Oosten
> e.vanoos...@grons.nl
> https://day-to-day-stuff.blogspot.com