lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394585362
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -77,32 +138,111 @@ public class MembershipManagerImpl implements MembershipManager { /** * Assignment that the member received from the server and successfully processed. */ - private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; + private Set<TopicPartition> currentAssignment; /** - * Assignment that the member received from the server but hasn't completely processed - * yet. + * Subscription state object holding the current assignment the member has for the topics it + * subscribed to. */ - private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment; + private final SubscriptionState subscriptions; + + /** + * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}. + */ + private final ConsumerMetadata metadata; + + /** + * TopicPartition comparator based on topic name and partition id. + */ + private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator(); /** * Logger. */ private final Logger log; - public MembershipManagerImpl(String groupId, LogContext logContext) { - this(groupId, null, null, logContext); + /** + * Manager to perform commit requests needed before revoking partitions (if auto-commit is + * enabled) + */ + private final CommitRequestManager commitRequestManager; + + /** + * Local cache of assigned topic IDs and names. Topics are added here when received in a + * target assignment, as we discover their names in the Metadata cache, and removed when the + * topic is not in the subscription anymore. The purpose of this cache is to avoid metadata + * requests in cases where a currently assigned topic is in the target assignment (new + * partition assigned, or revoked), but it is not present in the Metadata cache at that moment. 
+ * The cache is cleared when the subscription changes ({@link #transitionToJoining()}), the + * member fails ({@link #transitionToFatal()}) or leaves the group ({@link #leaveGroup()}). + */ + private final Map<Uuid, String> assignedTopicNamesCache; + + /** + * Topic IDs received in a target assignment for which we haven't found topic names yet. + * Items are added to this map every time a target assignment is received. Items are removed + * when metadata is found for the topic. This is where the member collects all assignments + * received from the broker, even though they may not be ready to reconcile due to missing + * metadata. + */ + private final Map<Uuid, List<Integer>> assignmentUnresolved; + + /** + * Assignment received for which topic names have been resolved, so it's ready to be + * reconciled. Items are added to this set when received in a target assignment (if metadata + * available), or when a metadata update is received. This is where the member keeps all the + * assignments ready to reconcile, even though the reconciliation might need to wait if there + * is already another one in process. + */ + private final SortedSet<TopicPartition> assignmentReadyToReconcile; + + /** + * Epoch that a member must include in a heartbeat request to indicate that it wants to join or + * re-join a group. + */ + public static final int JOIN_GROUP_EPOCH = 0; Review Comment: Totally, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org