[ https://issues.apache.org/jira/browse/KAFKA-19222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17948969#comment-17948969 ]
Lianet Magrans commented on KAFKA-19222:
----------------------------------------

Hey [~twmb]! When implementing the protocol in the Java client, we made sure to send a full HB (including all fields) upon failures. It's the only way to ensure that the coordinator really gets all the info it needs. Is that maybe the bit that's missing in your client implementation? It may help to look at the 3 usages of "resetHeartbeatState" in the Java client: [https://github.com/apache/kafka/blob/e68781414e9bcbc1d7cd5c247433a13f8d0e2e6e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L483]

With that, if the response is lost, the expectation is that the client gets some kind of timeout/network failure and then resends a full HB. The coordinator will then accept that full HB with the older (previous) epoch as long as the owned partitions are a subset of the current assignment (isSubset). I'm not sure if I'm missing something about your specific flow, so let me know. Hope it helps!

Lianet

> Invalid FENCED_MEMBER_EPOCH error to ConsumerGroupHeartbeat
> -----------------------------------------------------------
>
> Key: KAFKA-19222
> URL: https://issues.apache.org/jira/browse/KAFKA-19222
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 4.0.0, 4.0.1
> Reporter: Travis Bischel
> Priority: Major
>
> {{throwIfConsumerGroupMemberEpochIsInvalid}} is meant to check the epoch in heartbeat requests. If a ConsumerGroupHeartbeat response is lost (the connection dies before the client reads the response), the client may issue a ConsumerGroupHeartbeat with the old epoch. {{throwIfConsumerGroupMemberEpochIsInvalid}} is meant to allow this with the following conditional:
> {code:java}
> } else if (receivedMemberEpoch < member.memberEpoch()) {
>     // If the member comes with the previous epoch and has a subset of the current assignment partitions,
>     // we accept it because the response with the bumped epoch may have been lost.
>     if (receivedMemberEpoch != member.previousMemberEpoch()
>             || !isSubset(ownedTopicPartitions, member.assignedPartitions())) {
>         throw new FencedMemberEpochException("The consumer group member has a smaller member "
>             + "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator ("
>             + member.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
>     }
> }
> {code}
> However, {{isSubset}} immediately returns false if {{ownedTopicPartitions}} is null. Clients are not meant to send the owned partitions (the ConsumerGroupHeartbeatRequest.TopicPartitions field) _unless_ they have changed.
>
> As a concrete example, here are the broker logs for one client. In this flow:
> * A ConsumerGroupHeartbeat is being issued within the client.
> * Concurrently, I am trying to leave the group; this cancels the in-flight ConsumerGroupHeartbeat on the client side.
> * I issue an OffsetCommit (before the group is left).
> * The OffsetCommit fails with STALE_MEMBER_EPOCH.
> * I issue a ConsumerGroupHeartbeat with _only_ the GroupId, MemberId, and MemberEpoch fields set, i.e. _nothing else is changing, just give me the latest epoch_.
> * Broker side, the broker finally processes the ConsumerGroupHeartbeat I sent initially (and had already canceled client side); in the logs you can see the epoch is bumped to 7 and the heartbeat succeeds.
> * Broker side, the broker processes the _duplicate ConsumerGroupHeartbeat_ and replies FENCED_MEMBER_EPOCH.
> * Client side, I give up on committing and finally leave the group by sending a ConsumerGroupHeartbeat with memberEpoch -1.
> * Broker side, the final heartbeat is processed successfully.
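Why a retry that omits TopicPartitions is fenced, while the same retry carrying the owned partitions is not, can be reproduced with a small standalone model of the quoted conditional. This is a sketch under simplifying assumptions, not the coordinator's code: EpochCheckSketch, Member, checkEpoch, accepted and demoMember are hypothetical names, topics are keyed by a placeholder name ("t") rather than by topic ID, and the member state (epoch 7, previous epoch 6, partitions 3 and 4 still assigned) is assumed from the logs for illustration.

```java
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of the coordinator-side epoch check quoted in the issue.
// Topics are keyed by name and partitions are plain Integers instead of the real data classes.
public class EpochCheckSketch {

    // Local stand-in for Kafka's FencedMemberEpochException.
    static class FencedMemberEpochException extends RuntimeException {
        FencedMemberEpochException(String message) { super(message); }
    }

    // Minimal view of what the coordinator tracks for one member.
    public record Member(int memberEpoch, int previousMemberEpoch,
                         Map<String, Set<Integer>> assignedPartitions) {}

    // Follows the behavior described in the issue: a null owned set is never a subset.
    static boolean isSubset(Map<String, Set<Integer>> owned, Map<String, Set<Integer>> assigned) {
        if (owned == null) return false;
        for (Map.Entry<String, Set<Integer>> e : owned.entrySet()) {
            Set<Integer> target = assigned.get(e.getKey());
            if (target == null || !target.containsAll(e.getValue())) return false;
        }
        return true;
    }

    static void checkEpoch(int receivedMemberEpoch, Map<String, Set<Integer>> owned, Member member) {
        if (receivedMemberEpoch > member.memberEpoch()) {
            // Epochs ahead of the coordinator are outside the quoted snippet and not modeled here.
            throw new IllegalStateException("epoch ahead of the coordinator is not modeled");
        }
        // The quoted branch: an older epoch is only accepted if it is the previous epoch
        // and the owned partitions are a subset of the current assignment.
        if (receivedMemberEpoch < member.memberEpoch()
                && (receivedMemberEpoch != member.previousMemberEpoch()
                    || !isSubset(owned, member.assignedPartitions()))) {
            throw new FencedMemberEpochException("member epoch " + receivedMemberEpoch
                    + " is smaller than " + member.memberEpoch());
        }
    }

    // Returns false when the check fences the member, true otherwise.
    public static boolean accepted(int receivedMemberEpoch, Map<String, Set<Integer>> owned, Member member) {
        try {
            checkEpoch(receivedMemberEpoch, owned, member);
            return true;
        } catch (FencedMemberEpochException e) {
            return false;
        }
    }

    // Assumed state after the broker processed the heartbeat the client had canceled:
    // epoch bumped from 6 to 7, partitions 3 and 4 of topic "t" still assigned.
    public static Member demoMember() {
        return new Member(7, 6, Map.of("t", Set.of(3, 4)));
    }

    public static void main(String[] args) {
        Member m = demoMember();
        // Retry with only group, member ID and epoch set: TopicPartitions is null.
        System.out.println("partial heartbeat accepted: " + accepted(6, null, m));
        // Same retry, but also resending the owned partitions.
        System.out.println("full heartbeat accepted: " + accepted(6, Map.of("t", Set.of(3, 4)), m));
    }
}
```

Running main should print false for the partial retry and true for the full one, which is the difference between the flow in the issue and the full-HB retry described in the comment.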
>
> Broker-side logs:
> {code}
> 2025-04-30 17:22:26,024 [data-plane-kafka-network-thread-2-ListenerName(PLAINTEXT)-PLAINTEXT-2] DEBUG kafka.request.logger - Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":8,"requestApiVersion":9,"correlationId":0,"clientId":"kgo","requestApiKeyName":"OFFSET_COMMIT"},"request":{"groupId":"c24d1994653ddbaa5ff68d14c54c8733a7f02f732f4da4b5811f80ce0b247c2d","generationIdOrMemberEpoch":6,"memberId":"6j403VCp6yHIJrSlfWTtYg==","groupInstanceId":null,"topics":[{"name":"c5fd69bff34c4bde0f955fcf6de5d1cc2487067028e2383b6106804d3a9949b5","partitions":[{"partitionIndex":4,"committedOffset":45435,"committedLeaderEpoch":0,"committedMetadata":"6j403VCp6yHIJrSlfWTtYg=="},{"partitionIndex":3,"committedOffset":45371,"committedLeaderEpoch":0,"committedMetadata":"6j403VCp6yHIJrSlfWTtYg=="}]}]},"response":{"throttleTimeMs":0,"topics":[{"name":"c5fd69bff34c4bde0f955fcf6de5d1cc2487067028e2383b6106804d3a9949b5","partitions":[{"partitionIndex":4,"errorCode":113},{"partitionIndex":3,"errorCode":113}]}]},"connection":"127.0.0.1:9094-127.0.0.1:56328-2-197","totalTimeMs":175.381,"requestQueueTimeMs":0.027,"localTimeMs":0.037,"remoteTimeMs":175.075,"throttleTimeMs":0,"responseQueueTimeMs":0.118,"sendTimeMs":0.121,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
> 2025-04-30 17:22:26,121 [data-plane-kafka-network-thread-2-ListenerName(PLAINTEXT)-PLAINTEXT-2] DEBUG kafka.request.logger - Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":68,"requestApiVersion":1,"correlationId":14,"clientId":"kgo","requestApiKeyName":"CONSUMER_GROUP_HEARTBEAT"},"request":{"groupId":"c24d1994653ddbaa5ff68d14c54c8733a7f02f732f4da4b5811f80ce0b247c2d","memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":6,"instanceId":null,"rackId":null,"rebalanceTimeoutMs":-1,"subscribedTopicNames":null,"subscribedTopicRegex":null,"serverAssignor":null,"topicPartitions":null},"response":{"throttleTimeMs":0,"errorCode":0,"errorMessage":null,"memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":7,"heartbeatIntervalMs":5000,"assignment":null},"connection":"127.0.0.1:9094-127.0.0.1:48238-2-153","totalTimeMs":273.886,"requestQueueTimeMs":0.021,"localTimeMs":0.025,"remoteTimeMs":273.594,"throttleTimeMs":0,"responseQueueTimeMs":0.062,"sendTimeMs":0.183,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"kgo","softwareVersion":"unknown"}}
> 2025-04-30 17:22:26,135 [data-plane-kafka-network-thread-2-ListenerName(PLAINTEXT)-PLAINTEXT-2] DEBUG kafka.request.logger - Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":68,"requestApiVersion":1,"correlationId":1,"clientId":"kgo","requestApiKeyName":"CONSUMER_GROUP_HEARTBEAT"},"request":{"groupId":"c24d1994653ddbaa5ff68d14c54c8733a7f02f732f4da4b5811f80ce0b247c2d","memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":6,"instanceId":null,"rackId":null,"rebalanceTimeoutMs":-1,"subscribedTopicNames":null,"subscribedTopicRegex":null,"serverAssignor":null,"topicPartitions":null},"response":{"throttleTimeMs":0,"errorCode":110,"errorMessage":"The consumer group member has a smaller member epoch (6) than the one known by the group coordinator (7). The member must abandon all its partitions and rejoin.","memberId":null,"memberEpoch":0,"heartbeatIntervalMs":0,"assignment":null},"connection":"127.0.0.1:9094-127.0.0.1:56328-2-197","totalTimeMs":3.304,"requestQueueTimeMs":0.095,"localTimeMs":0.091,"remoteTimeMs":2.67,"throttleTimeMs":0,"responseQueueTimeMs":0.306,"sendTimeMs":0.14,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
> 2025-04-30 17:22:26,153 [data-plane-kafka-network-thread-2-ListenerName(PLAINTEXT)-PLAINTEXT-2] DEBUG kafka.request.logger - Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":68,"requestApiVersion":1,"correlationId":2,"clientId":"kgo","requestApiKeyName":"CONSUMER_GROUP_HEARTBEAT"},"request":{"groupId":"c24d1994653ddbaa5ff68d14c54c8733a7f02f732f4da4b5811f80ce0b247c2d","memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":-1,"instanceId":null,"rackId":null,"rebalanceTimeoutMs":-1,"subscribedTopicNames":null,"subscribedTopicRegex":null,"serverAssignor":null,"topicPartitions":null},"response":{"throttleTimeMs":0,"errorCode":0,"errorMessage":null,"memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":-1,"heartbeatIntervalMs":0,"assignment":null},"connection":"127.0.0.1:9094-127.0.0.1:56328-2-197","totalTimeMs":6.528,"requestQueueTimeMs":0.088,"localTimeMs":0.116,"remoteTimeMs":6.166,"throttleTimeMs":0,"responseQueueTimeMs":0.053,"sendTimeMs":0.102,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
> {code}
>
> This can be very easily triggered by always issuing ConsumerGroupHeartbeat twice, ignoring the first response, in any test that has rebalancing.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
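The suggestion in the comment, resending a full heartbeat after any failure, would change the reproduction described in the issue: the second heartbeat would carry TopicPartitions instead of null, which the quoted previous-epoch branch can accept as long as the owned partitions are still a subset of the assignment. Below is a hypothetical sketch of that bookkeeping, not the Java client's AbstractHeartbeatRequestManager: HeartbeatStateSketch, Request, build and reset are invented names, and only the subscription and a single topic's owned partitions are tracked.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical client-side helper for building ConsumerGroupHeartbeat requests.
// Fields unchanged since the last request are sent as null; reset() makes the next
// request carry every field again. Not the Java client's implementation.
public class HeartbeatStateSketch {

    // Heavily simplified request: null means "unchanged since the last heartbeat".
    // Owned partitions are modeled for a single topic only.
    public record Request(String memberId, int memberEpoch,
                          List<String> subscribedTopicNames, Set<Integer> ownedPartitions) {}

    // Values included in the most recent request; null means "never sent".
    private List<String> lastSentSubscription;
    private Set<Integer> lastSentOwned;

    public Request build(String memberId, int memberEpoch,
                         List<String> subscription, Set<Integer> owned) {
        // Only include a field if it differs from what was last sent.
        List<String> sub = Objects.equals(subscription, lastSentSubscription) ? null : subscription;
        Set<Integer> own = Objects.equals(owned, lastSentOwned) ? null : owned;
        lastSentSubscription = subscription;
        lastSentOwned = owned;
        return new Request(memberId, memberEpoch, sub, own);
    }

    // Call after a failed, timed-out or canceled heartbeat: the coordinator may never
    // have seen the previous request, so the next one must be complete.
    public void reset() {
        lastSentSubscription = null;
        lastSentOwned = null;
    }

    public static void main(String[] args) {
        HeartbeatStateSketch state = new HeartbeatStateSketch();
        List<String> topics = List.of("t");
        Set<Integer> owned = Set.of(3, 4);

        System.out.println(state.build("m1", 6, topics, owned)); // first heartbeat: all fields
        System.out.println(state.build("m1", 6, topics, owned)); // nothing changed: nulls
        state.reset();                                           // in-flight heartbeat was canceled
        System.out.println(state.build("m1", 6, topics, owned)); // retry: all fields again
    }
}
```

Resetting on every failure avoids having to guess whether the coordinator processed the lost request, at the cost of a larger retry.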