philipnee commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1169363951
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -319,26 +327,52 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
         return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
+    /**
+     * Retrieve the last committed offset for the given partition (whether the commit happened by this process or
+     * another). This offset will be used as the position for the consumer in the event of a failure.
+     * <p>
+     * If the timeout specified by {@code default.api.timeout.ms} expires
+     * {@link org.apache.kafka.common.errors.TimeoutException} is thrown.
+     *
+     * @param partitions The partition to check
+     * @param timeout The maximum time to block.
+     * @return The last committed offset and metadata or null if there was no prior commit
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
+     *             the timeout specified by {@code default.api.timeout.ms} expires.
+     */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
+        maybeWakeup();
         maybeThrowInvalidGroupIdException();
+
         if (partitions.isEmpty()) {
             return new HashMap<>();
         }
         final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
+        activeFutures.add(event.future());
         eventHandler.add(event);
         try {
-            return event.complete(Duration.ofMillis(100));
+
+            return event.complete(timeout);
+        } catch (ExecutionException e) {
+            throw new KafkaException(e);
         } catch (InterruptedException e) {
             throw new InterruptException(e);
         } catch (TimeoutException e) {
             throw new org.apache.kafka.common.errors.TimeoutException(e);
-        } catch (ExecutionException e) {
-            // Execution exception is thrown here
-            throw new KafkaException(e);
-        } catch (Exception e) {
+        } catch (WakeupException e) {
+            this.activeFutures.remove(event.future());

Review Comment:
same, we should probably remove the future upon any failure/completion.
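One way to make the removal happen on every path is a `finally` block rather than a single `catch` branch. Below is a minimal sketch of that idea, not the PR's code: `FutureTrackingSketch` and `awaitTracked` are hypothetical names used only for illustration, a plain `CompletableFuture` stands in for `event.future()`, and only the `activeFutures` field name is taken from the diff.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the consumer; it is not PrototypeAsyncConsumer.
class FutureTrackingSketch {
    // Futures currently being waited on (the name mirrors the field in the diff).
    final Set<CompletableFuture<?>> activeFutures = ConcurrentHashMap.newKeySet();

    // Hypothetical helper: track the future while blocking on it, then always untrack it.
    <T> T awaitTracked(CompletableFuture<T> future, Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        activeFutures.add(future);
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            // Runs after a normal return and after any exception
            // (timeout, interrupt, execution failure, or an unchecked exception).
            activeFutures.remove(future);
        }
    }
}
```

With this shape the individual `catch` blocks only translate exceptions and never need to touch `activeFutures`, so adding a new exception type later cannot leak a tracked future.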