rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r818198568



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the 
callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link 
org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code 
retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration
+     * @param retryBackoffMs    the number of milliseconds to delay upon 
receiving a
+     *                          {@link 
org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+
+    public static <T> T retryUntilTimeout(Callable<T> callable, Duration 
timeoutDuration, long retryBackoffMs) throws Exception {
+        long timeoutMs = timeoutDuration.toMillis();
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}. 
 Callable will only execute once.", retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying 
automatically. " +
+                        "Reason: {}", attempt, e.getMessage(), e);

Review comment:
       Likewise, this log message could be changed to:
   ```suggestion
                   log.warn("Attempt {} to {} resulted in RetriableException; retrying automatically. " +
                           "Reason: {}", attempt, description.get(), e.getMessage(), e);
   ```
   

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -124,6 +124,9 @@ public String toString() {
     public static final int NO_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
 
+    private static final String DEFAULT_ADMIN_CLIENT_RETRIES = "10";
+    private static final String DEFAULT_ADMIN_CLIENT_BACKOFF_MS = "100";
+

Review comment:
       These are unused now, right?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the 
callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link 
org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code 
retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration
+     * @param retryBackoffMs    the number of milliseconds to delay upon 
receiving a
+     *                          {@link 
org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+
+    public static <T> T retryUntilTimeout(Callable<T> callable, Duration 
timeoutDuration, long retryBackoffMs) throws Exception {
+        long timeoutMs = timeoutDuration.toMillis();
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}. 
 Callable will only execute once.", retryBackoffMs, timeoutMs);

Review comment:
       All of the log messages in this method could use a description of the 
callable. WDYT about adding a `Supplier<String> description` parameter and 
using that in the log messages? For example, the `KafkaBasedLog` could supply:
   ```
   () -> "list offsets for topic '" + topicName + "'"
   ```
   then this log message might be:
   
   ```suggestion
               log.warn("Call to {} will only execute once, since retryBackoffMs={} is larger than total timeoutMs={}",
                   description.get(), retryBackoffMs, timeoutMs);
   ```
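
To make the proposal concrete, here is a minimal, dependency-free sketch of how a `Supplier<String> description` could thread through the retry loop. The names `DescribedRetrySketch` and `TransientException` (standing in for `RetriableException`), the use of `System.err` instead of the SLF4J logger, and `IllegalStateException` in place of `ConnectException` are all assumptions made for illustration; this is not the code in the PR.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

public class DescribedRetrySketch {
    // Hypothetical stand-in for RetriableException so the sketch has no Kafka dependency.
    static class TransientException extends RuntimeException {
        TransientException(String message) {
            super(message);
        }
    }

    // Runs the callable at least once and retries on TransientException until the timeout elapses.
    static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description,
                                   Duration timeout, long retryBackoffMs) {
        long end = System.currentTimeMillis() + timeout.toMillis();
        int attempt = 0;
        TransientException lastError = null;
        do {
            attempt++;
            try {
                return callable.call();
            } catch (TransientException e) {
                // The description tells the reader what was being retried.
                System.err.printf("Attempt %d to %s failed; retrying. Reason: %s%n",
                        attempt, description.get(), e.getMessage());
                lastError = e;
            } catch (RuntimeException e) {
                throw e; // non-retriable: propagate unchanged
            } catch (Exception e) {
                throw new IllegalStateException("Failed to " + description.get(), e);
            }
            if (retryBackoffMs > 0) {
                try {
                    Thread.sleep(retryBackoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(
                            "Interrupted while trying to " + description.get(), e);
                }
            }
        } while (System.currentTimeMillis() <= end);
        throw new IllegalStateException("Failed to " + description.get() + " after " + attempt
                + " attempts. Reason: " + lastError.getMessage(), lastError);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String topicName = "myTopic";
        String result = retryUntilTimeout(
                () -> {
                    if (++calls[0] < 3) {
                        throw new TransientException("metadata not yet propagated");
                    }
                    return "offsets";
                },
                () -> "list offsets for topic '" + topicName + "'",
                Duration.ofSeconds(5), 10L);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Taking a `Supplier<String>` rather than a plain `String` means the description is only built when a message is actually logged or an exception is thrown, so the success path pays nothing for it.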

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
##########
@@ -465,6 +467,55 @@ public void verifyingGettingTopicCleanupPolicies() {
         }
     }
 
+    @Test
+    public void retryEndOffsetsShouldThrowConnectException() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = 1000L;
+        Cluster cluster = createCluster(1, "myTopic", 1);
+
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(10), cluster)) {
+            Map<TopicPartition, Long> offsetMap = new HashMap<>();
+            offsetMap.put(tp1, offset);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
+            Map<String, Object> adminConfig = new HashMap<>();
+            adminConfig.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0");
+            TopicAdmin admin = new TopicAdmin(adminConfig, env.adminClient());
+
+            assertThrows(ConnectException.class, () -> {
+                admin.retryEndOffsets(tps, Duration.ofMillis(100), 1);
+            });
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldRetryWhenTopicNotFound() {

Review comment:
       Nit:
   ```suggestion
       public void retryEndOffsetsShouldRetryWhenTopicNotFound() {
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the 
callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link 
org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code 
retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration
+     * @param retryBackoffMs    the number of milliseconds to delay upon 
receiving a
+     *                          {@link 
org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+
+    public static <T> T retryUntilTimeout(Callable<T> callable, Duration 
timeoutDuration, long retryBackoffMs) throws Exception {
+        long timeoutMs = timeoutDuration.toMillis();
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}. 
 Callable will only execute once.", retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying 
automatically. " +
+                        "Reason: {}", attempt, e.getMessage(), e);
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            }
+
+            // if current time is less than the ending time, no more retry is 
necessary
+            // won't sleep if retryBackoffMs equals to 0
+            if (retryBackoffMs > 0 && System.currentTimeMillis() < end) {
+                Utils.sleep(retryBackoffMs);
+            }

Review comment:
       WDYT about this:
   ```suggestion
               long millisRemaining = Math.max(0, end - System.currentTimeMillis());
               if (millisRemaining > 0) {
                   Utils.sleep(millisRemaining);
               }
   ```
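
One thing to watch with the snippet above: as written, it sleeps for all of the remaining time rather than for `retryBackoffMs`. A possible variant, sketched here as a pure helper so it can be checked in isolation, caps the backoff at the time remaining. `BackoffSketch` and `nextSleepMs` are hypothetical names, not part of the PR.

```java
public class BackoffSketch {
    // Returns how long to sleep before the next attempt: the configured backoff,
    // capped by the time left until the deadline, and never negative.
    static long nextSleepMs(long nowMs, long endMs, long retryBackoffMs) {
        long millisRemaining = Math.max(0, endMs - nowMs);
        return Math.min(retryBackoffMs, millisRemaining);
    }

    public static void main(String[] args) {
        System.out.println(nextSleepMs(0, 100, 30));   // plenty of time left: 30
        System.out.println(nextSleepMs(90, 100, 30));  // only 10 ms left: 10
        System.out.println(nextSleepMs(120, 100, 30)); // deadline already passed: 0
    }
}
```

A caller would then sleep only when the returned value is greater than zero, which also covers the `retryBackoffMs == 0` case without a separate check.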

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the 
callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link 
org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code 
retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration
+     * @param retryBackoffMs    the number of milliseconds to delay upon 
receiving a
+     *                          {@link 
org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+
+    public static <T> T retryUntilTimeout(Callable<T> callable, Duration 
timeoutDuration, long retryBackoffMs) throws Exception {
+        long timeoutMs = timeoutDuration.toMillis();
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}. 
 Callable will only execute once.", retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying 
automatically. " +
+                        "Reason: {}", attempt, e.getMessage(), e);
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            }
+
+            // if current time is less than the ending time, no more retry is 
necessary
+            // won't sleep if retryBackoffMs equals to 0
+            if (retryBackoffMs > 0 && System.currentTimeMillis() < end) {
+                Utils.sleep(retryBackoffMs);
+            }
+        }
+
+        throw new ConnectException("Fail to retry the task after " + attempt + 
" attempts.  Reason: " + lastError.getMessage(), lastError);

Review comment:
       And this exception message could also use the description:
   ```suggestion
           throw new ConnectException("Fail to " + description.get() + " after " + attempt + " attempts.  Reason: " + lastError.getMessage(), lastError);
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -76,6 +76,10 @@
     private static final Logger log = 
LoggerFactory.getLogger(KafkaBasedLog.class);
     private static final long CREATE_TOPIC_TIMEOUT_NS = 
TimeUnit.SECONDS.toNanos(30);
     private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
+    // 15min of admin retry duration to ensure successful metadata 
propagation.  10 seconds of backoff
+    // in between retries
+    private static final Duration ADMIN_CLIENT_RETRY_DURATION = 
Duration.ofMinutes(15);
+    private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = 10 * 1000;

Review comment:
       Nit: our convention is to use either `TimeUnit` or `Duration` to compute 
the millis to make it more readable and to reduce risk of conversion errors. 
Using `Duration` is a bit more readable, but the lines above already use 
`TimeUnit`:
   ```suggestion
       private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10);
   ```
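
For reference, both conversions yield the same millisecond values as the hand-written arithmetic. The sketch below uses only `java.util.concurrent.TimeUnit` and `java.time.Duration` from the JDK; the class and constant names are made up for the example.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class TimeConversionSketch {
    // Same value as 10 * 1000, but the unit is explicit at the call site.
    static final long BACKOFF_VIA_TIMEUNIT_MS = TimeUnit.SECONDS.toMillis(10);
    static final long BACKOFF_VIA_DURATION_MS = Duration.ofSeconds(10).toMillis();
    // The 15-minute retry window expressed in milliseconds.
    static final long RETRY_DURATION_MS = Duration.ofMinutes(15).toMillis();

    public static void main(String[] args) {
        System.out.println(BACKOFF_VIA_TIMEUNIT_MS); // 10000
        System.out.println(BACKOFF_VIA_DURATION_MS); // 10000
        System.out.println(RETRY_DURATION_MS);       // 900000
    }
}
```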

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the 
callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link 
org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code 
retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration

Review comment:
       Nit: since we require a non-null `Duration`, we should state that here:
   ```suggestion
        * @param timeoutDuration   timeout duration; may not be null
   ```
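
If the contract should also be enforced at runtime, one option (an assumption for illustration, not something the PR does) is an explicit `Objects.requireNonNull` check before the duration is converted. `NonNullDurationSketch` and `toTimeoutMs` are hypothetical names.

```java
import java.time.Duration;
import java.util.Objects;

public class NonNullDurationSketch {
    // Fails fast with a descriptive message instead of a bare NullPointerException
    // from timeoutDuration.toMillis().
    static long toTimeoutMs(Duration timeoutDuration) {
        Objects.requireNonNull(timeoutDuration, "timeoutDuration may not be null");
        return timeoutDuration.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toTimeoutMs(Duration.ofSeconds(1)));
        try {
            toTimeoutMs(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```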




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

