frankvicky commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1820686572

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -944,23 +941,20 @@ public boolean sameRequest(final OffsetFetchRequestState request) {
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
             OffsetFetchRequest.Builder builder;
-            if (memberInfo.memberId.isPresent() && memberInfo.memberEpoch.isPresent()) {
-                builder = new OffsetFetchRequest.Builder(
-                        groupId,
-                        memberInfo.memberId.get(),
-                        memberInfo.memberEpoch.get(),
-                        true,
-                        new ArrayList<>(this.requestedPartitions),
-                        throwOnFetchStableOffsetUnsupported);
-            } else {
-                // Building request without passing member ID/epoch to leave the logic to choose
-                // default values when not present on the request builder.
-                builder = new OffsetFetchRequest.Builder(
-                        groupId,
-                        true,
-                        new ArrayList<>(this.requestedPartitions),
-                        throwOnFetchStableOffsetUnsupported);
-            }
+            // Building request without passing member ID/epoch to leave the logic to choose
+            // default values when not present on the request builder.
+            builder = memberInfo.memberEpoch.map(epoch -> new OffsetFetchRequest.Builder(
+                    groupId,
+                    memberInfo.memberId,
+                    epoch,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported))
+                .orElseGet(() -> new OffsetFetchRequest.Builder(
+                    groupId,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported));

Review Comment:
I'm not sure whether this issue is related to the following code [0]. If it is, then yes, I think we could call the method when `memberEpoch` is present. As I said in my previous comment [1], the `groupMetadata` should reflect what the broker has acknowledged, since we're moving the coordinator logic to the broker side. If I haven't misunderstood, I will update it in the next commit.

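
For reference, here is a minimal standalone sketch of the `map`/`orElseGet` shape used in the diff above. `SketchBuilder` and its two constructors are hypothetical stand-ins and not the real `OffsetFetchRequest.Builder`; the sketch also assumes the member ID is a plain `String` while the epoch is an `Optional<Integer>`.

```java
import java.util.List;
import java.util.Optional;

class OffsetFetchBuilderSketch {

    /** Hypothetical stand-in for a request builder; not the Kafka class. */
    static final class SketchBuilder {
        final String groupId;
        final Optional<String> memberId;
        final Optional<Integer> memberEpoch;
        final List<String> partitions;

        // Overload used when the member epoch is known.
        SketchBuilder(String groupId, String memberId, int memberEpoch, List<String> partitions) {
            this.groupId = groupId;
            this.memberId = Optional.of(memberId);
            this.memberEpoch = Optional.of(memberEpoch);
            this.partitions = partitions;
        }

        // Overload used when no member info is passed; defaults are left to the builder.
        SketchBuilder(String groupId, List<String> partitions) {
            this.groupId = groupId;
            this.memberId = Optional.empty();
            this.memberEpoch = Optional.empty();
            this.partitions = partitions;
        }
    }

    // Picks the constructor based only on whether the epoch is present (assumption of this sketch).
    static SketchBuilder build(String groupId,
                               String memberId,
                               Optional<Integer> memberEpoch,
                               List<String> partitions) {
        return memberEpoch
            .map(epoch -> new SketchBuilder(groupId, memberId, epoch, partitions))
            .orElseGet(() -> new SketchBuilder(groupId, partitions));
    }

    public static void main(String[] args) {
        SketchBuilder withEpoch = build("my-group", "member-1", Optional.of(5), List.of("topic-0"));
        SketchBuilder withoutEpoch = build("my-group", "member-1", Optional.empty(), List.of("topic-0"));
        System.out.println("with epoch, member info set: " + withEpoch.memberId.isPresent());
        System.out.println("without epoch, member info set: " + withoutEpoch.memberId.isPresent());
    }
}
```

`orElseGet` takes a supplier, so the fallback builder is only constructed when the epoch is absent, which is why it is used here rather than `orElse`.
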
[0]: https://github.com/apache/kafka/blob/8c071b02e9908d9facf10c0a18e7e0f9d1b0825f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L636-L647
[1]:
> Hi @lianetm,
>
> Here is my understanding. Please correct me if I'm wrong:
> Since group management has now moved to the server side, the ConsumerGroupMetadata basically represents the data acknowledged by the broker. In this case, I think we should keep ConsumerGroupMetadata.memberId empty at startup, because at that moment the broker still has no knowledge of the memberId. Once the consumer polls, the broker will recognize the memberId and update the assignment, allowing the consumer to update the ConsumerGroupMetadata via MemberStateListener. After that, ConsumerGroupMetadata will have the memberId.
>
> Does this make sense?
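
To illustrate the lifecycle described in the quoted comment, here is a rough sketch of metadata that starts with an empty member ID and is filled in through a listener callback. All names here (`MemberListenerSketch`, `AcknowledgedMetadata`) are hypothetical, the callback signature is an assumption rather than the actual `MemberStateListener` API, and "empty" is modeled as an empty string.

```java
import java.util.Optional;

class GroupMetadataSketch {

    /** Hypothetical listener, loosely modeled on the MemberStateListener idea; not the Kafka interface. */
    interface MemberListenerSketch {
        void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId);
    }

    /** Hypothetical holder for what the broker has acknowledged so far. */
    static final class AcknowledgedMetadata implements MemberListenerSketch {
        private final String groupId;
        // Empty at startup: the broker has not acknowledged this member yet.
        private String memberId = "";
        private Optional<Integer> memberEpoch = Optional.empty();

        AcknowledgedMetadata(String groupId) {
            this.groupId = groupId;
        }

        // Invoked once the broker has responded with the member's state.
        @Override
        public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
            this.memberEpoch = memberEpoch;
            this.memberId = memberId;
        }

        String groupId() { return groupId; }
        String memberId() { return memberId; }
        Optional<Integer> memberEpoch() { return memberEpoch; }
    }

    public static void main(String[] args) {
        AcknowledgedMetadata metadata = new AcknowledgedMetadata("my-group");
        System.out.println("member ID empty at startup: " + metadata.memberId().isEmpty());

        // Simulates the update that would follow the first poll and broker response.
        metadata.onMemberEpochUpdated(Optional.of(1), "member-1");
        System.out.println("member ID after acknowledgement: " + metadata.memberId());
    }
}
```
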