cadonna commented on code in PR #14557:
URL: https://github.com/apache/kafka/pull/14557#discussion_r1426483385


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -463,35 +467,34 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() {
         MockCommitCallback callback = new MockCommitCallback();
         CompletableFuture<Void> future = new CompletableFuture<>();
         consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
-        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        doReturn(future).when(consumer).commit(new HashMap<>(), false, 
Optional.empty());
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), 
callback));
         future.complete(null);
-        assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO),
-            callback,
-            null);
+        assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), 
null);
     }
 
     @Test
     public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
         MockCommitCallback callback = new MockCommitCallback();
         CompletableFuture<Void> future = new CompletableFuture<>();
-        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        doReturn(future).when(consumer).commit(new HashMap<>(), false, 
Optional.empty());
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), 
callback));
         future.complete(null);
-        assertMockCommitCallbackInvoked(() -> consumer.close(),
-            callback,
-            null);
+        assertMockCommitCallbackInvoked(() -> consumer.close(), null);
     }
 
     private void assertMockCommitCallbackInvoked(final Executable task,
-                                                 final MockCommitCallback 
callback,
-                                                 final Errors errors) {
-        assertDoesNotThrow(task);
-        assertEquals(1, callback.invoked);
-        if (errors == null)
-            assertNull(callback.exception);
-        else if (errors.exception() instanceof RetriableException)
-            assertTrue(callback.exception instanceof 
RetriableCommitFailedException);
+                                                 final Throwable 
expectedException) {
+        if (expectedException == null) {
+            assertDoesNotThrow(task);
+        } else {
+            Throwable t = assertThrows(Throwable.class, task);
+            if (expectedException instanceof RetriableException) {
+                assertTrue(t instanceof RetriableCommitFailedException);
+            } else {
+                assertEquals(t.getClass(), expectedException.getClass());
+            }

Review Comment:
   This seems overly complicated.
   
   Instead of 
   
   ```
   Throwable t = assertThrows(Throwable.class, task);
   ```
   
   you could use
   
   ```
   Throwable t = assertThrows(expectedException.getClass(), task);
   ```
   If you use the above, I do not think you need to do the following, because 
you have already verified the class of the exception and you set the expected 
exception in your test:
   
   ```
   if (expectedException instanceof RetriableException) {
   ...
   }
   ```
   
   Could you also please verify the message of the exception?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -190,63 +182,91 @@ private static long findMinTime(final Collection<? 
extends RequestState> request
      * has elapsed.
      *
      * @param offsets Offsets to commit
+     * @param expirationTimeMs Time until which the request will continue to 
be retried if it fails
+     *                         with a retriable error. If not present, the 
request will be sent
+     *                         but not retried.
      * @return Future that will complete when a response is received for the 
request, or a
      * completed future if no request is generated.
      */
-    public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
-        if (!canAutoCommit()) {
+    private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
+                                                    final Optional<Long> 
expirationTimeMs,
+                                                    boolean checkInterval) {
+        if (!autoCommitEnabled()) {
+            log.debug("Skipping auto-commit because auto-commit config is not 
enabled.");
             return CompletableFuture.completedFuture(null);
         }
 
         AutoCommitState autocommit = autoCommitState.get();
-        if (!autocommit.shouldAutoCommit()) {
+        if (checkInterval && !autocommit.shouldAutoCommit()) {
             return CompletableFuture.completedFuture(null);
         }
 
-        CompletableFuture<Void> result = sendAutoCommit(offsets);
+        CompletableFuture<Void> result = addOffsetCommitRequest(offsets, 
expirationTimeMs)
+            .whenComplete(autoCommitCallback(offsets));
         autocommit.resetTimer();
         autocommit.setInflightCommitStatus(true);
         return result;
     }
 
     /**
      * If auto-commit is enabled, this will generate a commit offsets request 
for all assigned
-     * partitions and their current positions.
+     * partitions and their current positions. Note on auto-commit timers: 
this will reset the
+     * auto-commit timer to the interval before issuing the async commit, and 
when the async commit
+     * completes, it will reset the auto-commit timer with the exponential 
backoff if the request
+     * failed with a retriable error.
      *
      * @return Future that will complete when a response is received for the 
request, or a
      * completed future if no request is generated.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumed() {
-        return maybeAutoCommit(subscriptions.allConsumed());
-    }
+    public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() {
+        if (!autoCommitEnabled()) {
+            // Early return to ensure that no action/logging is performed.
+            return CompletableFuture.completedFuture(null);
+        }
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
subscriptions.allConsumed();
+        CompletableFuture<Void> result = maybeAutoCommit(offsets, 
Optional.empty(), true);
+        result.whenComplete((__, error) -> {

Review Comment:
   Are underscores allowed in all Java versions we support?
   
   https://openjdk.org/jeps/302#Treatment-of-underscores says
   
   > Phase 1 was forbidding underscore as a lambda formal parameter name in 
Java 8 
   
   and we still support Java 8.
   The builds have a different compile error at the moment, so I cannot verify 
this there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to