kirktrue commented on code in PR #15455:
URL: https://github.com/apache/kafka/pull/15455#discussion_r1511613214
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -937,13 +938,14 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
             return Collections.emptyMap();
         }
+        final Timer timer = time.timer(timeout);
         final FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(
             partitions,
-            timeout.toMillis());
+            timer);
         wakeupTrigger.setActiveTask(event.future());
         try {
             final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event,
-                time.timer(timeout));
+                timer);
Review Comment:
You're correct on both counts! 😄
> 1. Could you also not pass the timer to `addAndGet()` and use the timer of
> the event? To me, it seems like the timer passed to the event is always the
> timer also passed to `addAndGet()`.

Indeed, it is the same `Timer` object. I intend to resolve that confusion as
part of KAFKA-15974 to keep this PR smaller in scope.

> 2. Is it correct to use `timer.remainingMs()` when waiting for the future
> of the event? Do we not risk to timeout the future before we check the timer
> for expiration?

It is not correct, but the proper fix requires changing the underlying design,
which is a more substantial change, so I am saving it for KAFKA-15974.
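
To make both points concrete, here is a minimal sketch of the shape being discussed. It is not the Kafka code: `SimpleTimer`, `Event`, and this one-argument `addAndGet()` are hypothetical stand-ins written for illustration, and the real design is left to KAFKA-15974. The event owns the timer, so the deadline is passed once (point 1), and the blocking wait uses `remainingMs()`, which is where the timing concern from point 2 shows up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SharedTimerSketch {

    // Hypothetical stand-in for a deadline timer; NOT Kafka's Timer class.
    static final class SimpleTimer {
        private final long deadlineNanos;

        SimpleTimer(long timeoutMs) {
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        }

        // Whole milliseconds left before the deadline, never negative.
        long remainingMs() {
            return Math.max(0L, TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()));
        }

        boolean isExpired() {
            return deadlineNanos - System.nanoTime() <= 0L;
        }
    }

    // Hypothetical event that carries its own timer, so the deadline is passed once.
    static final class Event<T> {
        final SimpleTimer timer;
        final CompletableFuture<T> future = new CompletableFuture<>();

        Event(SimpleTimer timer) {
            this.timer = timer;
        }
    }

    // Hypothetical one-argument variant: the wait is bounded by the event's timer.
    static <T> T addAndGet(Event<T> event) {
        try {
            return event.future.get(event.timer.remainingMs(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The future's wait and timer.isExpired() are two separate checks.
            // remainingMs() truncates to whole milliseconds, so the wait may end
            // slightly before isExpired() would return true.
            throw new IllegalStateException("timed out waiting for event", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for event", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("event failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // An already-completed event returns its value immediately.
        Event<String> done = new Event<>(new SimpleTimer(1_000));
        done.future.complete("offsets");
        System.out.println(addAndGet(done));

        // An event that never completes hits the timer-derived timeout.
        Event<String> never = new Event<>(new SimpleTimer(20));
        try {
            addAndGet(never);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the timeout surfaces from the future rather than from a check of the timer itself, which is the mismatch the second question points at.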