Howdy!

I found that the fetcher-thread resizing logic has a potential bug that leads
to an imbalanced partition distribution across the threads after the thread
count is changed through dynamic configuration.

To investigate, I added a simple unit test method "testResize" in
"AbstractFetcherThreadTest" that simulates resizing the replica fetchers from
10 threads to 60 threads, starting with 10 topics of 100 partitions each. The
test asserts that after resizing, every partition is assigned to the fetcherId
computed from the new thread count. However, the test failed: when I compared
the entries of "fetcherThreadMap" with the expected fetcherId for each
topic-partition, some of the fetcherIds did not match. The unit test is in
this commit (
https://github.com/yufeiyan1220/kafka/commit/eb99b7499b416cdeb44c2ccd3ea55a1e38ea3d60),
and the standard output of the test run is attached.
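Roughly, the final check in the test looks like the sketch below (simplified
Scala; the accessors for the thread's partition set and for getFetcherId are
approximations of the real ones, and the exact code is in the commit above):

    // Sketch only, with simplified accessors for fetcherThreadMap,
    // the per-thread partition set, and getFetcherId.
    fetcherManager.resizeThreadPool(60)  // resize from 10 to 60 fetchers

    fetcherManager.fetcherThreadMap.foreach { case (brokerIdAndFetcherId, fetcherThread) =>
      fetcherThread.partitions.foreach { topicPartition =>
        // getFetcherId hashes the topic-partition modulo the current fetcher
        // count, so after resizing every partition should sit on the fetcher
        // id it now hashes to.
        assertEquals(fetcherManager.getFetcherId(topicPartition),
                     brokerIdAndFetcherId.fetcherId)
      }
    }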

I suspect the cause is that "addFetcherForPartitions", which may add new
fetchers to "fetcherThreadMap", is called inside the loop that iterates over
"fetcherThreadMap" itself. The iterator can then skip some of the fetchers, so
those fetchers keep their original topic-partitions, which produces the
imbalanced partition distribution after resizing.
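To illustrate the pattern I mean, here is a heavily condensed sketch of the
migration loop as I read it (not the exact upstream code; the names
partitionsAndOffsets, removeFetcherForPartitions and addFetcherForPartitions
reflect my reading of the current sources and may differ by version):

    // Problematic shape: addFetcherForPartitions may create brand-new entries
    // in fetcherThreadMap while we are still iterating over fetcherThreadMap,
    // so the iteration can miss some fetchers entirely.
    fetcherThreadMap.foreach { case (id, thread) =>
      val removedPartitions = thread.partitionsAndOffsets  // partitions + fetch offsets
      removeFetcherForPartitions(removedPartitions.keySet)
      if (id.fetcherId >= newSize)
        thread.shutdown()
      addFetcherForPartitions(removedPartitions)            // mutates fetcherThreadMap
    }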

To solve this, I collect all partitions and their fetch offsets into a mutable
map during the iteration and add them back only after the iteration has
finished. That change is in another commit (
https://github.com/yufeiyan1220/kafka/commit/0a793dfca2ab9b8e8b45ba5693359960f3e306eb),
and the new resize method passes the unit test.
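In sketch form, the fix simply moves the addFetcherForPartitions call out of
the loop (again simplified; the value type for the offsets and the exact
accessors may differ by Kafka version, and the actual change is in the commit
above):

    import scala.collection.mutable

    // Buffer every removed partition (with its fetch offset) while walking
    // fetcherThreadMap, and only re-add them once the iteration is complete,
    // so the map is no longer mutated mid-iteration.
    val allRemovedPartitions = mutable.Map.empty[TopicPartition, InitialFetchState]
    fetcherThreadMap.foreach { case (id, thread) =>
      val removedPartitions = thread.partitionsAndOffsets
      removeFetcherForPartitions(removedPartitions.keySet)
      if (id.fetcherId >= newSize)
        thread.shutdown()
      allRemovedPartitions ++= removedPartitions
    }
    addFetcherForPartitions(allRemovedPartitions.toMap)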

I'm not sure whether this is an issue the Kafka community needs to fix, but
for me it hurts fetching efficiency when I deploy Kafka across regions with
high network latency.

I'd really appreciate hearing from you!
