RivenSun2 commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r908114549
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String
memberId) {
boolean onJoinPrepareAsyncCommitCompleted = false;
// async commit offsets prior to rebalance if auto-commit enabled
RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+ // wait for commit offset response if future exist
+ if (future != null) {
+ client.poll(future,
time.timer(rebalanceConfig.rebalanceTimeoutMs));
+ }
Review Comment:
If `client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));` is
still called here, the problem maybe will recur, and the user maybe still block
the `poll` method of kafkaConsumer. ` rebalanceConfig.rebalanceTimeoutMs` may
be much larger than `pollDuration`
Suggest:
1. Promote the `future` variable in `onJoinPrepare` to the instance variable
of `ConsumerCoordinator`. The variable name can be tentatively
`rebalanceAutoCommitFuture`, and the initial value is `null`.
` private RequestFuture<Void> rebalanceAutoCommitFuture =null;`
2. Refactor the `onJoinPrepare` method. The `rebalanceAutoCommitFuture` can
be completed after the user has called the `poll` method multiple times without
blocking the user's `poll` method.
```
boolean onJoinPrepareAsyncCommitCompleted = false;
if(autoCommitEnabled && rebalanceAutoCommitFuture == null){
// async commit offsets prior to rebalance if auto-commit enabled
rebalanceAutoCommitFuture = maybeAutoCommitOffsetsAsync();
}
if (rebalanceAutoCommitFuture != null) {
client.poll(rebalanceAutoCommitFuture, time.timer(0));
}
// return true when
// 1. future is null, which means no commit request sent, so it is
still considered completed
// 2. offset commit completed
// 3. offset commit failed with non-retriable exception
if (rebalanceAutoCommitFuture == null)
onJoinPrepareAsyncCommitCompleted = true;
else if (rebalanceAutoCommitFuture.succeeded()) {
onJoinPrepareAsyncCommitCompleted = true;
rebalanceAutoCommitFuture = null;
} else if (rebalanceAutoCommitFuture.failed() &&
!rebalanceAutoCommitFuture.isRetriable()) {
log.error("Asynchronous auto-commit of offsets failed: {}",
rebalanceAutoCommitFuture.exception().getMessage());
onJoinPrepareAsyncCommitCompleted = true;
rebalanceAutoCommitFuture = null;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]