philipnee commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1169363951


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -319,26 +327,52 @@ public Map<TopicPartition, OffsetAndMetadata> 
committed(final Set<TopicPartition
         return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
+    /**
+     * Retrieve the last committed offset for the given partition (whether the 
commit happened by this process or
+     * another). This offset will be used as the position for the consumer in 
the event of a failure.
+     * <p>
+     * If the timeout specified by {@code default.api.timeout.ms} expires
+     * {@link org.apache.kafka.common.errors.TimeoutException} is thrown.
+     *
+     * @param partitions The partition to check
+     * @param timeout The maximum time to block.
+     * @return The last committed offset and metadata or null if there was no 
prior commit
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the 
committed offset cannot be found before
+     *             the timeout specified by {@code default.api.timeout.ms} 
expires.
+     */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committed(final 
Set<TopicPartition> partitions,
                                                             final Duration 
timeout) {
+        maybeWakeup();
         maybeThrowInvalidGroupIdException();
+
         if (partitions.isEmpty()) {
             return new HashMap<>();
         }
 
         final OffsetFetchApplicationEvent event = new 
OffsetFetchApplicationEvent(partitions);
+        activeFutures.add(event.future());
         eventHandler.add(event);
         try {
-            return event.complete(Duration.ofMillis(100));
+
+            return event.complete(timeout);
+        } catch (ExecutionException e) {
+            throw new KafkaException(e);
         } catch (InterruptedException e) {
             throw new InterruptException(e);
         } catch (TimeoutException e) {
             throw new org.apache.kafka.common.errors.TimeoutException(e);
-        } catch (ExecutionException e) {
-            // Execution exception is thrown here
-            throw new KafkaException(e);
-        } catch (Exception e) {
+        } catch (WakeupException e) {
+            this.activeFutures.remove(event.future());

Review Comment:
   same, we should probably remove the future upon any failure/completion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to