Flavien Raynaud created KAFKA-7286:
--------------------------------------

             Summary: Loading offsets and group metadata hangs with large group 
metadata records
                 Key: KAFKA-7286
                 URL: https://issues.apache.org/jira/browse/KAFKA-7286
             Project: Kafka
          Issue Type: Bug
            Reporter: Flavien Raynaud


When a (Kafka-based) consumer group contains many members, group metadata 
records (in the {{__consumer_offsets}} topic) can become quite large.

Increasing {{message.max.bytes}} makes storing these records possible. Loading 
them when a broker restarts is done via 
[doLoadGroupsAndOffsets|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L504].
 However, this method relies on the {{offsets.load.buffer.size}} configuration 
to create a 
[buffer|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L513]
 that holds the records being loaded.
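For illustration, a broker configuration like the following (values made up, 
not a recommendation) creates exactly this gap: records of up to ~10 MB can be 
written to the topic, but the load buffer is only ~5 MB:
{noformat}
# server.properties -- illustrative values only
# Records up to ~10 MB can be appended to __consumer_offsets...
message.max.bytes=10485760
# ...but the buffer used when loading the partition back is only ~5 MB
offsets.load.buffer.size=5242880
{noformat}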

If a group metadata record is too large for this buffer, the loading method 
gets stuck in a tight loop, repeatedly trying to load records into a buffer 
that cannot accommodate even a single one.
----
For example, if the {{__consumer_offsets-9}} partition contains a record 
smaller than {{message.max.bytes}} but larger than 
{{offsets.load.buffer.size}}, the logs will show the following:
{noformat}
...
[2018-08-13 21:00:21,073] INFO [GroupMetadataManager brokerId=0] Scheduling 
loading of offsets and group metadata from __consumer_offsets-9 
(kafka.coordinator.group.GroupMetadataManager)
...
{noformat}
But logs will never contain the expected {{Finished loading offsets and group 
metadata from ...}} line.

Consumers whose group is assigned to this partition will see {{Marking the 
coordinator dead}} and will never be able to stabilize or make progress.
----
From what I could gather in the code, it seems that (a stripped-down sketch of 
this control flow follows the list below):
 - 
[fetchDataInfo|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L522]
 returns at least one record (even if larger than {{offsets.load.buffer.size}}, 
thanks to {{minOneMessage = true}})
 - No fully-readable record is stored in the buffer with 
[fileRecords.readInto(buffer, 
0)|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L528]
 (too large to fit in the buffer)
 - 
[memRecords.batches|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L532]
 returns an empty iterator
 - 
[currOffset|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L590]
 never advances, hence loading the partition hangs forever.
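
To make the failure mode concrete, here is a simplified model of that control 
flow. This is not the actual {{GroupMetadataManager}} code; {{Record}} and 
{{loadPartition}} are hypothetical stand-ins for the real read path, kept only 
to show why the loop never terminates:
{code:scala}
// Stripped-down model of the doLoadGroupsAndOffsets loop -- my own sketch,
// not the real Kafka code.
final case class Record(offset: Long, sizeInBytes: Int)

def loadPartition(records: Seq[Record], loadBufferSize: Int, endOffset: Long): Unit = {
  var currOffset = 0L
  while (currOffset < endOffset) {
    // log.read(..., minOneMessage = true) returns at least one record,
    // even one larger than loadBufferSize...
    val fetched = records.dropWhile(_.offset < currOffset)
    // ...but fileRecords.readInto(buffer, 0) only copies records that fully
    // fit into the pre-allocated buffer, so an oversized first record leaves
    // the buffer (and therefore memRecords.batches) empty.
    var remaining = loadBufferSize
    val readable = fetched.takeWhile { r =>
      val fits = r.sizeInBytes <= remaining
      if (fits) remaining -= r.sizeInBytes
      fits
    }
    // currOffset only advances for records that made it into the buffer, so
    // with an empty `readable` the loop spins forever on the same offset.
    readable.foreach(r => currOffset = r.offset + 1)
  }
}

// Hangs: a single 6 MB record never fits into a 5 MB buffer.
// loadPartition(Seq(Record(0, 6 * 1024 * 1024)), 5 * 1024 * 1024, endOffset = 1)
{code}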

----
It would be great to let the partition load even if a record is larger than the 
configured {{offsets.load.buffer.size}} limit. The fact that {{minOneMessage = 
true}} is used when reading records seems to indicate that the buffer should be 
able to grow to accommodate at least one record.
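
Roughly, the change I have in mind would look like the following. This is a 
sketch against the simplified model above (reusing the hypothetical {{Record}} 
type), not a real patch against {{GroupMetadataManager}}:
{code:scala}
// Treat offsets.load.buffer.size as a soft limit: size the working buffer so
// that at least the first fetched record always fits.
def effectiveBufferSize(loadBufferSize: Int, fetched: Seq[Record]): Int =
  math.max(loadBufferSize, fetched.headOption.map(_.sizeInBytes).getOrElse(0))

// Using this size in each iteration, the oversized record is read into the
// buffer, memRecords.batches is non-empty, and currOffset advances past it.
{code}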

If you think the limit should stay a hard limit, then it would help to at least 
add a log line indicating that {{offsets.load.buffer.size}} is not large enough 
and should be increased. Otherwise, one can only guess and dig through the code 
to figure out what is happening :)
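
For that second option, the check could look roughly like this (again a sketch 
reusing the simplified model above, with {{println}} standing in for Kafka's 
actual logging):
{code:scala}
// Sketch only: surface the problem instead of silently spinning on the offset.
def warnIfRecordTooLarge(fetched: Seq[Record], loadBufferSize: Int): Unit =
  fetched.headOption.filter(_.sizeInBytes > loadBufferSize).foreach { r =>
    println(s"WARN Record at offset ${r.offset} (${r.sizeInBytes} bytes) is larger than " +
      s"offsets.load.buffer.size ($loadBufferSize bytes); increase offsets.load.buffer.size " +
      "to allow this partition to be loaded")
  }
{code}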

I will try to open a PR with the first idea (allowing large records to be read 
when needed) soon, but any feedback from anyone who has hit the same issue in 
the past would be appreciated :)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
