vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354505609
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -750,7 +770,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
// Get or create the member.
if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
- final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+ final ConsumerGroupMember member = instanceId == null ?
+ group.getOrMaybeCreateMember(memberId, createIfNotExists) :
+ group.getOrMaybeCreateStaticMember(memberId, instanceId, createIfNotExists);
throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
if (memberEpoch == 0) {
Review Comment:
> I just thought about something else. When a static member is replaced, we
> need to write records to erase the state of the previous member.

Yes, that was a miss. I have added the relevant tombstone records for the
replaced static member and also cancelled its timers.
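
For readers following the thread, here is a minimal sketch of the idea discussed above: when a new member takes over an existing instance id, the coordinator emits tombstones for the previous member's state and cancels its timers before registering the replacement. Everything in it is a simplified assumption for illustration (the `Rec` type, the key prefixes, the `joinStatic` method and the timer names are hypothetical); it is not the code from this PR or Kafka's actual record schema.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; none of these types are Kafka classes.
final class StaticMemberReplacementSketch {
    // A record whose value is null acts as a tombstone for its key.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }

        boolean isTombstone() {
            return value == null;
        }
    }

    // instance id -> member id currently holding that static identity
    final Map<String, String> instanceIdToMemberId = new HashMap<>();
    // names of timers that are currently scheduled (assumed naming scheme)
    final Set<String> activeTimers = new HashSet<>();

    // Registers newMemberId under instanceId and returns the records to write.
    List<Rec> joinStatic(String instanceId, String newMemberId) {
        List<Rec> records = new ArrayList<>();
        String oldMemberId = instanceIdToMemberId.get(instanceId);
        if (oldMemberId != null && !oldMemberId.equals(newMemberId)) {
            // Erase every piece of per-member state of the replaced member.
            records.add(new Rec("current-assignment/" + oldMemberId, null));
            records.add(new Rec("target-assignment/" + oldMemberId, null));
            records.add(new Rec("member/" + oldMemberId, null));
            // The old member must no longer expire on its own schedule.
            activeTimers.remove("session/" + oldMemberId);
        }
        records.add(new Rec("member/" + newMemberId, instanceId));
        instanceIdToMemberId.put(instanceId, newMemberId);
        activeTimers.add("session/" + newMemberId);
        return records;
    }
}
```

In this sketch the tombstones are placed ahead of the new member's record, so that replaying the returned list in order removes the old state first.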
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -908,23 +936,40 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
* Handles leave request from a consumer group member.
* @param groupId The group id from the request.
* @param memberId The member id from the request.
+ * @param memberEpoch The member epoch from the request.
*
* @return A Result containing the ConsumerGroupHeartbeat response and
* a list of records to update the state machine.
*/
private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave(
String groupId,
- String memberId
+ String instanceId,
+ String memberId,
+ int memberEpoch
) throws ApiException {
ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
- ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
-
- log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
+ ConsumerGroupMember member = memberEpoch == -2 ?
+ group.getOrMaybeCreateStaticMember(memberId, instanceId, false) :
+ group.getOrMaybeCreateMember(memberId, false);
- List<Record> records = consumerGroupFenceMember(group, member);
+ List<Record> records = new ArrayList<>();
+ // The departing member is a static one. We don't need to fence this member because it is
+ // expected to come back within session timeout
+ if (memberEpoch == -2) {
+ if (memberEpoch == -2) {
Review Comment:
Added.
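
As a rough illustration of the branch in the hunk above, the sketch below separates the two leave cases by member epoch. The value `-2` for a static member's temporary leave comes from the diff; treating `-1` as the regular leave epoch, the constant and enum names, and the check that a static leave carries an instance id are all assumptions of this sketch, not a description of the PR's final code.

```java
// Hypothetical helper; the names below are illustrative, not Kafka's API.
final class LeaveEpochSketch {
    // Assumed sentinel for a regular (dynamic) member leaving the group.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    // Sentinel used in the diff for a static member that is expected to return.
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    enum LeaveAction { FENCE_MEMBER, KEEP_STATIC_MEMBER }

    static LeaveAction onLeave(String instanceId, int memberEpoch) {
        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
            // Assumption: a static leave is only meaningful with an instance id.
            if (instanceId == null) {
                throw new IllegalArgumentException("static leave requires an instance id");
            }
            // Not fenced: the member is expected back within the session timeout.
            return LeaveAction.KEEP_STATIC_MEMBER;
        }
        if (memberEpoch == LEAVE_GROUP_MEMBER_EPOCH) {
            // Regular leave: the member's state is removed from the group.
            return LeaveAction.FENCE_MEMBER;
        }
        throw new IllegalArgumentException("not a leave epoch: " + memberEpoch);
    }
}
```

For example, `LeaveEpochSketch.onLeave("instance-1", -2)` yields `KEEP_STATIC_MEMBER`, whereas the same call with `-1` yields `FENCE_MEMBER`.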