This is an automated email from the ASF dual-hosted git repository. capistrant pushed a commit to branch fabric8-retry-improvements in repository https://gitbox.apache.org/repos/asf/druid.git
commit e84ae3de03866857c4eea6d187ae8aa497bac686 Author: capistrant <[email protected]> AuthorDate: Fri Aug 15 17:02:41 2025 -0500 Start building out more robust fabric8 retries --- .../k8s/overlord/common/DruidK8sConstants.java | 10 ++ .../k8s/overlord/common/KubernetesPeonClient.java | 187 ++++++++++++++++++--- .../overlord/common/KubernetesPeonClientTest.java | 138 ++++++++++++++- 3 files changed, 314 insertions(+), 21 deletions(-) diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java index b1d8ac262b1..fddedc20c2d 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java @@ -19,7 +19,12 @@ package org.apache.druid.k8s.overlord.common; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.vertx.core.http.HttpClosedException; + public class DruidK8sConstants { @@ -56,4 +61,9 @@ public class DruidK8sConstants // exceeded quota: {value}, requested: {value}, used: {value}, limited: {value} "exceeded quota:" ); + + public static final ImmutableMap<Class<? 
extends RuntimeException>, Optional<String>> TRANSITIVE_CONNECTION_POOL_EXCEPTIONS = ImmutableMap.of( + KubernetesClientException.class, Optional.of("Connection was closed"), + HttpClosedException.class, Optional.absent() + ); } diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index 70f76076a9a..78298288fe8 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -88,29 +88,20 @@ public class KubernetesPeonClient String jobName = job.getMetadata().getName(); log.info("Submitting job[%s] for task[%s].", jobName, task.getId()); - client.batch() - .v1() - .jobs() - .inNamespace(namespace) - .resource(job) - .create(); + createK8sJobWithRetries(job); log.info("Submitted job[%s] for task[%s]. Waiting for POD to launch.", jobName, task.getId()); - // wait until the pod is running or complete or failed, any of those is fine + // Wait for the pod to be available Pod mainPod = getPeonPodWithRetries(jobName); - Pod result = client.pods() - .inNamespace(namespace) - .withName(mainPod.getMetadata().getName()) - .waitUntilCondition(pod -> { - if (pod == null) { - return true; - } - return pod.getStatus() != null && pod.getStatus().getPodIP() != null; - }, howLong, timeUnit); - + log.info("Pod for job[%s] launched for task[%s]. Waiting for pod to be in running state.", jobName, task.getId()); + + // Wait for the pod to be in state running, completed, or failed. + Pod result = waitForPodResultWithRetries(mainPod, howLong, timeUnit); + if (result == null) { throw new ISE("K8s pod for the task [%s] appeared and disappeared. 
It can happen if the task was canceled", task.getId()); } + log.info("Pod for job[%s] is in state [%s] for task[%s].", jobName, result.getStatus().getPhase(), task.getId()); long duration = System.currentTimeMillis() - start; emitK8sPodMetrics(task, "k8s/peon/startup/time", duration); return result; @@ -290,11 +281,139 @@ public class KubernetesPeonClient return pods.isEmpty() ? Optional.absent() : Optional.of(pods.get(0)); } + public Pod waitForPodResultWithRetries(final Pod pod, long howLong, TimeUnit timeUnit) + { + return clientApi.executeRequest(client -> waitForPodResultWithRetries(client, pod, howLong, timeUnit, 5, RetryUtils.DEFAULT_MAX_TRIES)); + } + public Pod getPeonPodWithRetries(String jobName) { return clientApi.executeRequest(client -> getPeonPodWithRetries(client, jobName, 5, RetryUtils.DEFAULT_MAX_TRIES)); } + public void createK8sJobWithRetries(Job job) + { + clientApi.executeRequest(client -> { + createK8sJobWithRetries(client, job, 5, RetryUtils.DEFAULT_MAX_TRIES); + return null; + }); + } + + /** + * Creates a Kubernetes job with retry logic for transient connection pool exceptions. + * <p> + * This method attempts to create the specified job in Kubernetes with built-in retry logic + * for transient connection pool issues. If the job already exists (HTTP 409 conflict), + * the method returns successfully without throwing an exception, assuming the job was + * already submitted by a previous request. + * <p> + * The retry logic only applies to transient connection pool exceptions as defined in + * {@link DruidK8sConstants#TRANSITIVE_CONNECTION_POOL_EXCEPTIONS}. Other exceptions + * will cause the method to fail immediately. 
+ * + * @param client the Kubernetes client to use for job creation + * @param job the Kubernetes job to create + * @param quietTries number of initial retry attempts without logging warnings + * @param maxTries maximum total number of retry attempts + * @throws DruidException if job creation fails after all retry attempts or encounters non-retryable errors + */ + @VisibleForTesting + void createK8sJobWithRetries(KubernetesClient client, Job job, int quietTries, int maxTries) + { + try { + RetryUtils.retry( + () -> { + try { + client.batch() + .v1() + .jobs() + .inNamespace(namespace) + .resource(job) + .create(); + return null; + } + catch (KubernetesClientException e) { + if (e.getCode() == 409) { + // Job already exists, return successfully + log.info("K8s job[%s] already exists, skipping creation", job.getMetadata().getName()); + return null; + } + throw e; + } + }, + this::isRetryableTransientConnectionPoolException, quietTries, maxTries + ); + } + catch (Exception e) { + throw DruidException.defensive(e, "Error when creating K8s job[%s]", job.getMetadata().getName()); + } + } + + /** + * Waits for a Kubernetes pod to be assigned a pod IP, with retry logic for transient connection pool exceptions. + * <p> + * This method waits until the specified pod has a non-null status with a pod IP assigned, or until the pod + * disappears. A pod IP alone does not mean the pod's Ready condition is true. The method includes retry logic to handle transient + * connection pool exceptions that may occur during the wait operation. + * <p> + * The retry logic only applies to transient connection pool exceptions as defined in + * {@link DruidK8sConstants#TRANSITIVE_CONNECTION_POOL_EXCEPTIONS}. Each attempt waits up to the + * specified timeout, and the entire wait operation is retried if transient connection issues are + * encountered, so the total time spent waiting can exceed a single timeout.
+ * + * @param client the Kubernetes client to use for pod operations + * @param pod the pod to wait for + * @param howLong the maximum time each attempt waits for the pod to be assigned a pod IP + * @param timeUnit the time unit for the wait timeout + * @param quietTries number of initial retry attempts without logging warnings + * @param maxTries maximum total number of retry attempts + * @return the pod once it has a pod IP assigned, or null if the pod disappeared while waiting + * @throws DruidException if waiting times out, fails after all retry attempts, or encounters non-retryable errors + */ + @VisibleForTesting + Pod waitForPodResultWithRetries(KubernetesClient client, Pod pod, long howLong, TimeUnit timeUnit, int quietTries, int maxTries) + { + try { + return RetryUtils.retry( + () -> client.pods() + .inNamespace(namespace) + .withName(pod.getMetadata().getName()) + .waitUntilCondition( + p -> { + if (p == null) { + return true; + } + return p.getStatus() != null && p.getStatus().getPodIP() != null; + }, howLong, timeUnit), + this::isRetryableTransientConnectionPoolException, quietTries, maxTries); + } + catch (Exception e) { + throw DruidException.defensive(e, "Error when waiting for pod[%s] to start", pod.getMetadata().getName()); + } + } + + /** + * Retrieves the pod associated with a Kubernetes job with retry logic for transient failures. + * <p> + * This method searches for a pod with the specified job name label and includes retry logic + * to handle both transient connection pool exceptions and cases where the pod may not be + * immediately available after job creation. If no pod is found, the method examines job + * events to provide detailed error information about pod creation failures.
+ * <p> + * The retry logic applies to: + * <ul> + * <li>Transient connection pool exceptions as defined in {@link DruidK8sConstants#TRANSITIVE_CONNECTION_POOL_EXCEPTIONS}</li> + * <li>Pod not found scenarios, except when blacklisted error messages from {@link DruidK8sConstants#BLACKLISTED_PEON_POD_ERROR_MESSAGES} are encountered</li> + * </ul> + * + * @param client the Kubernetes client to use for pod and event operations + * @param jobName the name of the job whose pod should be retrieved + * @param quietTries number of initial retry attempts without logging warnings + * @param maxTries maximum total number of retry attempts + * @return the pod associated with the job + * @throws KubernetesResourceNotFoundException if the pod cannot be found after all retry attempts + * @throws DruidException if retrieval fails due to other errors + */ @VisibleForTesting Pod getPeonPodWithRetries(KubernetesClient client, String jobName, int quietTries, int maxTries) { @@ -317,7 +436,7 @@ public class KubernetesPeonClient "Job[%s] failed to create pods. Message[%s]", jobName, latestEvent.getMessage()); } }, - this::shouldRetryStartingPeonPod, quietTries, maxTries + this::shouldRetryWaitForStartingPeonPod, quietTries, maxTries ); } catch (KubernetesResourceNotFoundException e) { @@ -337,8 +456,12 @@ public class KubernetesPeonClient * These substrings, found in {@link DruidK8sConstants#BLACKLISTED_PEON_POD_ERROR_MESSAGES}, * represent Kubernetes event that indicate a retry for starting the Peon Pod would likely be futile. */ - private boolean shouldRetryStartingPeonPod(Throwable e) + private boolean shouldRetryWaitForStartingPeonPod(Throwable e) { + if (isRetryableTransientConnectionPoolException(e)) { + return true; + } + if (!(e instanceof KubernetesResourceNotFoundException)) { return false; } @@ -353,6 +476,32 @@ public class KubernetesPeonClient return true; } + /** + * Checks if the exception is a potentially transient connection pool exception. 
+ * <p> + * This method checks if the exception is one of the known transient connection pool exceptions + * and whether it contains a specific message substring, if applicable. + * <p> + * We have experienced connections in the pool being closed by the server-side but remaining in the pool. These issues + * should be safe to retry in many cases. + */ + private boolean isRetryableTransientConnectionPoolException(Throwable e) + { + for (var entry : DruidK8sConstants.TRANSITIVE_CONNECTION_POOL_EXCEPTIONS.entrySet()) { + Class<? extends RuntimeException> exceptionClass = entry.getKey(); + Optional<String> messageSubstring = entry.getValue(); + + if (exceptionClass.isInstance(e)) { + if (messageSubstring.isPresent()) { + return e.getMessage() != null && e.getMessage().contains(messageSubstring.get()); + } else { + return true; + } + } + } + return false; + } + private List<Event> getPeonEvents(KubernetesClient client, String jobName) { ObjectReference objectReference = new ObjectReferenceBuilder() diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java index 9c831217914..9f5670c333b 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java @@ -27,7 +27,6 @@ import io.fabric8.kubernetes.api.model.PodListBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; import io.fabric8.kubernetes.client.dsl.LogWatch; import 
io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; @@ -145,7 +144,7 @@ public class KubernetesPeonClientTest client.pods().inNamespace(NAMESPACE).resource(pod).create(); Assertions.assertThrows( - KubernetesClientTimeoutException.class, + DruidException.class, () -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS) ); } @@ -713,4 +712,139 @@ public class KubernetesPeonClientTest Optional<LogWatch> maybeLogWatch = instance.getPeonLogWatcher(new K8sTaskId(TASK_NAME_PREFIX, ID)); Assertions.assertFalse(maybeLogWatch.isPresent()); } + + @Test + void test_createK8sJobWithRetries_withSuccessfulCreation_createsJob() + { + Job job = new JobBuilder() + .withNewMetadata() + .withName(KUBERNETES_JOB_NAME) + .endMetadata() + .build(); + + // Should not throw any exception + instance.createK8sJobWithRetries(job); + + // Verify job was created + Job createdJob = client.batch().v1().jobs().inNamespace(NAMESPACE).withName(KUBERNETES_JOB_NAME).get(); + Assertions.assertNotNull(createdJob); + Assertions.assertEquals(KUBERNETES_JOB_NAME, createdJob.getMetadata().getName()); + } + + @Test + void test_createK8sJobWithRetries_withNonRetryableException_failsImmediately() + { + Job job = new JobBuilder() + .withNewMetadata() + .withName(KUBERNETES_JOB_NAME) + .endMetadata() + .build(); + + String jobPath = "/apis/batch/v1/namespaces/" + NAMESPACE + "/jobs"; + + // Return 403 Forbidden - this is not a retryable exception + server.expect().post() + .withPath(jobPath) + .andReturn(HttpURLConnection.HTTP_FORBIDDEN, "Forbidden: insufficient permissions") + .once(); + + // Should fail immediately without retries + DruidException e = Assertions.assertThrows( + DruidException.class, + () -> instance.createK8sJobWithRetries(clientApi.getClient(), job, 0, 5) + ); + + // Verify the error message contains our job name + 
Assertions.assertTrue(e.getMessage().contains(KUBERNETES_JOB_NAME)); + } + + @Test + void test_createK8sJobWithRetries_withJobAlreadyExists_succeedsGracefully() + { + Job job = new JobBuilder() + .withNewMetadata() + .withName(KUBERNETES_JOB_NAME) + .endMetadata() + .build(); + + String jobPath = "/apis/batch/v1/namespaces/" + NAMESPACE + "/jobs"; + + // Return 409 Conflict - job already exists + server.expect().post() + .withPath(jobPath) + .andReturn(HttpURLConnection.HTTP_CONFLICT, "Job already exists") + .once(); + + // Should succeed gracefully without throwing exception + Assertions.assertDoesNotThrow( + () -> instance.createK8sJobWithRetries(clientApi.getClient(), job, 0, 5) + ); + } + + @Test + void test_waitForPodResultWithRetries_withSuccessfulPodReady_returnsPod() + { + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .endMetadata() + .withNewStatus() + .withPodIP("192.168.1.100") + .endStatus() + .build(); + + // Create the pod in the mock client + client.pods().inNamespace(NAMESPACE).resource(pod).create(); + + // Should return the pod successfully + Pod result = instance.waitForPodResultWithRetries( + clientApi.getClient(), + pod, + 1, + TimeUnit.SECONDS, + 0, + 3 + ); + + Assertions.assertNotNull(result); + Assertions.assertEquals(POD_NAME, result.getMetadata().getName()); + Assertions.assertEquals("192.168.1.100", result.getStatus().getPodIP()); + } + + @Test + void test_waitForPodResultWithRetries_withNonRetryableFailure_throwsDruidException() + { + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .endMetadata() + .withNewStatus() + .withPodIP(null) // Pod without IP, will timeout + .endStatus() + .build(); + + String podPath = "/api/v1/namespaces/" + NAMESPACE + "/pods/" + POD_NAME; + + // Stub the watch request to fail with HTTP 500 (not a transient connection pool error); the pod is never created, so the wait cannot succeed + server.expect().get() + .withPath(podPath + "?watch=true") + .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "Internal server error") + .once(); + + //
Should throw DruidException after failure + DruidException e = Assertions.assertThrows( + DruidException.class, + () -> instance.waitForPodResultWithRetries( + clientApi.getClient(), + pod, + 1, + TimeUnit.MILLISECONDS, // Very short timeout to force failure + 0, + 1 + ) + ); + + // Verify the error message contains our pod name + Assertions.assertTrue(e.getMessage().contains(POD_NAME)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
