frankvicky commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1820686572


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -944,23 +941,20 @@ public boolean sameRequest(final OffsetFetchRequestState request) {
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 
             OffsetFetchRequest.Builder builder;
-            if (memberInfo.memberId.isPresent() && memberInfo.memberEpoch.isPresent()) {
-                builder = new OffsetFetchRequest.Builder(
-                        groupId,
-                        memberInfo.memberId.get(),
-                        memberInfo.memberEpoch.get(),
-                        true,
-                        new ArrayList<>(this.requestedPartitions),
-                        throwOnFetchStableOffsetUnsupported);
-            } else {
-                // Building request without passing member ID/epoch to leave the logic to choose
-                // default values when not present on the request builder.
-                builder = new OffsetFetchRequest.Builder(
-                        groupId,
-                        true,
-                        new ArrayList<>(this.requestedPartitions),
-                        throwOnFetchStableOffsetUnsupported);
-            }
+            // Building request without passing member ID/epoch to leave the logic to choose
+            // default values when not present on the request builder.
+            builder = memberInfo.memberEpoch.map(epoch -> new OffsetFetchRequest.Builder(
+                            groupId,
+                            memberInfo.memberId,
+                            epoch,
+                            true,
+                            new ArrayList<>(this.requestedPartitions),
+                            throwOnFetchStableOffsetUnsupported))
+                    .orElseGet(() -> new OffsetFetchRequest.Builder(
+                            groupId,
+                            true,
+                            new ArrayList<>(this.requestedPartitions),
+                            throwOnFetchStableOffsetUnsupported));

Review Comment:
   I'm not sure whether this issue is related to the following code. [0]
   
   If it is, then yes, I think we could call the method if `memberEpoch` is present.
   As I said in my previous comment [1], the `groupMetadata` should represent what the
   broker has acknowledged, since we're moving the coordinator logic to the broker side.
   
   If I haven't misunderstood, I will update it in the next commit.
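
To make the shape of the change in the diff easier to discuss, here is a minimal, self-contained sketch of the `Optional.map(...).orElseGet(...)` selection. `FetchRequestSketch`, `withMember`, `withoutMember`, and `choose` are hypothetical stand-ins, not Kafka classes or methods, and the `Optional<String>` type for the member ID is an assumption for illustration only.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a request builder; NOT OffsetFetchRequest.Builder.
final class FetchRequestSketch {
    final String groupId;
    final Optional<String> memberId;      // assumed type, for illustration only
    final Optional<Integer> memberEpoch;
    final List<String> partitions;

    private FetchRequestSketch(String groupId, Optional<String> memberId,
                               Optional<Integer> memberEpoch, List<String> partitions) {
        this.groupId = groupId;
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
        this.partitions = partitions;
    }

    // Variant that carries member information.
    static FetchRequestSketch withMember(String groupId, Optional<String> memberId,
                                         int epoch, List<String> partitions) {
        return new FetchRequestSketch(groupId, memberId, Optional.of(epoch), partitions);
    }

    // Variant that leaves member ID and epoch unset.
    static FetchRequestSketch withoutMember(String groupId, List<String> partitions) {
        return new FetchRequestSketch(groupId, Optional.empty(), Optional.empty(), partitions);
    }

    // The epoch alone decides which variant is built; the member ID is passed through as-is.
    static FetchRequestSketch choose(String groupId, Optional<String> memberId,
                                     Optional<Integer> memberEpoch, List<String> partitions) {
        return memberEpoch
                .map(epoch -> withMember(groupId, memberId, epoch, partitions))
                .orElseGet(() -> withoutMember(groupId, partitions));
    }
}
```

In this sketch, a present epoch with an empty member ID still selects the member-aware variant, whereas the removed `if` required both values to be present.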
   
   [0]: https://github.com/apache/kafka/blob/8c071b02e9908d9facf10c0a18e7e0f9d1b0825f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L636-L647
   
   [1]:
   >Hi @lianetm,
   >
   >Here is my understanding. Please correct me if I’m wrong:
   >Since the group management has now moved to the server-side, the
   >ConsumerGroupMetadata basically represents the data acknowledged by the broker.
   >In this case, I think we should keep the ConsumerGroupMetadata.memberId empty
   >at startup. This is because, at that moment, the broker still has no knowledge
   >of the memberId. Once the consumer polls, the broker will then recognize the
   >memberId and update the assignment, allowing the consumer to update the
   >ConsumerGroupMetadata via MemberStateListener. After that,
   >ConsumerGroupMetadata will have the memberId.
   >
   >Does this make sense?
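
The lifecycle described in the quoted comment could be sketched roughly as follows. This is only an illustration under stated assumptions: `GroupMetadataSketch` and `onBrokerAck` are hypothetical names, not the Kafka `ConsumerGroupMetadata` or `MemberStateListener` API, and `-1` as the initial epoch is an arbitrary placeholder.

```java
import java.util.Optional;

// Hypothetical model of client-side group metadata; NOT the Kafka ConsumerGroupMetadata class.
final class GroupMetadataSketch {
    private final String groupId;
    // Empty at startup: the broker has not yet acknowledged any member ID.
    private Optional<String> memberId = Optional.empty();
    private int memberEpoch = -1; // placeholder for "no epoch yet" (assumption)

    GroupMetadataSketch(String groupId) {
        this.groupId = groupId;
    }

    // Hypothetical listener callback: only an update that carries an epoch
    // is treated as acknowledged by the broker and allowed to change the metadata.
    void onBrokerAck(Optional<Integer> epoch, String ackedMemberId) {
        epoch.ifPresent(e -> {
            this.memberEpoch = e;
            this.memberId = Optional.of(ackedMemberId);
        });
    }

    String groupId() { return groupId; }
    Optional<String> memberId() { return memberId; }
    int memberEpoch() { return memberEpoch; }
}
```

With this shape, the metadata exposes no member ID until the first update that carries an epoch, which matches the "empty at startup" behaviour proposed above.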


