kirktrue commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1353544098
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java: ########## @@ -85,122 +111,162 @@ enum ReconciliationResult { private final BlockingQueue<BackgroundEvent> backgroundEventQueue; private Optional<RebalanceCallbackEvent> inflightCallback; - public MemberAssignmentReconciler(LogContext logContext, - SubscriptionState subscriptions, - ConsumerMetadata metadata, - BlockingQueue<BackgroundEvent> backgroundEventQueue) { + AssignmentReconciler(LogContext logContext, + SubscriptionState subscriptions, + ConsumerMetadata metadata, + BlockingQueue<BackgroundEvent> backgroundEventQueue) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.metadata = metadata; this.backgroundEventQueue = backgroundEventQueue; } /** - * Perform the revocation process, if necessary, depending on the given {@link Assignment target assignment}. If the - * {@link SubscriptionState#assignedPartitions() current set of assigned partitions} includes entries that are - * <em>not</em> in the target assignment, these will be considered for revocation. If there is already a - * reconciliation in progress (revocation or assignment), this method will return without performing any - * revocation. + * Perform the reconciliation process, as necessary to meet the given {@link Assignment target assignment}. Note + * that the reconciliation is a multi-step process, and this method should be invoked on each heartbeat if + * the coordinator provides a {@link Assignment target assignment}. * * @param assignment Target {@link Assignment} * @return {@link ReconciliationResult} */ - ReconciliationResult revoke(Optional<Assignment> assignment) { + ReconciliationResult maybeReconcile(Optional<Assignment> assignment) { // Check for any outstanding operations first. If a conclusive result has already been reached, return that // before processing any further. - Optional<ReconciliationResult> inflightStatus = checkInflightStatus(); + if (inflightCallback.isPresent()) { + // We don't actually need the _result_ of the event, just to know that it's complete. + if (inflightCallback.get().future().isDone()) { + // This is the happy path--we completed the callback. Clear out our inflight callback first, though. + inflightCallback = Optional.empty(); Review Comment: Is the `HeartbeatRequestManager` going to call `AssignmentReconciler.lose()` to drop the partitions at that point? If so, this is the code at the top of that `lose()` method: ```java ReconciliationResult lose() { // Clear the inflight callback reference. This is done regardless of if one existed; if there was one it is // now abandoned because we're going to "lose" our partitions. This will also allow us to skip the inflight // check the other steps take. inflightCallback = Optional.empty(); . . . ``` Does that seem sufficient, or is more needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org