[ https://issues.apache.org/jira/browse/KAFKA-7286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-7286.
------------------------------------
    Resolution: Fixed
    Fix Version/s: 2.1.0

> Loading offsets and group metadata hangs with large group metadata records
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7286
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7286
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Flavien Raynaud
>            Assignee: Flavien Raynaud
>            Priority: Minor
>             Fix For: 2.1.0
>
> When a (Kafka-based) consumer group contains many members, group metadata
> records (in the {{__consumer_offsets}} topic) can become quite large.
> Increasing {{message.max.bytes}} makes storing these records possible.
> When a broker restarts, they are loaded via
> [doLoadGroupsAndOffsets|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L504].
> However, this method relies on the {{offsets.load.buffer.size}}
> configuration to create a
> [buffer|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L513]
> that holds the records being loaded.
> If a group metadata record is too large for this buffer, the loading method
> gets stuck in a tight loop, repeatedly trying to load records into a buffer
> that cannot accommodate even a single one.
> ----
> For example, if the {{__consumer_offsets-9}} partition contains a record
> smaller than {{message.max.bytes}} but larger than
> {{offsets.load.buffer.size}}, the logs will show the following:
> {noformat}
> ...
> [2018-08-13 21:00:21,073] INFO [GroupMetadataManager brokerId=0] Scheduling
> loading of offsets and group metadata from __consumer_offsets-9
> (kafka.coordinator.group.GroupMetadataManager)
> ...
> {noformat}
> but will never contain the expected {{Finished loading offsets and group
> metadata from ...}} line.
> Consumers whose groups are assigned to this partition will see {{Marking
> the coordinator dead}} and will never be able to stabilize and make
> progress.
> ----
> From what I could gather in the code:
> - [fetchDataInfo|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L522]
>   returns at least one record, even one larger than
>   {{offsets.load.buffer.size}}, thanks to {{minOneMessage = true}}
> - [fileRecords.readInto(buffer, 0)|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L528]
>   stores no fully-readable record in the buffer (the record is too large
>   to fit)
> - [memRecords.batches|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L532]
>   therefore returns an empty iterator
> - [currOffset|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L590]
>   never advances, so loading the partition hangs forever
> ----
> It would be great to let the partition load even when a record is larger
> than the configured {{offsets.load.buffer.size}} limit. The fact that
> records are read with {{minOneMessage = true}} suggests the buffer should
> accommodate at least one record.
> If you think the limit should stay a hard limit, then at least add a log
> line indicating that {{offsets.load.buffer.size}} is not large enough and
> should be increased.
> Otherwise, one can only guess and dig through the code to figure out what
> is happening :)
> I will try to open a PR with the first idea (allowing large records to be
> read when needed) soon, but feedback from anyone who has hit the same
> issue in the past would be appreciated :)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
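The hang mechanism and the proposed fix can be sketched as follows. This is a minimal Java illustration with hypothetical stand-in types, not Kafka's actual {{FileRecords}}/{{MemoryRecords}} API: a batch larger than a fixed buffer yields zero readable batches (so the loop's offset never advances), while growing the buffer to fit at least one record lets loading make progress.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class LoadBufferSketch {
    // Stand-in for a record batch on disk (roughly, a batch with a size in bytes).
    static class Batch {
        final int sizeInBytes;
        Batch(int sizeInBytes) { this.sizeInBytes = sizeInBytes; }
    }

    /** Buggy behavior: with a fixed buffer, a batch larger than the buffer is
     *  not fully readable, so the caller sees an empty iterator and its
     *  currOffset never advances — a tight loop that spins forever. */
    static List<Batch> readBatches(ByteBuffer buffer, Batch next) {
        List<Batch> readable = new ArrayList<>();
        if (next.sizeInBytes <= buffer.capacity())
            readable.add(next); // fits: the batch is fully readable
        return readable;        // too large: empty list, no progress
    }

    /** Fix along the lines proposed above: grow the buffer so that at least
     *  one record always fits, honoring the intent of minOneMessage = true. */
    static ByteBuffer ensureCapacity(ByteBuffer buffer, Batch next) {
        if (next.sizeInBytes > buffer.capacity())
            return ByteBuffer.allocate(next.sizeInBytes);
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024); // offsets.load.buffer.size
        Batch large = new Batch(4096);                 // record > buffer

        System.out.println(readBatches(buffer, large).size()); // 0: would hang
        buffer = ensureCapacity(buffer, large);
        System.out.println(readBatches(buffer, large).size()); // 1: makes progress
    }
}
```

With the fix, the loader would still use {{offsets.load.buffer.size}} as the normal allocation, only growing past it when a single record demands it.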