lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1382667315


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -77,32 +93,88 @@ public class MembershipManagerImpl implements 
MembershipManager {
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
-    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    private Set<TopicPartition> currentAssignment;
 
     /**
      * Assignment that the member received from the server but hasn't 
completely processed
      * yet.
      */
     private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
 
+    /**
+     * Subscription state object holding the current assignment the member has 
for the topics it
+     * subscribed to.
+     */
+    private final SubscriptionState subscriptions;
+
+    /**
+     * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+     */
+    private final ConsumerMetadata metadata;
+
+    /**
+     * TopicPartition comparator based on topic name and partition id.
+     */
+    private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();
+
     /**
      * Logger.
      */
     private final Logger log;
 
-    public MembershipManagerImpl(String groupId, LogContext logContext) {
-        this(groupId, null, null, logContext);
+    /**
+     * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
+     * enabled)
+     */
+    private final CommitRequestManager commitRequestManager;
+
+    /**
+     * Manager to perform metadata requests. Used to get topic metadata when 
needed for resolving
+     * topic names for topic IDs received in a target assignment.
+     */
+    private final TopicMetadataRequestManager metadataRequestManager;
+
+    /**
+     * Epoch that a member (not static) must include a heartbeat request to 
indicate that it wants
+     * to leave the group. This is considered as a definitive leave.
+     */
+    public static final int LEAVE_GROUP_EPOCH = -1;
+
+    /**
+     * Epoch that a static member (member with group instance id) must include 
a heartbeat request
+     * to indicate that it wants to leave the group. This will be considered 
as a potentially
+     * temporary leave.
+     */
+    public static final int LEAVE_GROUP_EPOCH_FOR_STATIC_MEMBER = -2;

Review Comment:
   Good point. I reused the -1, and removed the static membership constant and 
logic from our side given that it is not supported yet. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to