cadonna commented on code in PR #15455:
URL: https://github.com/apache/kafka/pull/15455#discussion_r1510930206
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -177,7 +177,7 @@ private void process(final FetchCommittedOffsetsEvent event) {
return;
}
CommitRequestManager manager = requestManagers.commitRequestManager.get();
- long expirationTimeMs = getExpirationTimeForTimeout(event.timeout());
+ long expirationTimeMs = getExpirationTimeForTimeout(event.deadlineMs());
Review Comment:
Isn't `event.deadlineMs()` already a point in time rather than a time span?
If so, you no longer need to compute the expiration time from it, do you?
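
To illustrate the concern, here is a minimal sketch. It is not the PR's code. It assumes that `getExpirationTimeForTimeout()` adds the current time to a timeout span; the class and method names below are hypothetical.

```java
public final class DeadlineSketch {

    // Hypothetical stand-in for getExpirationTimeForTimeout():
    // converts a time span into an absolute point in time.
    static long expirationFromTimeout(long nowMs, long timeoutMs) {
        long expirationMs = nowMs + timeoutMs;
        // Guard against overflow for very large timeouts (e.g. Long.MAX_VALUE).
        return expirationMs < 0 ? Long.MAX_VALUE : expirationMs;
    }

    // A deadline is already an absolute point in time, so it is used as-is.
    static long expirationFromDeadline(long deadlineMs) {
        return deadlineMs;
    }

    public static void main(String[] args) {
        long nowMs = 1_000L;
        long timeoutMs = 500L;
        long deadlineMs = nowMs + timeoutMs;

        // Intended expiration: now + timeout.
        System.out.println(expirationFromTimeout(nowMs, timeoutMs));
        // Passing the deadline through unchanged gives the same value.
        System.out.println(expirationFromDeadline(deadlineMs));
        // Feeding the deadline into the timeout conversion counts "now" twice.
        System.out.println(expirationFromTimeout(nowMs, deadlineMs));
    }
}
```

Under that assumption, passing a deadline through the timeout conversion pushes the expiration later than intended.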
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -937,13 +938,14 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
return Collections.emptyMap();
}
+ final Timer timer = time.timer(timeout);
final FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(
partitions,
- timeout.toMillis());
+ timer);
wakeupTrigger.setActiveTask(event.future());
try {
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event,
- time.timer(timeout));
+ timer);
Review Comment:
I have two questions/comments here:
1. Could you drop the timer argument from `addAndGet()` and use the timer of
the event instead? To me, it seems like the timer passed to the event is always
the same timer that is passed to `addAndGet()`.
2. Is it correct to use `timer.remainingMs()` when waiting for the future of
the event? Don't we risk timing out the future before we check the timer for
expiration?
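
A rough sketch of what point 1 could look like, using plain `java.util.concurrent` types instead of the Kafka classes. The `Event` class, its `deadlineMs` field, and this `addAndGet()` signature are hypothetical and only illustrate the idea of deriving the wait from the event itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class EventDeadlineSketch {

    // Hypothetical event that carries its own absolute deadline.
    static final class Event<T> {
        final CompletableFuture<T> future = new CompletableFuture<>();
        final long deadlineMs;

        Event(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }

        // Time left until the deadline, never negative.
        long remainingMs(long nowMs) {
            return Math.max(0L, deadlineMs - nowMs);
        }
    }

    // No separate timer argument: the wait is derived from the event.
    static <T> T addAndGet(Event<T> event)
            throws InterruptedException, ExecutionException, TimeoutException {
        long waitMs = event.remainingMs(System.currentTimeMillis());
        return event.future.get(waitMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        Event<String> event = new Event<>(System.currentTimeMillis() + 1_000L);
        event.future.complete("done"); // stands in for the background thread
        System.out.println(addAndGet(event));
    }
}
```

The sketch does not settle point 2: the caller and the code that completes the future read the clock at different moments, so the caller's wait may still end before the other side sees the deadline as expired.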