jeffkbkim commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1379393118
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -537,22 +533,34 @@ public Map<Uuid, Set<Integer>> partitionsPendingAssignment() {
public String currentAssignmentSummary() {
return "CurrentAssignment(memberEpoch=" + memberEpoch +
", previousMemberEpoch=" + previousMemberEpoch +
- ", targetMemberEpoch=" + targetMemberEpoch +
", state=" + state +
", assignedPartitions=" + assignedPartitions +
- ", partitionsPendingRevocation=" + partitionsPendingRevocation +
- ", partitionsPendingAssignment=" + partitionsPendingAssignment +
+ ", revokedPartitions=" + revokedPartitions +
')';
}
+ /**
+ * @return True if the assignment of this member is equals to the assignment
Review Comment:
nit: is equal to
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -511,24 +507,24 @@ public MemberState state() {
}
/**
- * @return The set of assigned partitions.
+ * @return True of the member is in the Stable state and at the desired epoch.
Review Comment:
nit: True if
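For reference, the predicate described by that javadoc combines two checks. A minimal sketch, assuming a trimmed-down member that carries only its epoch and state; `Member` and `isReconciledTo` are hypothetical names, not necessarily what the PR uses:

```java
// Stand-in for ConsumerGroupMember.MemberState, using the three states named
// in the CurrentAssignmentBuilder javadoc of this PR.
enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

// Hypothetical, trimmed-down member holding only what the predicate needs.
record Member(int memberEpoch, MemberState state) {

    // Hypothetical name: true only if the member is STABLE *and* its epoch
    // equals the epoch of the desired (target) assignment.
    boolean isReconciledTo(int targetAssignmentEpoch) {
        return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch;
    }
}
```

Both conditions matter: a STABLE member at an older epoch has not yet picked up a newly installed target assignment.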
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -34,48 +33,68 @@
* consumer group protocol. Given the current state of a member and a desired or target
* assignment state, the state machine takes the necessary steps to converge them.
*
- * The member state has the following properties:
- * - Current Epoch:
- * The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ * The member is fully reconciled to the desired target assignment.
*
- * - Next Epoch:
- * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- * The member transitions to this epoch when it has revoked the partitions that it does not own
- * or if it does not have to revoke any.
+ * Valid Transitions:
+ * - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
*
- * - Previous Epoch:
- * The epoch of the member when the state was last updated.
+ * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
*
- * - Assigned Partitions:
- * The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
*
- * - Partitions Pending Revocation:
- * The set of partitions that the member should revoke before it can transition to the next state.
+ * - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
*
- * - Partitions Pending Assignment:
- * The set of partitions that the member will eventually receive. The partitions in this set are
- * still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ * The member has received a new assignment from the group coordinator but
+ * he has not acknowledged it yet. The member is removed from the group if
+ * he does not acknowledge it within the rebalance timeout.
*
- * The state machine has three states:
- * - REVOKING:
- * This state means that the member must revoke partitions before it can transition to the next epoch
- * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- * are committed with the current epoch. The member transitions to the next state only when it has
- * acknowledged the revocation.
+ * Valid Transitions:
+ * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE
+ * state if it is fully reconciled.
*
- * - ASSIGNING:
- * This state means that the member waits on partitions which are still owned by other members in the
- * group. It remains in this state until they are all freed up.
+ * - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ * When the assignment is acknowledged, the member remains in the
+ * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
*
- * - STABLE:
- * This state means that the member has received all its assigned partitions.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
+ *
+ * - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS
+ * When the assignment is acknowledged, the member transitions to the
+ * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet.
+ *
+ * - UNRELEASED_PARTITIONS:
+ * The member's reconciliation cannot progress because newly assigned partitions
+ * are still owned by other members in the group. They are not released yet.
*
- * The reconciliation process is started or re-started whenever a new target assignment is installed;
- * the epoch of the new target assignment is different from the next epoch of the member. In this transient
- * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment
- * are updated. If the partitions pending revocation is not empty, the state machine transitions to
- * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it
- * transitions to STABLE.
+ * Valid Transitions:
+ * - UNRELEASED_PARTITIONS -> STABLE
+ * The member may transition to the STABLE state if the partitions that he
+ * was awaiting on are no longer in the desired target assignment.
+ *
+ * - UNRELEASED_PARTITIONS -> UNACKNOWLEDGED_ASSIGNMENT
+ * When at least one partition become available, a new assignment is computed
+ * for the member and he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
Review Comment:
are there any implications to computing a new assignment for every
acknowledged partition?
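To reason about the question above, the transitions in the javadoc can be sketched as a single step function that is re-run whenever a target is installed or an assignment is acknowledged. This is a simplified model, not the PR's `CurrentAssignmentBuilder`: it assumes one topic with integer partitions, a hypothetical `isReleased` predicate reporting whether no other member still owns a partition, and hypothetical `Step`/`ReconciliationSketch` names. The javadoc does not state which epoch is used when entering UNRELEASED_PARTITIONS; the sketch assumes the target epoch, since nothing is pending revocation in that branch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntPredicate;

// Stand-in for ConsumerGroupMember.MemberState with the states from the javadoc.
enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

// Hypothetical result of one reconciliation step.
record Step(int epoch, MemberState state, Set<Integer> assigned, Set<Integer> revoked) { }

final class ReconciliationSketch {
    private ReconciliationSketch() { }

    // Simplified model (assumptions): one topic, integer partitions, and an
    // isReleased predicate that is true when no other member owns the partition.
    static Step next(int memberEpoch, int targetEpoch,
                     Set<Integer> assigned, Set<Integer> target,
                     IntPredicate isReleased) {
        // Partitions the member owns but that are no longer in the target.
        Set<Integer> revoked = new HashSet<>(assigned);
        revoked.removeAll(target);
        // Partitions the member owns and keeps.
        Set<Integer> kept = new HashSet<>(assigned);
        kept.retainAll(target);

        if (!revoked.isEmpty()) {
            // Revocation first: the member stays in its current epoch.
            return new Step(memberEpoch, MemberState.UNACKNOWLEDGED_ASSIGNMENT, kept, revoked);
        }

        // Partitions in the target that the member does not own yet.
        Set<Integer> newlyAssigned = new HashSet<>(target);
        newlyAssigned.removeAll(assigned);
        if (newlyAssigned.isEmpty()) {
            // The member already matches the target: it moves to the target epoch and is STABLE.
            return new Step(targetEpoch, MemberState.STABLE, kept, Set.of());
        }

        // Only partitions already released by their previous owners can be handed out.
        Set<Integer> available = new HashSet<>();
        for (int partition : newlyAssigned) {
            if (isReleased.test(partition)) {
                available.add(partition);
            }
        }
        if (available.isEmpty()) {
            // Every newly assigned partition is still owned elsewhere.
            // Assumption: the epoch moves to the target since nothing is revoked.
            return new Step(targetEpoch, MemberState.UNRELEASED_PARTITIONS, kept, Set.of());
        }

        // Hand out what is available now; the rest is picked up on a later step.
        kept.addAll(available);
        return new Step(targetEpoch, MemberState.UNACKNOWLEDGED_ASSIGNMENT, kept, Set.of());
    }
}
```

Under these assumptions, each released partition can trigger one more step and one more assignment to acknowledge, which is the cost the question is probing. The same function also returns STABLE directly for a member in UNRELEASED_PARTITIONS whose awaited partitions are dropped from a new target while its remaining partitions already match it.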
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -34,48 +33,68 @@
+ * State Machine:
Review Comment:
this section is extremely helpful, thanks!
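Since the State Machine section is the reference for these transitions, it can also be read as a lookup table. A minimal sketch that encodes only the transitions listed under "Valid Transitions"; the enum is a stand-in, not the PR's `ConsumerGroupMember.MemberState`, and `validNextStates`/`canTransitionTo` are hypothetical:

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch only: the three states and the transitions listed under
// "Valid Transitions" in the CurrentAssignmentBuilder javadoc.
enum MemberState {
    STABLE,
    UNACKNOWLEDGED_ASSIGNMENT,
    UNRELEASED_PARTITIONS;

    // States reachable from this one, per the javadoc.
    Set<MemberState> validNextStates() {
        return switch (this) {
            // STABLE: assignment unchanged, new assignment computed, or all new
            // partitions unreleased. UNACKNOWLEDGED_ASSIGNMENT (on acknowledgement):
            // fully reconciled, new assignment computed, or new partitions unreleased.
            case STABLE, UNACKNOWLEDGED_ASSIGNMENT ->
                EnumSet.of(STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS);
            // UNRELEASED_PARTITIONS: awaited partitions dropped from the target,
            // or at least one of them became available.
            case UNRELEASED_PARTITIONS -> EnumSet.of(STABLE, UNACKNOWLEDGED_ASSIGNMENT);
        };
    }

    boolean canTransitionTo(MemberState next) {
        return validNextStates().contains(next);
    }
}
```

Remaining in UNRELEASED_PARTITIONS while waiting is not listed as a transition in the javadoc, so this table rejects it; modeling it as a self-transition would be a small change.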
##########
group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json:
##########
@@ -24,20 +24,12 @@
"about": "The current member epoch that is expected from the member in the heartbeat request." },
{ "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
"about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
- { "name": "TargetMemberEpoch", "versions": "0+", "type": "int32",
- "about": "The target epoch corresponding to the assignment used to compute the AssignedPartitions, the PartitionsPendingRevocation and the PartitionsPendingAssignment fields." },
+ { "name": "State", "versions": "0+", "type": "int8",
+ "about": "The member state. See ConsumerGroupMember.MemberState for the possible values." },
{ "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions",
"about": "The partitions assigned to (or owned by) this member." },
- { "name": "PartitionsPendingRevocation", "versions": "0+", "type": "[]TopicPartitions",
- "about": "The partitions that must be revoked by this member." },
- { "name": "PartitionsPendingAssignment", "versions": "0+", "type": "[]TopicPartitions",
- "about": "The partitions that will be assigned to this member when they are freed up by their current owners." },
- { "name": "Error", "versions": "0+", "type": "int8",
- "about": "The error reported by the assignor." },
- { "name": "MetadataVersion", "versions": "0+", "type": "int16",
- "about": "The version of the metadata bytes." },
- { "name": "MetadataBytes", "versions": "0+", "type": "bytes",
- "about": "The metadata bytes." }
+ { "name": "RevokedPartitions", "versions": "0+", "type": "[]TopicPartitions",
+ "about": "The partitions that must be revoked by this member." }
Review Comment:
1. AssignedPartitions: does this include pending partitions that the member
has not acknowledged yet? and the consumer is responsible for converging to
this final state?
2. RevokedPartitions: this works the same as PartitionsPendingRevocation, as
in we just renamed this, right?
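To make the schema change concrete, here is a hypothetical in-memory mirror of the record after this PR. The real class is generated from the JSON schema, so the record names, the `String` topic ids (instead of `Uuid`), the assumed int32 type of MemberEpoch, and the byte value assigned to each state are all assumptions; the schema only says the state is an int8 whose values are defined by `ConsumerGroupMember.MemberState`.

```java
import java.util.List;

// Hypothetical stand-in for the []TopicPartitions entries (topic id as String).
record TopicPartitions(String topicId, List<Integer> partitions) { }

// Hypothetical mirror of ConsumerGroupCurrentMemberAssignmentValue after this PR:
// TargetMemberEpoch, the two pending sets, Error and Metadata* are gone;
// State and RevokedPartitions are new.
record CurrentMemberAssignmentValue(
    int memberEpoch,                          // assumed int32
    int previousMemberEpoch,                  // int32 in the schema
    byte state,                               // int8 in the schema
    List<TopicPartitions> assignedPartitions,
    List<TopicPartitions> revokedPartitions
) { }

// Stand-in enum; the byte values below are illustrative, not the PR's.
enum MemberState {
    STABLE((byte) 0),
    UNACKNOWLEDGED_ASSIGNMENT((byte) 1),
    UNRELEASED_PARTITIONS((byte) 2);

    private final byte value;

    MemberState(byte value) {
        this.value = value;
    }

    byte value() {
        return value;
    }

    // Decodes the int8 stored in the record back into a state.
    static MemberState fromValue(byte value) {
        for (MemberState s : values()) {
            if (s.value == value) {
                return s;
            }
        }
        throw new IllegalArgumentException("Unknown member state: " + value);
    }
}
```

Storing an explicit byte per state (rather than relying on `ordinal()`) keeps the persisted value stable if states are later reordered.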
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -34,48 +33,68 @@
+ * Valid Transitions:
+ * - UNRELEASED_PARTITIONS -> STABLE
+ * The member may transition to the STABLE state if the partitions that he
+ * was awaiting on are no longer in the desired target assignment.
Review Comment:
on "no longer in the desired assignment": does this mean that a new target
assignment was built? but even then, it should go to UNACKNOWLEDGED_ASSIGNMENT
first, right?
or does the "target assignment" here mean the assignment the coordinator
sends to the member?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -301,11 +322,9 @@ public String toString() {
private final int previousMemberEpoch;
/**
- * The next member epoch. This corresponds to the target
- * assignment epoch used to compute the current assigned,
- * revoking and assigning partitions.
+ * The member state.
*/
- private final int targetMemberEpoch;
+ private final MemberState state;
Review Comment:
so that I understand: a member no longer has a targetMemberEpoch and we rely
on the member state to determine which state in the reconciliation process the
member is in. is this correct?
I vaguely remember fencing requests based on member epochs, but looking at
this PR, it looks like targetMemberEpoch is only used to determine which state
we should transition the member to.
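On the fencing point: the PreviousMemberEpoch field in ConsumerGroupCurrentMemberAssignmentValue ("If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch") suggests that epochs, not the state, still drive request validation. A minimal sketch under that reading; `EpochFencingSketch` and `validate` are hypothetical, and the real coordinator may attach extra conditions before accepting the previous epoch:

```java
// Hypothetical epoch check: the member state is deliberately not consulted.
final class EpochFencingSketch {
    private EpochFencingSketch() { }

    enum Outcome { ACCEPTED, FENCED }

    // Accept the current epoch, or the previous one in case the last epoch
    // bump was lost before reaching the member; fence anything else.
    static Outcome validate(int receivedEpoch, int memberEpoch, int previousMemberEpoch) {
        if (receivedEpoch == memberEpoch || receivedEpoch == previousMemberEpoch) {
            return Outcome.ACCEPTED;
        }
        return Outcome.FENCED;
    }
}
```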
--
This is an automated message from the Apache Git Service.