dajac commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1362082878
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -68,14 +107,16 @@ public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSel
}
this.groupInstanceId = Optional.ofNullable(groupInstanceId);
this.targetAssignment = Optional.empty();
- this.nextTargetAssignment = Optional.empty();
+ this.log = logContext.logger(MembershipManagerImpl.class);
}
/**
* Update assignor selection for the member.
*
- * @param assignorSelection New assignor selection
- * @throws IllegalArgumentException If the provided assignor selection is null
+ * @param assignorSelection New assignor selection. If empty is provided, this will
+ * effectively clear the previous assignor selection defined for the
+ * member.
+ * @throws IllegalArgumentException If the provided optional assignor selection is null.
*/
public final void setAssignorSelection(AssignorSelection assignorSelection) {
Review Comment:
By the way, the assignor cannot be changed while the consumer is running, so we
could probably remove this method or make it private.
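A minimal sketch of the "remove the setter" option, assuming the selection is only ever supplied at construction time. `ImmutableAssignorExample` and the nested `AssignorSelection` stand-in are hypothetical names used for illustration, not the PR's actual classes:

```java
// Hypothetical, trimmed-down stand-in for MembershipManagerImpl.
// Only the assignor handling is shown.
final class ImmutableAssignorExample {

    // Hypothetical stand-in for the PR's AssignorSelection type.
    static final class AssignorSelection {
        private final String name;

        AssignorSelection(String name) {
            this.name = name;
        }

        String name() {
            return name;
        }
    }

    // Final field: assigned once in the constructor and never changed afterwards.
    private final AssignorSelection assignorSelection;

    ImmutableAssignorExample(AssignorSelection assignorSelection) {
        if (assignorSelection == null) {
            throw new IllegalArgumentException("Assignor selection cannot be null");
        }
        this.assignorSelection = assignorSelection;
    }

    // Read-only accessor. There is deliberately no setAssignorSelection method.
    AssignorSelection assignorSelection() {
        return assignorSelection;
    }
}
```

With the field declared `final`, the compiler rejects any later reassignment, so the "cannot change while running" rule is enforced by the type rather than by convention.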
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -19,46 +19,85 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
import java.util.Optional;
/**
- * Membership manager that maintains group membership for a single member following the new
+ * Membership manager that maintains group membership for a single member, following the new
* consumer group protocol.
* <p/>
- * This keeps membership state and assignment updated in-memory, based on the heartbeat responses
- * the member receives. It is also responsible for computing assignment for the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the member receives.
*/
public class MembershipManagerImpl implements MembershipManager {
+ /**
+ * Group ID of the consumer group the member will be part of, provided when creating the current
+ * membership manager.
+ */
private final String groupId;
+
+ /**
+ * Group instance ID to be used by the member, provided when creating the current membership manager.
+ */
private final Optional<String> groupInstanceId;
+
+ /**
+ * Member ID assigned by the server to the member, received in a heartbeat response when
+ * joining the group specified in {@link #groupId}
+ */
private String memberId;
+
+ /**
+ * Current epoch of the member. It will be set to 0 by the member, and provided to the server
+ * on the heartbeat request, to join the group. It will be then maintained by the server,
+ * incremented as the member reconciles and acknowledges the assignments it receives. It will
+ * be reset to 0 if the member gets fenced.
+ */
private int memberEpoch;
+
+ /**
+ * Current state of this member as part of the consumer group, as defined in {@link MemberState}
+ */
private MemberState state;
+
+ /**
+ * Assignor selection configured for the member, that will be sent out to the server on the
+ * {@link ConsumerGroupHeartbeatRequest}. This will default to using server-side assignor,
+ * letting the server choose the specific assignor implementation to use.
+ */
private AssignorSelection assignorSelection;
/**
* Assignment that the member received from the server and successfully processed.
*/
private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
/**
* Assignment that the member received from the server but hasn't completely processed
* yet.
*/
private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment;
+
/**
- * Latest assignment that the member received from the server while a {@link #targetAssignment}
- * was in process.
+ * Logger.
*/
- private Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment;
+ private final Logger log;
- public MembershipManagerImpl(String groupId) {
- this(groupId, null, null);
+ public MembershipManagerImpl(String groupId, LogContext logContext) {
+ this(groupId, null, null, logContext);
}
- public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+ public MembershipManagerImpl(String groupId,
+ String groupInstanceId,
+ AssignorSelection assignorSelection,
Review Comment:
I wonder if we should just replace `AssignorSelection` with a `String` or an
`Optional`. That is effectively all it is at the moment.
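A rough sketch of what that simplification might look like, assuming the selection reduces to an optional server-side assignor name. The class name `ServerAssignorExample` and the `serverAssignor` field are hypothetical, and the `LogContext` parameter from the PR is left out to keep the example self-contained:

```java
import java.util.Optional;

// Hypothetical, trimmed-down stand-in for MembershipManagerImpl.
final class ServerAssignorExample {
    private final String groupId;
    private final Optional<String> groupInstanceId;
    // Assumed replacement for AssignorSelection: just the assignor name, if any.
    private final Optional<String> serverAssignor;

    ServerAssignorExample(String groupId) {
        this(groupId, null, null);
    }

    ServerAssignorExample(String groupId, String groupInstanceId, String serverAssignor) {
        this.groupId = groupId;
        // Nullable constructor arguments are wrapped once, so callers never see null.
        this.groupInstanceId = Optional.ofNullable(groupInstanceId);
        this.serverAssignor = Optional.ofNullable(serverAssignor);
    }

    String groupId() {
        return groupId;
    }

    Optional<String> groupInstanceId() {
        return groupInstanceId;
    }

    // Empty means "let the server choose the assignor implementation".
    Optional<String> serverAssignor() {
        return serverAssignor;
    }
}
```

An empty `Optional` then carries the same meaning the Javadoc gives to the default selection: the server picks the assignor.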
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##########
@@ -21,58 +21,82 @@
import java.util.Optional;
/**
- * Manages group membership for a single member.
+ * A stateful object tracking the state of a single member in relationship to a consumer group:
+ * <p/>
* Responsible for:
* <li>Keeping member state</li>
* <li>Keeping assignment for the member</li>
* <li>Computing assignment for the group if the member is required to do so<li/>
*/
public interface MembershipManager {
+ /**
+ * @return Group ID of the consumer group the member is part of (or wants to be part of).
+ */
String groupId();
+ /**
+ * @return Instance ID used by the member when joining the group. If non-empty, it will indicate that
+ * this is a static member.
+ */
Optional<String> groupInstanceId();
+ /**
+ * @return Member ID assigned by the server to this member when it joins the consumer group.
+ */
String memberId();
+ /**
+ * @return Current epoch of the member, maintained by the server.
+ */
int memberEpoch();
+ /**
+ * @return Current state of this member in relationship to a consumer group, as defined in
+ * {@link MemberState}.
+ */
MemberState state();
/**
- * Update the current state of the member based on a heartbeat response
+ * Update member info and transition member state based on a heartbeat response.
+ *
+ * @param response Heartbeat response to extract member info and errors from.
*/
void updateState(ConsumerGroupHeartbeatResponseData response);
/**
- * Returns the {@link AssignorSelection} for the member
+ * @return {@link AssignorSelection} configured for the member, that will be sent out to
Review Comment:
nit: There is an extra space after `return`.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -160,92 +229,92 @@ private boolean maybeTransitionToStable() {
return state.equals(MemberState.STABLE);
}
+ /**
+ * Take new target assignment received from the server and set it as targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send a new target
+ * member while a previous one hasn't been acknowledged by the member, so this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
if (!targetAssignment.isPresent()) {
+ log.info("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment);
targetAssignment = Optional.of(newTargetAssignment);
} else {
- // Keep the latest next target assignment
- nextTargetAssignment = Optional.of(newTargetAssignment);
+ transitionToFailed();
+ throw new IllegalStateException("Cannot set new target assignment because a " +
+ "previous one pending to be reconciled already exists.");
}
}
- private boolean hasPendingTargetAssignment() {
- return targetAssignment.isPresent() || nextTargetAssignment.isPresent();
- }
-
-
- /**
- * Update state and assignment as the member has successfully processed a new target
- * assignment.
- * This indicates the end of the reconciliation phase for the member, and makes the target
- * assignment the new current assignment.
- *
- * @param assignment Target assignment the member was able to successfully process
- */
- public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
- updateAssignment(assignment);
- transitionTo(MemberState.STABLE);
- }
-
/**
- * Update state and member info as the member was not able to process the assignment, due to
- * errors in the execution of the user-provided callbacks.
- *
- * @param error Exception found during the execution of the user-provided callbacks
+ * Returns true if the member has a target assignment being processed.
*/
- public void onAssignmentProcessFailure(Throwable error) {
- transitionTo(MemberState.FAILED);
- // TODO: update member info appropriately, to clear up whatever shouldn't be kept in
- // this unrecoverable state
+ private boolean hasPendingTargetAssignment() {
+ return targetAssignment.isPresent();
}
private void resetEpoch() {
this.memberEpoch = 0;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public MemberState state() {
return state;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public AssignorSelection assignorSelection() {
return this.assignorSelection;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
- public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+ public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
return this.currentAssignment;
}
- // VisibleForTesting
+
+ /**
+ * Assignment that the member received from the server but hasn't completely processed yet.
Review Comment:
nit: Use `@return` here.
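For illustration, a sketch of the Javadoc written with `@return`. The method this comment belongs to is cut off in the quoted diff, so the accessor name `targetAssignment()`, the `TargetAssignmentExample` class, and the use of `String` in place of the real assignment type are all assumptions:

```java
import java.util.Optional;

// Hypothetical holder used only to show the Javadoc form.
final class TargetAssignmentExample {
    // String stands in for ConsumerGroupHeartbeatResponseData.Assignment.
    private final Optional<String> targetAssignment;

    TargetAssignmentExample(Optional<String> targetAssignment) {
        this.targetAssignment = targetAssignment;
    }

    /**
     * @return Assignment that the member received from the server but hasn't completely
     *         processed yet.
     */
    Optional<String> targetAssignment() {
        return targetAssignment;
    }
}
```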