rhauch commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r818198568
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.Callable; + +public class RetryUtil { + private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + + /** + * The method executes the callable at least once, optionally retrying the callable if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If timeout is exhausted, + * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown. + * + * <p>If {@code timeoutDuration} is set to 0, the task will be + * executed exactly once. If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be + * executed only once. 
+ * + * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries. + * + * @param callable the function to execute. + * @param timeoutDuration timeout duration + * @param retryBackoffMs the number of milliseconds to delay upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; + * must be 0 or more + * @throws ConnectException If the task exhausted all the retries. + */ + + public static <T> T retryUntilTimeout(Callable<T> callable, Duration timeoutDuration, long retryBackoffMs) throws Exception { + long timeoutMs = timeoutDuration.toMillis(); + if (retryBackoffMs >= timeoutMs) { + log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}. Callable will only execute once.", retryBackoffMs, timeoutMs); + } + + if (retryBackoffMs >= timeoutMs || + timeoutMs <= 0) { + // no retry + return callable.call(); + } + + long end = System.currentTimeMillis() + timeoutMs; + int attempt = 0; + Throwable lastError = null; + while (System.currentTimeMillis() <= end) { + attempt++; + try { + return callable.call(); + } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) { + log.warn("RetriableException caught on attempt {}, retrying automatically. " + + "Reason: {}", attempt, e.getMessage(), e); Review comment: Likewise, this log message could be changed to: ```suggestion log.warn("Attempt {} to {} resulted in RetriableException; retrying automatically. " + "Reason: {}", attempt, description.get(), e.getMessage(), e); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java ########## @@ -124,6 +124,9 @@ public String toString() { public static final int NO_PARTITIONS = -1; public static final short NO_REPLICATION_FACTOR = -1; + private static final String DEFAULT_ADMIN_CLIENT_RETRIES = "10"; + private static final String DEFAULT_ADMIN_CLIENT_BACKOFF_MS = "100"; + Review comment: These are unused now, right? 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.Callable; + +public class RetryUtil { + private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + + /** + * The method executes the callable at least once, optionally retrying the callable if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If timeout is exhausted, + * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown. + * + * <p>If {@code timeoutDuration} is set to 0, the task will be + * executed exactly once. If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be + * executed only once. 
+ * + * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries. + * + * @param callable the function to execute. + * @param timeoutDuration timeout duration + * @param retryBackoffMs the number of milliseconds to delay upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; + * must be 0 or more + * @throws ConnectException If the task exhausted all the retries. + */ + + public static <T> T retryUntilTimeout(Callable<T> callable, Duration timeoutDuration, long retryBackoffMs) throws Exception { + long timeoutMs = timeoutDuration.toMillis(); + if (retryBackoffMs >= timeoutMs) { + log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}. Callable will only execute once.", retryBackoffMs, timeoutMs); Review comment: All of the log messages in this method could use a description of the callable. WDYT about adding a `Supplier<String> description` parameter and using that in the log messages? For example, the `KafkaBasedLog` could supply ``` () -> "list offsets for topic '" + topicName + "'" ``` then this log message might be: ```suggestion log.warn("Call to {} will only execute once, since retryBackoffMs={} is larger than total timeoutMs={}", description.get(), retryBackoffMs, timeoutMs); ``` ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java ########## @@ -465,6 +467,55 @@ public void verifyingGettingTopicCleanupPolicies() { } } + @Test + public void retryEndOffsetsShouldThrowConnectException() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set<TopicPartition> tps = Collections.singleton(tp1); + Long offset = 1000L; + Cluster cluster = createCluster(1, "myTopic", 1); + + try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(10), cluster)) { + Map<TopicPartition, Long> offsetMap = new HashMap<>(); + offsetMap.put(tp1, offset); +
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE)); + Map<String, Object> adminConfig = new HashMap<>(); + adminConfig.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0"); + TopicAdmin admin = new TopicAdmin(adminConfig, env.adminClient()); + + assertThrows(ConnectException.class, () -> { + admin.retryEndOffsets(tps, Duration.ofMillis(100), 1); + }); + } + } + + @Test + public void endOffsetsShouldRetryWhenTopicNotFound() { Review comment: Nit: ```suggestion public void retryEndOffsetsShouldRetryWhenTopicNotFound() { ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.Callable; + +public class RetryUtil { + private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + + /** + * The method executes the callable at least once, optionally retrying the callable if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If timeout is exhausted, + * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown. + * + * <p>If {@code timeoutDuration} is set to 0, the task will be + * executed exactly once. If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be + * executed only once. + * + * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries. + * + * @param callable the function to execute. + * @param timeoutDuration timeout duration + * @param retryBackoffMs the number of milliseconds to delay upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; + * must be 0 or more + * @throws ConnectException If the task exhausted all the retries. + */ + + public static <T> T retryUntilTimeout(Callable<T> callable, Duration timeoutDuration, long retryBackoffMs) throws Exception { + long timeoutMs = timeoutDuration.toMillis(); + if (retryBackoffMs >= timeoutMs) { + log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}. 
Callable will only execute once.", retryBackoffMs, timeoutMs); + } + + if (retryBackoffMs >= timeoutMs || + timeoutMs <= 0) { + // no retry + return callable.call(); + } + + long end = System.currentTimeMillis() + timeoutMs; + int attempt = 0; + Throwable lastError = null; + while (System.currentTimeMillis() <= end) { + attempt++; + try { + return callable.call(); + } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) { + log.warn("RetriableException caught on attempt {}, retrying automatically. " + + "Reason: {}", attempt, e.getMessage(), e); + lastError = e; + } catch (WakeupException e) { + lastError = e; + } + + // if current time is less than the ending time, no more retry is necessary + // won't sleep if retryBackoffMs equals to 0 + if (retryBackoffMs > 0 && System.currentTimeMillis() < end) { + Utils.sleep(retryBackoffMs); + } Review comment: WDYT about this, which still backs off by `retryBackoffMs` between attempts but never sleeps past the overall deadline: ```suggestion long millisRemaining = Math.max(0, end - System.currentTimeMillis()); if (millisRemaining > 0) { Utils.sleep(Math.min(retryBackoffMs, millisRemaining)); } ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.Callable; + +public class RetryUtil { + private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + + /** + * The method executes the callable at least once, optionally retrying the callable if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If timeout is exhausted, + * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown. + * + * <p>If {@code timeoutDuration} is set to 0, the task will be + * executed exactly once. If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be + * executed only once. + * + * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries. + * + * @param callable the function to execute. + * @param timeoutDuration timeout duration + * @param retryBackoffMs the number of milliseconds to delay upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; + * must be 0 or more + * @throws ConnectException If the task exhausted all the retries. + */ + + public static <T> T retryUntilTimeout(Callable<T> callable, Duration timeoutDuration, long retryBackoffMs) throws Exception { + long timeoutMs = timeoutDuration.toMillis(); + if (retryBackoffMs >= timeoutMs) { + log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}. 
Callable will only execute once.", retryBackoffMs, timeoutMs); + } + + if (retryBackoffMs >= timeoutMs || + timeoutMs <= 0) { + // no retry + return callable.call(); + } + + long end = System.currentTimeMillis() + timeoutMs; + int attempt = 0; + Throwable lastError = null; + while (System.currentTimeMillis() <= end) { + attempt++; + try { + return callable.call(); + } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) { + log.warn("RetriableException caught on attempt {}, retrying automatically. " + + "Reason: {}", attempt, e.getMessage(), e); + lastError = e; + } catch (WakeupException e) { + lastError = e; + } + + // if current time is less than the ending time, no more retry is necessary + // won't sleep if retryBackoffMs equals to 0 + if (retryBackoffMs > 0 && System.currentTimeMillis() < end) { + Utils.sleep(retryBackoffMs); + } + } + + throw new ConnectException("Fail to retry the task after " + attempt + " attempts. Reason: " + lastError.getMessage(), lastError); Review comment: And this exception message could also use the description: ```suggestion throw new ConnectException("Fail to " + description.get() + " after " + attempt + " attempts. Reason: " + lastError.getMessage(), lastError); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -76,6 +76,10 @@ private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class); private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30); private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1); + // 15min of admin retry duration to ensure successful metadata propagation. 
10 seconds of backoff + // in between retries + private static final Duration ADMIN_CLIENT_RETRY_DURATION = Duration.ofMinutes(15); + private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = 10 * 1000; Review comment: Nit: our convention is to use either `TimeUnit` or `Duration` to compute the millis to make it more readable and to reduce risk of conversion errors. Using `Duration` is a bit more readable, but the lines above already use `TimeUnit`: ```suggestion private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.Callable; + +public class RetryUtil { + private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + + /** + * The method executes the callable at least once, optionally retrying the callable if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If timeout is exhausted, + * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown. + * + * <p>If {@code timeoutDuration} is set to 0, the task will be + * executed exactly once. If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be + * executed only once. + * + * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries. + * + * @param callable the function to execute. + * @param timeoutDuration timeout duration Review comment: Nit: since we require a non-null `Duration`, we should state that here: ```suggestion * @param timeoutDuration timeout duration; may not be null ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
