AndrewJSchofield commented on code in PR #14912:
URL: https://github.com/apache/kafka/pull/14912#discussion_r1414423829
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1169,34 +1169,48 @@ private Fetch<K, V> collectFetch() {
* @return true iff the operation completed without timing out
*/
private boolean updateFetchPositions(final Timer timer) {
- // Validate positions using the partition leader end offsets, to detect if any partition
- // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
- // request, retrieve the partition end offsets, and validate the current position against it.
- applicationEventHandler.add(new ValidatePositionsApplicationEvent());
-
- cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
- if (cachedSubscriptionHasAllFetchPositions) return true;
-
- // Reset positions using committed offsets retrieved from the group coordinator, for any
- // partitions which do not have a valid position and are not awaiting reset. This will
- // trigger an OffsetFetch request and update positions with the offsets retrieved. This
- // will only do a coordinator lookup if there are partitions which have missing
- // positions, so a consumer with manually assigned partitions can avoid a coordinator
- // dependence by always ensuring that assigned partitions have an initial position.
- if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
- return false;
-
- // If there are partitions still needing a position and a reset policy is defined,
- // request reset using the default policy. If no reset strategy is defined and there
- // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
- subscriptions.resetInitializingPositions();
+ try {
+ // Validate positions using the partition leader end offsets, to detect if any partition
+ // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+ // request, retrieve the partition end offsets, and validate the current position against it.
+ // If the timer is not expired, wait for the validation, otherwise, just request it.
+ if (timer.notExpired()) {
+ applicationEventHandler.addAndGet(new ValidatePositionsApplicationEvent(), timer);
+ } else {
+ applicationEventHandler.add(new ValidatePositionsApplicationEvent());
+ }
Review Comment:
I really ought to change this a bit. The method signature should be
`updateFetchPositions(Timer timer, boolean shouldWait)`. If the caller passes
`Duration.ofMillis(0)`, the code should not wait, but it should still send the
async requests.
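
A rough sketch of what that split might look like, not the actual change. `SimpleTimer` and `RecordingHandler` are hypothetical stand-ins for the real `Timer` and `applicationEventHandler`, and the event is reduced to a string so the snippet runs on its own. The return value in the non-waiting branch is also an assumption:

```java
import java.util.ArrayList;
import java.util.List;

public class UpdateFetchPositionsSketch {

    /** Hypothetical stand-in for the real Timer: only tracks remaining time. */
    public static final class SimpleTimer {
        private final long remainingMs;

        public SimpleTimer(long remainingMs) {
            this.remainingMs = remainingMs;
        }

        public boolean notExpired() {
            return remainingMs > 0;
        }
    }

    /** Hypothetical stand-in for applicationEventHandler: records which call was made. */
    public static final class RecordingHandler {
        public final List<String> calls = new ArrayList<>();

        /** Fire-and-forget: enqueue the event and return immediately. */
        public void add(String event) {
            calls.add("add:" + event);
        }

        /** Enqueue the event and "wait"; here waiting succeeds iff the timer has time left. */
        public boolean addAndGet(String event, SimpleTimer timer) {
            calls.add("addAndGet:" + event);
            return timer.notExpired();
        }
    }

    /**
     * The caller says explicitly whether to block, instead of the method
     * inferring it from the timer. Either way the request is sent.
     */
    public static boolean updateFetchPositions(RecordingHandler handler,
                                               SimpleTimer timer,
                                               boolean shouldWait) {
        if (shouldWait) {
            return handler.addAndGet("ValidatePositions", timer);
        }
        handler.add("ValidatePositions");
        // Assumption: nothing was waited on, so nothing timed out.
        return true;
    }
}
```

With this shape, a caller that has `Duration.ofMillis(0)` would pass `shouldWait = false` and the validation request would still be enqueued.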