vvcephei commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1162130597


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -447,6 +482,19 @@ public void close() {
 
     @Override
     public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be 
negative.");
+        try {
+            if (!closed) {
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;

Review Comment:
   It's interesting to set this only after the call to close completes or fails.
   
   I could see setting it only after it completes to ensure it's really closed, 
or setting it with a CAS before calling the inner close so that multiple 
callers don't all try to close at once.
   
   I'm not sure I see the rationale of doing it this way, though. If there is a 
reason I'm not seeing, could you add a comment explaining it so the code will 
be clear?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -319,26 +328,52 @@ public Map<TopicPartition, OffsetAndMetadata> 
committed(final Set<TopicPartition
         return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
+    /**
+     * Retrieve the last committed offset for the given partition (whether the 
commit happened by this process or
+     * another). This offset will be used as the position for the consumer in 
the event of a failure.
+     * <p>
+     * If the timeout specified by {@code default.api.timeout.ms} expires
+     * {@link org.apache.kafka.common.errors.TimeoutException} is thrown.
+     *
+     * @param partitions The partition to check
+     * @param timeout The maximum time to block.
+     * @return The last committed offset and metadata or null if there was no 
prior commit
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the 
committed offset cannot be found before
+     *             the timeout specified by {@code default.api.timeout.ms} 
expires.
+     */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committed(final 
Set<TopicPartition> partitions,
                                                             final Duration 
timeout) {
+        maybeWakeup();
         maybeThrowInvalidGroupIdException();
+
         if (partitions.isEmpty()) {
             return new HashMap<>();
         }
 
         final OffsetFetchApplicationEvent event = new 
OffsetFetchApplicationEvent(partitions);
+        activeFutures.add(event.future());
         eventHandler.add(event);
         try {
-            return event.complete(Duration.ofMillis(100));
+
+            return event.complete(timeout);
+        } catch (ExecutionException e) {
+            throw new KafkaException(e);
         } catch (InterruptedException e) {
-            throw new InterruptException(e);
+            throw new InterruptException(e.getMessage());

Review Comment:
   What's up with this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to