chia7712 commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1821126889


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -224,6 +227,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
         this.clientTelemetryReporter = clientTelemetryReporter;
         this.time = time;
         this.metricsManager = metricsManager;
+
+        // Update the group member ID label in the client telemetry reporter.
+        // According to KIP-1082, the consumer will generate the member ID as the incarnation ID of the process.
+        // Therefore, we can update the group member ID during initialization.
+        clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(
+            Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, memberId)));

Review Comment:
   Use `Map.of` instead of `Collections.singletonMap`.
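
A minimal sketch of the suggested swap, assuming Java 9 or later. The class name, the helper methods, and the `GROUP_MEMBER_ID` value are hypothetical stand-ins, not the real `ClientTelemetryProvider` definitions. Both factories return an immutable single-entry map, so the results compare equal; one behavioral difference is that `Map.of` rejects null keys and values.

```java
import java.util.Collections;
import java.util.Map;

class MemberIdLabels {
    // Hypothetical stand-in for ClientTelemetryProvider.GROUP_MEMBER_ID; the real value may differ.
    static final String GROUP_MEMBER_ID = "group_member_id";

    // Form currently used in the diff.
    static Map<String, String> withSingletonMap(String memberId) {
        return Collections.singletonMap(GROUP_MEMBER_ID, memberId);
    }

    // Suggested form; unlike singletonMap, Map.of throws NullPointerException on a null key or value.
    static Map<String, String> withMapOf(String memberId) {
        return Map.of(GROUP_MEMBER_ID, memberId);
    }
}
```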



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -224,6 +227,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
         this.clientTelemetryReporter = clientTelemetryReporter;
         this.time = time;
         this.metricsManager = metricsManager;
+
+        // Update the group member ID label in the client telemetry reporter.
+        // According to KIP-1082, the consumer will generate the member ID as the incarnation ID of the process.
+        // Therefore, we can update the group member ID during initialization.
+        clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(

Review Comment:
   We don't need to pass `clientTelemetryReporter` to `AbstractMembershipManager`, since we can call `updateMetricsLabels` after initializing `AbstractMembershipManager`, right? The member ID is fixed now.



##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##########
@@ -39,7 +39,7 @@
     { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
       "about": "The top-level error message, or null if there was no error." },
     { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
-      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
+      "about": "The member id is generated by the consumer and provided by the consumer for all requests starting from version 1. In version 0, however, the member ID is generated by the group coordinator." },

Review Comment:
   `In version 0, however, the member ID is generated by the group coordinator.`
   
   This description is inaccurate; in version 0, users can also generate a member ID.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -80,10 +81,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
     protected final String groupId;
 
     /**
-     * Member ID assigned by the server to the member, received in a heartbeat response when
-     * joining the group specified in {@link #groupId}
+     * Member ID generated by the consumer at startup, which is unique within the group and remains consistent
+     * for the entire lifetime of the process. This ID acts as an incarnation identifier for the consumer process
+     * and does not reset or change, even if the consumer leaves and rejoins the group.
+     * The Member ID remains the same until the process is completely stopped or terminated.
      */
-    protected String memberId = "";
+    protected final String memberId = Uuid.randomUuid().toString();

Review Comment:
   `memberIdInfoForLog` is no longer needed, correct?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -622,10 +631,10 @@ public void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
 
     /**
      * Call all listeners that are registered to get notified when the member epoch is updated.
-     * This also includes the latest member ID in the notification. If the member fails or leaves
-     * the group, this will be invoked with empty epoch and member ID.
+     * This also includes the member ID in the notification. If the member fails or leaves
+     * the group, this will be invoked with empty epoch.
      */
-    void notifyEpochChange(Optional<Integer> epoch, Optional<String> memberId) {
+    void notifyEpochChange(Optional<Integer> epoch, String memberId) {

Review Comment:
   `memberId` is immutable now, so we can remove `memberId` from `notifyEpochChange`.
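
A rough sketch of what an epoch-only notification might look like. `EpochListener` and `EpochNotifier` are hypothetical names for illustration and do not claim to match the real listener interface in the client; a listener that needs the member ID would read it once from wherever the immutable value is held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical listener shape after the change: only the epoch is passed.
interface EpochListener {
    void onMemberEpochUpdated(Optional<Integer> epoch);
}

class EpochNotifier {
    private final List<EpochListener> listeners = new ArrayList<>();

    void register(EpochListener listener) {
        listeners.add(listener);
    }

    // An empty epoch signals that the member failed or left the group.
    void notifyEpochChange(Optional<Integer> epoch) {
        listeners.forEach(listener -> listener.onMemberEpochUpdated(epoch));
    }
}
```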



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1293,17 +1287,16 @@ public void setInflightCommitStatus(final boolean inflightCommitStatus) {
     }
 
     static class MemberInfo {
-        Optional<String> memberId;
+        String memberId = "";
         Optional<Integer> memberEpoch;
 
         MemberInfo() {

Review Comment:
   We can initialize `memberEpoch` at its declaration, without a default constructor.
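
A minimal sketch of that shape, under the assumption that the existing constructor only sets `memberEpoch` to `Optional.empty()` (the constructor body is not visible in the hunk above). `MemberInfoSketch` is a hypothetical name used to avoid implying this is the actual class.

```java
import java.util.Optional;

// Both fields are initialized at declaration, so the compiler-provided
// default constructor is sufficient and no explicit one is needed.
class MemberInfoSketch {
    String memberId = "";
    // Assumed initial value: no epoch until the member joins the group.
    Optional<Integer> memberEpoch = Optional.empty();
}
```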


