lucasbru commented on code in PR #21495:
URL: https://github.com/apache/kafka/pull/21495#discussion_r2846983313
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -817,11 +817,13 @@ private void transitionToStale() {
* - There are topics that haven't been added to the current assignment
yet, but all their topic IDs
* are missing from the target assignment.
*
- * @param canCommit Controls whether reconciliation can proceed when
auto-commit is enabled.
- * Set to true only when the current offset positions are
safe to commit.
- * If false and auto-commit enabled, the reconciliation
will be skipped.
+ * @param invokedByPoll True if this reconciliation attempt is triggered
by the application thread on consumer.poll().
Review Comment:
nit: invokedByPoll is set to false when called from poll. From
RequestManager.poll. That's a bit confusing. Maybe call it
`invokedByConsumerPoll` or `invokedByUserPoll`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -817,11 +817,13 @@ private void transitionToStale() {
* - There are topics that haven't been added to the current assignment
yet, but all their topic IDs
* are missing from the target assignment.
*
- * @param canCommit Controls whether reconciliation can proceed when
auto-commit is enabled.
- * Set to true only when the current offset positions are
safe to commit.
- * If false and auto-commit enabled, the reconciliation
will be skipped.
+ * @param invokedByPoll True if this reconciliation attempt is triggered
by the application thread on consumer.poll().
+ * False if this is triggered by the background
thread on regular manager poll.
+ * In both cases we want to resolve metadata to
unresolved assignments,
+ * but the actual reconciliation (commit, callbacks,
assignment updates)
+ * will only proceed if this is triggered from the
application thread on consumer.poll
*/
- public void maybeReconcile(boolean canCommit) {
+ public void maybeReconcile(boolean invokedByPoll) {
Review Comment:
ShareConsumer aside, I am not sure this change fully fixes the problem, but
maybe I am just not seeing it. It's specifically about the case where we are
triggering the reconciliation from `poll`, have a rebalance listener
registered, but aren't waiting long enough before exiting poll.
The event sequence that I see is this:
1. maybeReconcile(true) is called on the background thread during
AsyncPollEvent processing
2. It kicks off the reconciliation chain: commit → revokePartitions() →
enqueues ConsumerRebalanceListenerCallbackNeededEvent onto the background event
queue
3. Back on the application thread, we executes the onPartitionsRevoked
callback, and sends back a ConsumerRebalanceListenerCallbackCompletedEvent
4. The background thread picks up that completed event, calls
future.complete(), which triggers the thenCompose → assignPartitions() →
updateSubscriptionAwaitingCallback()
I think after Step 3, the application thread can immediately exit
`Consumer.poll` and go into `Consumer.seek`. Does `isValidatePositionsComplete`
actually gate against this? I do not see it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]