vvcephei commented on code in PR #13490: URL: https://github.com/apache/kafka/pull/13490#discussion_r1162130597
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -447,6 +482,19 @@ public void close() { @Override public void close(Duration timeout) { + if (timeout.toMillis() < 0) + throw new IllegalArgumentException("The timeout cannot be negative."); + try { + if (!closed) { + close(timeout, false); + } + } finally { + closed = true; Review Comment: It's interesting to set this only after the call to close completes or fails. I could see setting it only after it completes to ensure it's really closed, or setting it with a CAS before calling the inner close so that multiple callers don't all try to close at once. I'm not sure I see the rationale of doing it this way, though. If there is a reason I'm not seeing, could you add a comment explaining it so the code will be clear? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -319,26 +328,52 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs)); } + /** + * Retrieve the last committed offset for the given partition (whether the commit happened by this process or + * another). This offset will be used as the position for the consumer in the event of a failure. + * <p> + * If the timeout specified by {@code default.api.timeout.ms} expires + * {@link org.apache.kafka.common.errors.TimeoutException} is thrown. + * + * @param partitions The partition to check + * @param timeout The maximum time to block. + * @return The last committed offset and metadata or null if there was no prior commit + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before + * the timeout specified by {@code default.api.timeout.ms} expires. + */ @Override public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) { + maybeWakeup(); maybeThrowInvalidGroupIdException(); + if (partitions.isEmpty()) { return new HashMap<>(); } final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions); + activeFutures.add(event.future()); eventHandler.add(event); try { - return event.complete(Duration.ofMillis(100)); + + return event.complete(timeout); + } catch (ExecutionException e) { + throw new KafkaException(e); } catch (InterruptedException e) { - throw new InterruptException(e); + throw new InterruptException(e.getMessage()); Review Comment: What's up with this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org