As I said: you should first try to understand the root cause.
A `DisconnectException` sounds like a network issue to me. So it might
not have anything to do with KS configurations, but rather be an
environment issue.
Of course, it could mean that heartbeats are not sent, and that a thread
dropped out of the consumer group, triggering a rebalance. It would be
good to verify this with the broker (ie, group coordinator) logs.
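From the application side, one way to see these drop-outs is to log
Streams state transitions and correlate them with the coordinator logs.
A minimal sketch (assuming `streams` is your KafkaStreams instance,
registered before calling start()):

import org.apache.kafka.streams.KafkaStreams;

// Logs every state transition; frequent switches into REBALANCING
// should line up with the rebalances on the broker side.
streams.setStateListener((newState, oldState) ->
    System.out.println("state change: " + oldState + " -> " + newState
        + " at " + System.currentTimeMillis()));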
`TaskMigratedException` is a follow-up error, if you wish -- the thread
has already dropped out of the consumer group (w/o noticing it). In
general, the root cause for this one is not calling poll() on time to
rejoin the group during an ongoing rebalance.
> - setting max.poll.interval.ms starting from 60000 to 300000
The default is 300,000 (ie, 5 minutes). Reducing it to 1 minute would
rather destabilize the system. We did see deployments with higher
settings, so maybe increasing it to 15 minutes could help. But a larger
timeout has other (undesired) side effects, so it's not a general
recommendation to just bump it up arbitrarily. (This config is also used
as the "rebalance timeout" and could block the whole group from making
progress during a rebalance...)
If you have trouble pinning it down, a good next step could be to just
deploy fewer threads (in sum across all instances). The more threads you
have, the more consumers are in the group, and larger groups are harder
to manage. If you can stabilize a smaller group, you can slowly scale it
out to see when it starts to hit issues, and try to figure out where the
issue comes from.
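Kafka Streams also lets you add and remove stream threads at runtime,
which makes this kind of scale-out experiment easier. A sketch (assuming
`streams` is the running KafkaStreams instance):

import java.util.Optional;
import org.apache.kafka.streams.KafkaStreams;

// Start with a small num.stream.threads and grow the group step by step
// once it is stable:
final Optional<String> added = streams.addStreamThread();
added.ifPresent(name -> System.out.println("added thread: " + name));

// And the reverse direction if the group starts to flap again:
// streams.removeStreamThread();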
HTH.
-Matthias
On 1/21/25 12:19 AM, Martinus Elvin wrote:
Hello,
Thank you for the response!
Some info that I forgot to include is the machines where the services
are deployed. We have several servers dedicated to deploying the
services, with 16 cores / 32GB RAM and 32 cores / 64GB RAM.
FYI, this particular service is deployed on a 32-core, 64GB RAM machine.
We have done a lot of configuration tuning, especially for the consumer
configuration, following a lot of performance tuning articles on the web.
Before I created this email, we had tried:
- setting session.timeout.ms starting from 30000 to 300000
- setting max.poll.interval.ms starting from 60000 to 300000
- setting max.poll.records from 500, 250, 100, 50, 20, down to 1
- setting request.timeout.ms starting from 30000 to 120000
We have done another round of testing with some of your advice applied:
- max.poll.records back to default (500)
- first test: 3 pods, with 32 consumer threads each (1:1 StreamThread)
- second test: 6 pods, with 16 consumer threads each (1:1 StreamThread)
But the problem still persists.
We are getting a lot of
org.apache.kafka.common.errors.DisconnectException: null, where I noticed
a request timeout has been hit even though it is set to 2 minutes (up
from the 30-second default).
Another error we are experiencing is
org.apache.kafka.streams.errors.TaskMigratedException, indicating a
consumer failed to commit offsets.
Note: we have a lot of KStream services deployed (similar function,
different sources), so resources are somewhat restricted.
Thank you!
Martinus Elvin
On 1/18/2025 3:24 AM, Matthias J. Sax wrote:
Hi,
In general, we recommend one StreamThread per core, so 48 threads
sounds excessive; I don't think that a single pod would get 48 cores?
So using more pods with fewer threads each might be a good first step.
The only config that sticks out is
> - max.poll.records = 1
Not sure why you reduced it to 1. This should reduce the throughput a
single thread can achieve. Why does the default not work for you?
However, in the end, the first thing you need to figure out is why
rebalancing starts. Are heartbeats getting lost? Do instances hit
`max.poll.interval.ms`? -- If threads get busier with increasing
load, I could imagine that you run into thread contention,
destabilizing the system.
Next, you should inspect metrics to get an idea of how the system
performs. Which metrics increase/decrease while you increase
throughput, and at what point does it start to fail?
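For example, the client-side metrics can be dumped periodically right
from the application; a sketch (assuming `streams` is the running
KafkaStreams instance; metric names of interest include
"last-rebalance-seconds-ago" and "records-lag-max"):

import org.apache.kafka.streams.KafkaStreams;

// Print rebalance- and lag-related metrics so their trend can be
// compared against increasing throughput:
streams.metrics().forEach((name, metric) -> {
    if (name.name().contains("rebalance") || name.name().contains("lag")) {
        System.out.println(name.group() + "/" + name.name()
            + " = " + metric.metricValue());
    }
});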
Both together should give you a starting point to understand what the
issue could be, and which change (more KS instances, fewer threads,
more memory, a config change) should help.
-Matthias
On 1/9/25 7:30 PM, Martinus Elvin wrote:
Hello,
We have a Kafka Streams service that performs a left join between two
KStreams on the message key. The message size is 1 KB, more or less.
The left topic has around two hundred thousand (200,000) messages per
second, while the right topic has around two thousand (2,000) messages
per second.
Each topic has 96 partitions with a 3-hour retention time.
The join operation uses a 15-minute time window.
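For reference, the topology is roughly along the lines of the following
sketch (topic names, types, and the joiner are placeholders):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> left = builder.stream("left-topic");
final KStream<String, String> right = builder.stream("right-topic");

// KStream-KStream left join on the message key with a 15 minute window:
left.leftJoin(
        right,
        (leftValue, rightValue) -> leftValue + "|" + rightValue,
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15)))
    .to("joined-topic");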
We have a very concerning problem: the consumers keep rebalancing after
some time, and the consumer lag can accumulate to 200 million or more.
We have done a lot of tuning and testing on our service but the problem
still persists. The consumers start rebalancing when we reach 100,000
tps or more.
Below is the latest configuration we use (see the sketch after the list):
- auto.offset.reset = latest
- session.timeout.ms = 300000
- heartbeat.interval.ms = 75000
- fetch.max.wait.ms = 500
- fetch.min.bytes = 1048576
- fetch.max.bytes = 52428800
- max.partition.fetch.bytes = 1048576
- max.poll.interval.ms = 300000
- max.poll.records = 1
- request.timeout.ms = 120000
- default.api.timeout.ms = 60000
- cache.max.bytes.buffering = 104857600
- num.stream.threads = 48 (each pod)
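A sketch of how a subset of this maps onto StreamsConfig (application id
and bootstrap servers are placeholders; consumer-level settings go
through the consumer prefix):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-service");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");    // placeholder
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 48);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 104857600L);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 300000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 120000);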
Below is some info regarding the Kafka cluster we are using:
- 10 brokers (8 core 2.3GHz, 16GB RAM)
- kafka version = 3.8.0
- kafka-streams library = 3.1.2
The service runs in a dockerized container with 2 pods (we have also
tried up to 4 pods).
Could someone provide insight or solutions so we can have a stable and
fast consumer to handle this kind of message load?
Thank you!
Martinus Elvin