Hello Colin,
>> In KIP-944, the callback thread can only delegate to another thread
>> after reading from and writing to a thread-local variable, providing
>> the barriers right there.
> I don't see any documentation that accessing thread local variables
> provides a total store or load barrier. Do you have such documentation?
> It seems like if this were the case, we could eliminate volatile
> variables from most of the code base.
I was imprecise there. The thread-locals are only partly involved. In
the KIP proposal, the callback thread reads an access key from a
thread-local variable. It then needs to pass that access key to another
thread, which can then set it on its own thread-local variable. The act
of passing a value from one thread to another implies that a memory
barrier is crossed. However, this is not so relevant, since there is no
need to pass the access key back when the other thread is done.
But now that I think about it a bit more: the locking mechanism runs in
a synchronized block. If I remember correctly, this should be enough to
provide the read and write barriers.
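A minimal Java sketch of this argument, assuming a hypothetical `AccessGuard` class (it is not the KIP-944 code and not a Kafka API): the owning thread keeps an access key in a `ThreadLocal`, hands the key to a helper thread, and every guarded call enters and leaves a `synchronized` block on the same lock. Under the Java Memory Model, releasing a monitor happens-before every later acquisition of that monitor, so writes made during one guarded call are visible to the next guarded call, even when that call runs on the helper thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical access guard: a sketch of the idea, not KIP-944's actual code. */
final class AccessGuard {
    private final Object lock = new Object();
    private final ThreadLocal<Long> threadKey = new ThreadLocal<>();
    private long ownerKey = 0L; // 0 means "no owner"; guarded by lock
    private long nextKey = 1L;  // guarded by lock

    /** The owning thread takes ownership and stores the key in its thread-local. */
    long acquireOwnership() {
        synchronized (lock) {
            if (ownerKey != 0L) throw new IllegalStateException("already owned");
            ownerKey = nextKey++;
            threadKey.set(ownerKey);
            return ownerKey;
        }
    }

    /** A helper thread adopts the key that the owner handed to it. */
    void adopt(long key) {
        synchronized (lock) {
            if (key != ownerKey) throw new IllegalStateException("stale key");
            threadKey.set(key);
        }
    }

    /** Start of every guarded call: entering the monitor is the acquire barrier. */
    void enter() {
        synchronized (lock) {
            Long key = threadKey.get();
            if (key == null || key != ownerKey) {
                throw new IllegalStateException("no access from " + Thread.currentThread().getName());
            }
        }
    }

    /** End of every guarded call: leaving the monitor is the release barrier. */
    void exit() {
        synchronized (lock) {
            // Intentionally empty: the unlock publishes writes made during the call.
        }
    }
}

/** Hypothetical non-thread-safe client state, protected only by the guard. */
final class GuardedCounter {
    private final AccessGuard guard;
    private int count; // deliberately not volatile

    GuardedCounter(AccessGuard guard) { this.guard = guard; }

    void increment() { guard.enter(); try { count++; } finally { guard.exit(); } }

    int get() { guard.enter(); try { return count; } finally { guard.exit(); } }
}

final class HandoffDemo {
    /** The owner increments; a helper thread then reads using the handed-off key. */
    static int readOnHelper() throws Exception {
        AccessGuard guard = new AccessGuard();
        GuardedCounter counter = new GuardedCounter(guard);
        long key = guard.acquireOwnership();
        counter.increment();
        ExecutorService helper = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> seen = helper.submit(() -> {
                guard.adopt(key);      // the key is passed explicitly to the helper
                return counter.get();  // allowed: this thread now holds the key
            });
            return seen.get();
        } finally {
            helper.shutdown();
        }
    }

    /** A thread that never received the key is rejected. */
    static boolean strangerRejected() throws Exception {
        AccessGuard guard = new AccessGuard();
        GuardedCounter counter = new GuardedCounter(guard);
        guard.acquireOwnership();
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            other.submit(() -> { return counter.get(); }).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        } finally {
            other.shutdown();
        }
    }
}
```

In this sketch the key travels through `ExecutorService.submit`, which already provides a happens-before edge of its own. The `synchronized` blocks in `enter()`/`exit()` are what would provide that edge for other kinds of handoff, as long as the owner does not use the client while the helper holds the key (the sketch does not enforce that).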
>> In the current implementation, the consumer is also invoked from
>> random threads. If it works now, it should continue to work.
> I'm not sure what you're referring to. Can you expand on this?
Every invocation of the consumer (e.g. the poll method) comes from a
thread that is not managed by the consumer. This is what I assumed you
meant by the term 'random thread'.
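For comparison, a simplified sketch of a single-owner check in the style of the one the classic `KafkaConsumer` performs in its private `acquire()`/`release()` methods (written from memory; the class and method names below are hypothetical, not a copy of Kafka code): any thread may call, one call at a time, and an overlapping call from a second thread fails with `ConcurrentModificationException`. Because the owner id lives in an `AtomicLong`, a release by one thread and the next acquire by another also form a happens-before edge.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical single-owner check: any thread may call, but only one at a time. */
final class SingleThreadGuard {
    private static final long NO_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_THREAD);
    private final AtomicInteger refcount = new AtomicInteger(0);

    void acquire() {
        long id = Thread.currentThread().getId();
        // Re-entrant for the thread that already holds it; otherwise only free -> held.
        if (id != currentThread.get() && !currentThread.compareAndSet(NO_THREAD, id)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        refcount.incrementAndGet();
    }

    void release() {
        // This atomic (volatile) write pairs with the read/CAS in the next acquire(),
        // so whatever the previous caller wrote is visible to the next caller.
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_THREAD);
        }
    }
}

final class GuardDemo {
    /** Two threads calling one after the other are both accepted. */
    static boolean sequentialCallsWork() throws InterruptedException {
        SingleThreadGuard guard = new SingleThreadGuard();
        boolean[] ok = new boolean[2];
        for (int i = 0; i < 2; i++) {
            final int slot = i;
            Thread t = new Thread(() -> {
                guard.acquire();
                try { ok[slot] = true; } finally { guard.release(); }
            });
            t.start();
            t.join(); // the next thread starts only after this one has released
        }
        return ok[0] && ok[1];
    }

    /** A second thread calling while the first is still inside a call is rejected. */
    static boolean overlappingCallRejected() throws InterruptedException {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // the current thread is "inside" a call
        boolean[] rejected = new boolean[1];
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected[0] = true;
            }
        });
        other.start();
        other.join();
        guard.release();
        return rejected[0];
    }
}
```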
> Hmm, not sure what you mean by "cooperate with blocking code." If you
> have 10 green threads you're multiplexing onto one CPU thread, and
> that CPU thread gets blocked because of what one green thread is doing,
> the other 9 green threads are blocked too, right? I guess it's "just" a
> performance problem, but it still seems like it could be a serious one.
There are several ways to deal with this. All async runtimes I know
(Akka, Zio, Cats Effect) support this by letting you mark a task as
blocking. The runtime then either schedules the task on a separate
thread pool, or grows the thread pool to accommodate it. Either way,
'the other 9 green threads' are simply scheduled on another real
thread. In addition, some of these runtimes detect long-running tasks
and reschedule the waiting tasks to another thread. This is all a bit
off topic, though.
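As a rough plain-Java illustration of what marking a task as blocking buys (this is not Akka, Zio or Cats Effect code; all names are hypothetical): a fixed pool of one thread stands in for the CPU thread that the green threads share, and the blocking task either runs on that thread or is shifted to a separate cached pool. Only when it is shifted do the 9 other tasks finish while the blocking task is still waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingShiftDemo {
    /**
     * One "CPU thread" (a fixed pool of size 1) and one blocking task that waits on a latch.
     * Returns true if the 9 other tasks complete while the blocking task is still waiting.
     */
    static boolean othersFinishWhileBlocked(boolean shiftBlocking) throws Exception {
        ExecutorService compute = Executors.newFixedThreadPool(1);
        ExecutorService blocking = Executors.newCachedThreadPool(); // grows as needed
        CountDownLatch unblock = new CountDownLatch(1);
        try {
            // The blocking task: either "marked as blocking" and shifted, or not.
            CompletableFuture<Void> slow = CompletableFuture.runAsync(() -> {
                try {
                    unblock.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, shiftBlocking ? blocking : compute);

            // Nine ordinary tasks share the single compute thread.
            CompletableFuture<?>[] others = new CompletableFuture<?>[9];
            for (int i = 0; i < others.length; i++) {
                others[i] = CompletableFuture.runAsync(() -> { }, compute);
            }
            try {
                CompletableFuture.allOf(others).get(500, TimeUnit.MILLISECONDS);
                return !slow.isDone();
            } catch (TimeoutException e) {
                return false; // stuck behind the blocked compute thread
            }
        } finally {
            unblock.countDown(); // let the blocking task (and anything queued) finish
            compute.shutdown();
            blocking.shutdown();
        }
    }
}
```

`othersFinishWhileBlocked(true)` should return `true`, and `othersFinishWhileBlocked(false)` should return `false` once the 500 ms wait times out; the timeout value is arbitrary.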
> I don't see why this has to be "inherently multi-threaded." Why can't
> we have the other threads report back what messages they've processed
> to the worker thread? Then it will be able to handle these callbacks
> without involving the other threads.
Please consider the context: we are running inside the callback of the
rebalance listener. From there, the only way to execute something and
also put a timeout on it is to run that something on another thread.
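A minimal sketch of that constraint, assuming a hypothetical `onPartitionsRevoked`-style method that does not implement Kafka's `ConsumerRebalanceListener` (so it compiles without Kafka on the classpath): the callback thread cannot time itself out, so it hands the work to another thread and waits with `Future.get(timeout, unit)`. In the real case that work would include calling back into the consumer, e.g. to commit, which is the cross-thread call KIP-944 wants to permit.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for a rebalance listener that needs a timeout. */
final class RevokeWithTimeout implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /**
     * Runs `finishAndCommit` on another thread and waits at most `timeoutMs`.
     * Returns true if the work finished in time, false if it was abandoned.
     */
    boolean onPartitionsRevoked(Runnable finishAndCommit, long timeoutMs) {
        Future<?> work = worker.submit(finishAndCommit);
        try {
            work.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            work.cancel(true); // stop waiting and interrupt the work if it still runs
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            work.cancel(true);
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("work failed", e.getCause());
        }
    }

    @Override
    public void close() {
        worker.shutdownNow();
    }
}
```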
Kind regards,
Erik.
On 08-07-2023 at 19:17, Colin McCabe wrote:
On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
Hi Colin,
Thanks for your thoughts and taking the time to reply.
Let me address your concerns. None of your worries is an issue with the
algorithm described in KIP-944. Here goes:
> It's not clear to me that it's safe to access the Kafka consumer or
> producer concurrently from different threads.
Concurrent access is /not/ a design goal of KIP-944. In fact, it goes to
great lengths to make sure that this cannot happen.
*The only design goal is to allow callbacks to call the consumer from
another thread.*
To make sure there are no more misunderstandings about this, I have
added this goal to the KIP.
Hi Erik,
Sorry, I spoke imprecisely. My concern is not concurrent access, but
multithreaded access in general. Basically, cache line visibility issues.
> This is true even if the accesses happen at different times, because
> modern CPUs require memory barriers to guarantee inter-thread
> visibility of loads and stores.
In KIP-944, the callback thread can only delegate to another thread
after reading from and writing to a thread-local variable, providing the
barriers right there.
I don't see any documentation that accessing thread local variables
provides a total store or load barrier. Do you have such documentation?
It seems like if this were the case, we could eliminate volatile
variables from most of the code base.
> I know that there are at least a few locks in the consumer code now,
> due to our need to send heartbeats from a worker thread. I don't think
> those would be sufficient to protect a client that is making calls
> from random threads.
In the current implementation, the consumer is also invoked from random
threads. If it works now, it should continue to work.
I'm not sure what you're referring to. Can you expand on this?
> There has been some discussion of moving to a more traditional model
> where people make calls to the client and the client passes the given
> data to a single background worker thread. This would avoid a lot of
> the footguns of the current model and probably better reflect how
> people actually use the client.
That is awesome. However, I'd rather not wait for that.
> Another issue is that neither the producer nor the consumer is fully
> nonblocking. There are some corner cases where we do in fact block.
> From memory, the producer blocks in some "buffer full" cases, and the
> consumer blocks sometimes when fetching metadata.
I am aware of that. This is not an issue; all async runtimes can
cooperate with blocking code.
Hmm, not sure what you mean by "cooperate with blocking code." If you
have 10 green threads you're multiplexing onto one CPU thread, and that
CPU thread gets blocked because of what one green thread is doing, the
other 9 green threads are blocked too, right? I guess it's "just" a
performance problem, but it still seems like it could be a serious one.
> I suspect it would be more appropriate for Kotlin coroutines, Zio
> coroutines and so on to adopt this "pass messages to and from a
> background worker thread" model than to try to re-engineer the Kafka
> client to work from random threads.
In both zio-kafka and fs2-kafka, this is already the approach we are
taking.
Unfortunately, the Kafka consumer forces us to perform some work in
callbacks:
* commit completed callback: register that the callback is complete;
* partition revoked callback: in this callback we need to submit
  commits for everything consumed and processed so far, using
  timeouts if processing takes too long. In an async runtime, this is
  an inherently multi-threaded process. In particular, we cannot do
  timeouts without involving multiple threads.
I don't see why this has to be "inherently multi-threaded." Why can't we
have the other threads report back what messages they've processed to
the worker thread? Then it will be able to handle these callbacks
without involving the other threads.
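A minimal sketch of the report-back approach, with hypothetical names and plain `int`/`long` values standing in for Kafka's partition and offset types: processing threads only enqueue what they have finished, and the thread that owns the consumer drains the queue, for example inside the revoke callback, and then commits on its own thread. It follows the Kafka convention that the committed offset is the next offset to consume, i.e. the last processed offset plus one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical report-back channel from processing threads to the consumer thread. */
final class OffsetReports {
    /** One finished record: partition and offset (stand-ins for Kafka types). */
    static final class Done {
        final int partition;
        final long offset;

        Done(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final ConcurrentLinkedQueue<Done> reports = new ConcurrentLinkedQueue<>();

    /** Called by any processing thread; it never touches the consumer. */
    void report(int partition, long offset) {
        reports.add(new Done(partition, offset));
    }

    /**
     * Called only on the consumer-owning thread, e.g. inside the revoke callback.
     * Returns, per partition, the offset to commit: last processed offset + 1.
     */
    Map<Integer, Long> drainCommittable() {
        Map<Integer, Long> toCommit = new HashMap<>();
        Done d;
        while ((d = reports.poll()) != null) {
            toCommit.merge(d.partition, d.offset + 1, Long::max);
        }
        return toCommit;
    }
}
```

Taking the maximum per partition assumes that the records of a partition finish processing in order; with out-of-order completion you would have to track gaps. The sketch also does not, by itself, bound how long the callback waits for processing that is still in flight.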
regards,
Colin
I have extended the KIP's motivation to explain the major use case.
Please read KIP-944 again. Even though the description is extensive
(this callback from callback stuff is tricky), you will find that my
goals are modest.
Also, the implementation is just a few lines. Once you understand the
idea, it should not be a lot of work to follow.
Kind regards,
Erik.
On 07-07-2023 at 19:57, Colin McCabe wrote:
Hi Erik,
It's not clear to me that it's safe to access the Kafka consumer or
producer concurrently from different threads. There are data structures
that aren't protected by locks, so I wouldn't necessarily expect
accessing and mutating them in a concurrent way to work. This is true
even if the accesses happen at different times, because modern CPUs
require memory barriers to guarantee inter-thread visibility of loads
and stores.
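A generic Java illustration of the visibility point (not Kafka code): a plain write on one thread is only guaranteed to be visible on another thread if a happens-before edge connects the two. Here `payload` is a plain field and `ready` is `volatile`; because `payload` is written before `ready`, a reader that observes `ready == true` is guaranteed to read the published value. Without `volatile`, the reader could legally spin forever or see a stale value.

```java
/** Safe publication through a volatile flag (Java Memory Model happens-before). */
final class Publication {
    private int payload;            // plain field
    private volatile boolean ready; // the volatile write/read pair supplies the barrier

    /** Writer: plain write first, then the volatile write that publishes it. */
    void publish(int value) {
        payload = value;
        ready = true;
    }

    /** Reader: once `ready` is seen as true, `payload` is guaranteed to be visible. */
    int awaitValue() {
        while (!ready) {
            Thread.onSpinWait();
        }
        return payload;
    }
}
```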
I am writing this without doing a detailed dive into the code (I
haven't been into the consumer / producer code in a bit). Someone who
has worked more on the consumer recently might be able to give specific
examples of things that wouldn't work.
I know that there are at least a few locks in the consumer code now,
due to our need to send heartbeats from a worker thread. I don't think
those would be sufficient to protect a client that is making calls from
random threads.
There has been some discussion of moving to a more traditional model
where people make calls to the client and the client passes the given
data to a single background worker thread. This would avoid a lot of
the footguns of the current model and probably better reflect how
people actually use the client.
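A rough sketch of that model, using a hypothetical `BackgroundClient` around some non-thread-safe state (not an existing Kafka class): callers on any thread submit operations, a single-threaded executor is the only thread that ever touches the state, and results come back as `CompletableFuture`s. The executor handoff and the future's completion supply the happens-before edges, so the state itself needs no locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical wrapper: all access to `state` happens on one background thread. */
final class BackgroundClient<S> implements AutoCloseable {
    private final S state; // not thread-safe; only touched by `worker`
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    BackgroundClient(S state) {
        this.state = state;
    }

    /** May be called from any thread; `op` itself always runs on the worker thread. */
    <R> CompletableFuture<R> call(Function<S, R> op) {
        return CompletableFuture.supplyAsync(() -> op.apply(state), worker);
    }

    @Override
    public void close() {
        worker.shutdown(); // already submitted operations still run
    }
}

final class BackgroundClientDemo {
    /** Four caller threads each append 25 items to a plain ArrayList. */
    static int demo() throws InterruptedException {
        try (BackgroundClient<List<Integer>> client = new BackgroundClient<>(new ArrayList<>())) {
            List<Thread> callers = new ArrayList<>();
            for (int t = 0; t < 4; t++) {
                Thread caller = new Thread(() -> {
                    for (int i = 0; i < 25; i++) {
                        client.call(list -> list.add(1)).join();
                    }
                });
                callers.add(caller);
                caller.start();
            }
            for (Thread caller : callers) {
                caller.join();
            }
            return client.call(List::size).join();
        }
    }
}
```

`BackgroundClientDemo.demo()` should return 100: the `ArrayList` is never accessed concurrently, because every operation is serialized onto the single worker thread.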
Another issue is that neither the producer nor the consumer is fully
nonblocking. There are some corner cases where we do in fact block.
From memory, the producer blocks in some "buffer full" cases, and the
consumer blocks sometimes when fetching metadata.
I suspect it would be more appropriate for Kotlin coroutines, Zio
coroutines and so on to adopt this "pass messages to and from a
background worker thread" model than to try to re-engineer the Kafka
client to work from random threads.
There is actually some good advice about how to handle multiple threads
in the class-level Javadoc at the top of KafkaConsumer.java itself.
Check the sections "One Consumer Per Thread" and "Decouple Consumption
and Processing." What I'm recommending here is essentially the latter.
I do understand that it's frustrating to not get a quick response.
However, overall I think this one needs a lot more discussion before
getting anywhere near a vote. I will leave a -1 just as a procedural
step.
Maybe some of the people working in the client area can also chime in.
best,
Colin
On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
Dear PMCs,
So far there have been 0 responses to KIP-944. I understand this may not
be something that keeps you busy, but this KIP is important to people
who use async runtimes like Zio, Cats and Kotlin.
Is there anything you need to come to a decision?
Kind regards,
Erik.
On 05-07-2023 at 11:38, Erik van Oosten wrote:
Hello all,
I'd like to call a vote on KIP-944: Support async runtimes in consumer.
It has been 'under discussion' for 7 days now. 'Under discussion' is
between quotes because there were 0 comments so far. I hope the KIP is
clear!
KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
Kind regards,
Erik.