Hello,

We recently started observing that our Streams application takes hours to
days to rebalance after events that churn the Streams consumers, such as
code deployments. We increased the acceptable.recovery.lag property from
the default of 10,000 to 100,000, but we are still experiencing long
periods of imbalance. Looking at the "Decided on assignment" log entry,
which dumps the task lags per client, we can see that for some tasks
neither the standby nor the warmup catches up to within the acceptable
recovery lag for a long time.
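
For reference, the relevant overrides look roughly like this (a minimal
Java sketch of our StreamsConfig properties, showing only the settings
mentioned in this mail, not our full configuration):

  import java.util.Properties;
  import org.apache.kafka.streams.StreamsConfig;

  Properties props = new Properties();
  // raised from the 10,000 default described above
  props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 100_000L);
  // one standby per active task (details on task counts below)
  props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);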

Sample of the logs:
time         taskId       list(clientId, lag)
08:00:13.820 0_10 [('8cbfacca-1473-4a9f-b2ab-42dc598cb80b', '-2'),
('a209ad1e-2f80-4ab3-8dca-7d8b9bee8c70', '135951'),
('b62ec539-097b-41e4-8c8d-7e5b2f4b82d4', '135927')]
...
08:09:47.110 0_10 [('8cbfacca-1473-4a9f-b2ab-42dc598cb80b', '-2'),
('a209ad1e-2f80-4ab3-8dca-7d8b9bee8c70', '135170'),
('b62ec539-097b-41e4-8c8d-7e5b2f4b82d4', '135170')]
...
08:20:57.059 0_10 [('8cbfacca-1473-4a9f-b2ab-42dc598cb80b', '-2'),
('a209ad1e-2f80-4ab3-8dca-7d8b9bee8c70', '133187'),
('b62ec539-097b-41e4-8c8d-7e5b2f4b82d4', '133187')]
...
08:30:35.297 0_10 [('8cbfacca-1473-4a9f-b2ab-42dc598cb80b', '-2'),
('a209ad1e-2f80-4ab3-8dca-7d8b9bee8c70', '133811'),
('b62ec539-097b-41e4-8c8d-7e5b2f4b82d4', '133811')]

Interestingly, once we reduced our consumer application fleet from 56
instances with 32 threads each to 7 instances with 32 threads each (and
more CPU cores), the standbys were able to keep up. This consumer
application only has 250 partitions to process, and we configure 1 standby
replica, so 500 total tasks.
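
For context, that is 56 x 32 = 1,792 stream threads for only 500 tasks
before the change, versus 7 x 32 = 224 threads after.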

Logs from when this task recovered as the fleet size was reduced:

time         taskId       list(clientId, lag)
23:55:51.951 0_10 [('a209ad1e-2f80-4ab3-8dca-7d8b9bee8c70', '-2'),
('bdc5a844-2f1c-4b10-9680-50c63259ee9f', '135674'),
('c48f04ff-9c31-4455-9665-fa3092675037', '135674')]
23:56:44.196 0_10 [('e0eb7a61-d44e-484c-8b9c-9a1444df0c93', '78068801869'),
('1b1a99ae-922b-4913-a3da-8058ede03920', '78068801869')]
23:57:45.169 0_10 [('e0eb7a61-d44e-484c-8b9c-9a1444df0c93', '257'),
('1b1a99ae-922b-4913-a3da-8058ede03920', '-2')]
23:57:47.314 0_10 [('e0eb7a61-d44e-484c-8b9c-9a1444df0c93', '2044'),
('1b1a99ae-922b-4913-a3da-8058ede03920', '-2')]
<no more rebalancing>

   - Is there a hard limit we are reaching in terms of the number of
   consumers that can be part of a group, or in the ratio of consumers to
   the tasks available to consume?
      - Or, are there efficiencies in densely packing consumers onto
      fewer instances or threads, in terms of fetching updates from
      multiple partitions?
   - Is there a bottleneck in how fast we produce records versus how fast
   the standbys can consume them?
      - It looks like the StoreChangelogReader
      <https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L580-L604>
      only updates its offsets when it has buffered records, once every
      commit.interval.ms. Could this be an issue if we generate more than
      acceptable.recovery.lag offsets during that interval? (See the
      config sketch after this list.)
   - I did not observe our brokers being overwhelmed, and there was no
   impact to other streams applications -- are there specific broker
   metrics we should look at to see whether this is a broker-side
   bottleneck?
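
To make the changelog-consumption question concrete, this is the kind of
restore-consumer override we could experiment with (purely a sketch: the
values are placeholders we have not validated, and we are not sure these
fetch settings are the actual bottleneck):

  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.streams.StreamsConfig;

  Properties props = new Properties();
  // restoreConsumerPrefix() scopes a consumer property to the restore
  // consumer, which the StoreChangelogReader uses to read changelog
  // topics for restoration and standby updates.
  props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
      10_000);            // placeholder value
  props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
      100 * 1024 * 1024); // placeholder value
  props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
      10 * 1024 * 1024);  // placeholder value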

Thank you; we would appreciate any help in understanding this issue further.
