chia7712 commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1821126889

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -224,6 +227,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
         this.clientTelemetryReporter = clientTelemetryReporter;
         this.time = time;
         this.metricsManager = metricsManager;
+
+        // Update the group member ID label in the client telemetry reporter.
+        // According to KIP-1082, the consumer will generate the member ID as the incarnation ID of the process.
+        // Therefore, we can update the group member ID during initialization.
+        clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(
+            Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, memberId)));

Review Comment:
   `Map.of`

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -224,6 +227,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
         this.clientTelemetryReporter = clientTelemetryReporter;
         this.time = time;
         this.metricsManager = metricsManager;
+
+        // Update the group member ID label in the client telemetry reporter.
+        // According to KIP-1082, the consumer will generate the member ID as the incarnation ID of the process.
+        // Therefore, we can update the group member ID during initialization.
+        clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(

Review Comment:
   we don't need to pass `clientTelemetryReporter` to `AbstractMembershipManager` as we can call `updateMetricsLabels` after initializing `AbstractMembershipManager`, right? The member is fixed now

##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##########
@@ -39,7 +39,7 @@
     { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
       "about": "The top-level error message, or null if there was no error." },
     { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
-      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
+      "about": "The member id is generated by the consumer and provided by the consumer for all requests starting from version 1. In version 0, however, the member ID is generated by the group coordinator." },

Review Comment:
   `In version 0, however, the member ID is generated by the group coordinator.`

   This description is inaccurate; in version 0, users can also generate a member ID.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -80,10 +81,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
     protected final String groupId;
     /**
-     * Member ID assigned by the server to the member, received in a heartbeat response when
-     * joining the group specified in {@link #groupId}
+     * Member ID generated by the consumer at startup, which is unique within the group and remains consistent
+     * for the entire lifetime of the process. This ID acts as an incarnation identifier for the consumer process
+     * and does not reset or change, even if the consumer leaves and rejoins the group.
+     * The Member ID remains the same until the process is completely stopped or terminated.
      */
-    protected String memberId = "";
+    protected final String memberId = Uuid.randomUuid().toString();

Review Comment:
   `memberIdInfoForLog` is useless, correct?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -622,10 +631,10 @@ public void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
     /**
      * Call all listeners that are registered to get notified when the member epoch is updated.
-     * This also includes the latest member ID in the notification. If the member fails or leaves
-     * the group, this will be invoked with empty epoch and member ID.
+     * This also includes the member ID in the notification. If the member fails or leaves
+     * the group, this will be invoked with empty epoch.
      */
-    void notifyEpochChange(Optional<Integer> epoch, Optional<String> memberId) {
+    void notifyEpochChange(Optional<Integer> epoch, String memberId) {

Review Comment:
   `memberId` is immutable now so we can remove `memberId` from `notifyEpochChange`

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1293,17 +1287,16 @@ public void setInflightCommitStatus(final boolean inflightCommitStatus) {
     }
     static class MemberInfo {
-        Optional<String> memberId;
+        String memberId = "";
         Optional<Integer> memberEpoch;
         MemberInfo() {

Review Comment:
   we can initialize the `memberEpoch` without default constructor

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
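
For illustration only, here is a minimal, self-contained sketch of the shape several of the comments above point toward: a member ID fixed at construction, an epoch initialized by a field initializer instead of a default constructor, an epoch callback that no longer carries the member ID, and `Map.of` in place of `Collections.singletonMap`. The class `MemberIdSketch`, the method `onEpochChange`, and the label key value are hypothetical and are not taken from the PR; `java.util.UUID` is swapped in for Kafka's own `Uuid` class so the example needs nothing beyond the JDK.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for the classes touched by the PR; not Kafka's real API.
final class MemberIdSketch {
    // Label key value chosen for illustration only (assumption).
    static final String GROUP_MEMBER_ID = "group_member_id";

    // Generated once per instance and never reassigned.
    // Assumption: java.util.UUID replaces Kafka's Uuid to keep the sketch self-contained.
    final String memberId = UUID.randomUUID().toString();

    // A field initializer removes the need for a default constructor that only sets this.
    Optional<Integer> memberEpoch = Optional.empty();

    // Map.of (Java 9+) builds the same immutable single-entry map as Collections.singletonMap.
    Map<String, String> metricsLabels() {
        return Map.of(GROUP_MEMBER_ID, memberId);
    }

    // Because the member ID never changes, the callback only needs the epoch.
    void onEpochChange(Optional<Integer> epoch) {
        this.memberEpoch = epoch;
    }

    public static void main(String[] args) {
        MemberIdSketch member = new MemberIdSketch();
        member.onEpochChange(Optional.of(1));   // joined the group
        member.onEpochChange(Optional.empty()); // left or failed: epoch is cleared, ID is kept
        System.out.println(member.metricsLabels());
    }
}
```

Whether the real listeners can drop the member ID parameter depends on how they obtain the ID otherwise, which this sketch does not model.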