This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/main by this push:
     new a2a74db22 fix(ctrl): monitor Pod ready condition
a2a74db22 is described below

commit a2a74db226fce7f0b67ddc0ef7df441656b0c638
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Thu Jan 4 15:28:30 2024 +0100

    fix(ctrl): monitor Pod ready condition
    
    When the user uses a startup probe, the Integration won't be marked as running 
until the Pod ready condition is reached
    
    Closes #4977
---
 pkg/controller/integration/monitor.go            | 161 +++++++----------------
 pkg/controller/integration/monitor_cronjob.go    |   2 +-
 pkg/controller/integration/monitor_deployment.go |   4 +-
 pkg/controller/integration/monitor_knative.go    |   2 +-
 4 files changed, 50 insertions(+), 119 deletions(-)

diff --git a/pkg/controller/integration/monitor.go 
b/pkg/controller/integration/monitor.go
index 7c5a6b7c6..9a6208fcb 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -254,7 +254,7 @@ func getIntegrationSecretsAndConfigmaps(ctx 
context.Context, client client.Clien
 type controller interface {
        checkReadyCondition(ctx context.Context) (bool, error)
        getPodSpec() corev1.PodSpec
-       updateReadyCondition(readyPods []corev1.Pod) bool
+       updateReadyCondition(readyPods int) bool
 }
 
 func (action *monitorAction) newController(env *trait.Environment, integration 
*v1.Integration) (controller, error) {
@@ -318,33 +318,32 @@ func (action *monitorAction) 
updateIntegrationPhaseAndReadyCondition(
        if err != nil {
                return err
        }
-
-       readyPods, unreadyPods := filterPodsByReadyStatus(environment, 
runningPods, controller.getPodSpec())
-
        if done, err := controller.checkReadyCondition(ctx); done || err != nil 
{
-               if len(readyPods) > 0 || len(unreadyPods) > 0 {
-                       // There may be pods that are not ready but still 
probable for getting error messages.
-                       // Ignore returned error from probing as it's expected 
when the ctrl obj is not ready.
-                       _ = action.probeReadiness(ctx, environment, 
integration, unreadyPods, readyPods)
-               }
+               // There may be pods that are not ready but still probable for 
getting error messages.
+               // Ignore returned error from probing as it's expected when the 
ctrl obj is not ready.
+               _, _, _ = action.probeReadiness(ctx, environment, integration, 
runningPods)
                return err
        }
-       if done := checkPodStatuses(integration, pendingPods, runningPods); 
done {
+       if arePodsFailingStatuses(integration, pendingPods, runningPods) {
                return nil
        }
-       integration.Status.Phase = v1.IntegrationPhaseRunning
-
-       if done := controller.updateReadyCondition(readyPods); done {
+       readyPods, probeOk, err := action.probeReadiness(ctx, environment, 
integration, runningPods)
+       if err != nil {
+               return err
+       }
+       if !probeOk {
+               integration.Status.Phase = v1.IntegrationPhaseError
                return nil
        }
-       if err := action.probeReadiness(ctx, environment, integration, 
unreadyPods, readyPods); err != nil {
-               return err
+       if done := controller.updateReadyCondition(readyPods); done {
+               integration.Status.Phase = v1.IntegrationPhaseRunning
+               return nil
        }
 
        return nil
 }
 
-func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod, 
runningPods []corev1.Pod) bool {
+func arePodsFailingStatuses(integration *v1.Integration, pendingPods 
[]corev1.Pod, runningPods []corev1.Pod) bool {
        // Check Pods statuses
        for _, pod := range pendingPods {
                // Check the scheduled condition
@@ -396,114 +395,44 @@ func checkPodStatuses(integration *v1.Integration, 
pendingPods []corev1.Pod, run
        return false
 }
 
-func filterPodsByReadyStatus(environment *trait.Environment, runningPods 
[]corev1.Pod, podSpec corev1.PodSpec) ([]corev1.Pod, []corev1.Pod) {
-       var readyPods []corev1.Pod
-       var unreadyPods []corev1.Pod
-
-       integrationContainerName := environment.GetIntegrationContainerName()
-       for _, pod := range runningPods {
-               // We compare the Integration PodSpec to that of the Pod in 
order to make
-               // sure we account for up-to-date version.
-               if !comparePodSpec(integrationContainerName, podSpec, pod.Spec) 
{
-                       continue
-               }
-               ready := kubernetes.GetPodCondition(pod, corev1.PodReady)
-               if ready == nil {
-                       continue
-               }
-               switch ready.Status {
-               case corev1.ConditionTrue:
-                       // We still account terminating Pods to handle rolling 
deployments
-                       readyPods = append(readyPods, pod)
-               case corev1.ConditionFalse:
-                       if pod.DeletionTimestamp != nil {
-                               continue
-                       }
-                       unreadyPods = append(unreadyPods, pod)
-               }
-       }
-
-       return readyPods, unreadyPods
-}
-
-// comparePodSpec compares given pod spec according to integration specific 
information (e.g. digest, container image).
-func comparePodSpec(integrationContainerName string, runningPodSpec 
corev1.PodSpec, referencePod corev1.PodSpec) bool {
-       runningPodContainer := 
findIntegrationContainer(integrationContainerName, runningPodSpec)
-       referencePodContainer := 
findIntegrationContainer(integrationContainerName, referencePod)
-
-       if runningPodContainer == nil || referencePodContainer == nil {
-               return false
-       }
-
-       // integration digest must be the same
-       if getIntegrationDigest(runningPodContainer.Env) != 
getIntegrationDigest(referencePodContainer.Env) {
-               return false
-       }
-
-       // integration container image must be the same (same integration kit)
-       if runningPodContainer.Image != referencePodContainer.Image {
-               return false
-       }
-
-       return true
-}
-
-func getIntegrationDigest(envs []corev1.EnvVar) string {
-       for _, env := range envs {
-               if env.Name == digest.IntegrationDigestEnvVar {
-                       return env.Value
-               }
-       }
-
-       return ""
-}
-
-// findIntegrationContainer find if present the integration container in the 
pod spec using the integration specifications.
-func findIntegrationContainer(integrationContainerName string, spec 
corev1.PodSpec) *corev1.Container {
-       for _, c := range spec.Containers {
-               if c.Name == integrationContainerName {
-                       return &c
-               }
-       }
-
-       return nil
-}
-
 // probeReadiness calls the readiness probes of the non-ready Pods directly to 
retrieve insights from the Camel runtime.
-func (action *monitorAction) probeReadiness(
-       ctx context.Context, environment *trait.Environment, integration 
*v1.Integration,
-       unreadyPods []corev1.Pod, readyPods []corev1.Pod,
-) error {
+// The func returns the number of ready Pods, whether the probe succeeded, and any 
error that may have happened during its execution.
+func (action *monitorAction) probeReadiness(ctx context.Context, environment 
*trait.Environment, integration *v1.Integration, pods []corev1.Pod) (int, bool, 
error) {
+       // as a default we assume the Integration is Ready
        readyCondition := v1.IntegrationCondition{
                Type:   v1.IntegrationConditionReady,
-               Status: corev1.ConditionFalse,
-               Pods:   make([]v1.PodCondition, len(unreadyPods)),
+               Status: corev1.ConditionTrue,
+               Pods:   make([]v1.PodCondition, len(pods)),
        }
 
+       readyPods := 0
+       unreadyPods := 0
+
        runtimeReady := true
        runtimeFailed := false
+       probeReadinessOk := true
 
-       for i := range unreadyPods {
-               pod := &unreadyPods[i]
-               if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady); 
ready.Reason != "ContainersNotReady" {
-                       continue
-               }
-               container := getIntegrationContainer(environment, pod)
-               if container == nil {
-                       return fmt.Errorf("integration container not found in 
Pod %s/%s", pod.Namespace, pod.Name)
-               }
-
+       for i := range pods {
+               pod := &pods[i]
                readyCondition.Pods[i].Name = pod.Name
-
                for p := range pod.Status.Conditions {
                        if pod.Status.Conditions[p].Type == corev1.PodReady {
                                readyCondition.Pods[i].Condition = 
pod.Status.Conditions[p]
                                break
                        }
                }
+               // If it's in ready status, then we don't care to probe.
+               if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady); 
ready.Status == corev1.ConditionTrue {
+                       readyPods++
+                       continue
+               }
+               unreadyPods++
+               container := getIntegrationContainer(environment, pod)
+               if container == nil {
+                       return readyPods, false, fmt.Errorf("integration 
container not found in Pod %s/%s", pod.Namespace, pod.Name)
+               }
                if probe := container.ReadinessProbe; probe != nil && 
probe.HTTPGet != nil {
                        body, err := proxyGetHTTPProbe(ctx, action.client, 
probe, pod, container)
-
                        // When invoking the HTTP probe, the kubernetes client 
exposes a very
                        // specific behavior:
                        //
@@ -559,7 +488,7 @@ func (action *monitorAction) probeReadiness(
 
                        health, err := NewHealthCheck(body)
                        if err != nil {
-                               return err
+                               return readyPods, false, err
                        }
                        for _, check := range health.Checks {
                                if check.Status == v1.HealthCheckStatusUp {
@@ -575,19 +504,21 @@ func (action *monitorAction) probeReadiness(
        }
 
        if runtimeFailed {
-               integration.Status.Phase = v1.IntegrationPhaseError
+               probeReadinessOk = false
                readyCondition.Reason = v1.IntegrationConditionErrorReason
-               readyCondition.Message = fmt.Sprintf("%d/%d pods are not 
ready", len(unreadyPods), len(unreadyPods)+len(readyPods))
+               readyCondition.Status = corev1.ConditionFalse
+               readyCondition.Message = fmt.Sprintf("%d/%d pods are not 
ready", unreadyPods, unreadyPods+readyPods)
+               integration.Status.SetConditions(readyCondition)
        }
        if !runtimeReady {
-               integration.Status.Phase = v1.IntegrationPhaseError
+               probeReadinessOk = false
                readyCondition.Reason = 
v1.IntegrationConditionRuntimeNotReadyReason
-               readyCondition.Message = fmt.Sprintf("%d/%d pods are not 
ready", len(unreadyPods), len(unreadyPods)+len(readyPods))
+               readyCondition.Status = corev1.ConditionFalse
+               readyCondition.Message = fmt.Sprintf("%d/%d pods are not 
ready", unreadyPods, unreadyPods+readyPods)
+               integration.Status.SetConditions(readyCondition)
        }
 
-       integration.Status.SetConditions(readyCondition)
-
-       return nil
+       return readyPods, probeReadinessOk, nil
 }
 
 func findHighestPriorityReadyKit(kits []v1.IntegrationKit) 
(*v1.IntegrationKit, error) {
diff --git a/pkg/controller/integration/monitor_cronjob.go 
b/pkg/controller/integration/monitor_cronjob.go
index a2f969d34..1620a66c3 100644
--- a/pkg/controller/integration/monitor_cronjob.go
+++ b/pkg/controller/integration/monitor_cronjob.go
@@ -77,7 +77,7 @@ func (c *cronJobController) getPodSpec() corev1.PodSpec {
        return c.obj.Spec.JobTemplate.Spec.Template.Spec
 }
 
-func (c *cronJobController) updateReadyCondition(readyPods []corev1.Pod) bool {
+func (c *cronJobController) updateReadyCondition(readyPods int) bool {
        switch {
        case c.obj.Status.LastScheduleTime == nil:
                c.integration.SetReadyCondition(corev1.ConditionTrue,
diff --git a/pkg/controller/integration/monitor_deployment.go 
b/pkg/controller/integration/monitor_deployment.go
index 55b7797c7..e2f823c16 100644
--- a/pkg/controller/integration/monitor_deployment.go
+++ b/pkg/controller/integration/monitor_deployment.go
@@ -59,14 +59,14 @@ func (c *deploymentController) getPodSpec() corev1.PodSpec {
        return c.obj.Spec.Template.Spec
 }
 
-func (c *deploymentController) updateReadyCondition(readyPods []corev1.Pod) 
bool {
+func (c *deploymentController) updateReadyCondition(readyPods int) bool {
        replicas := int32(1)
        if r := c.integration.Spec.Replicas; r != nil {
                replicas = *r
        }
        // The Deployment status reports updated and ready replicas separately,
        // so that the number of ready replicas also accounts for older 
versions.
-       readyReplicas := int32(len(readyPods))
+       readyReplicas := int32(readyPods)
        switch {
        case readyReplicas >= replicas:
                // The Integration is considered ready when the number of 
replicas
diff --git a/pkg/controller/integration/monitor_knative.go 
b/pkg/controller/integration/monitor_knative.go
index 1d62eef2e..06b7dc82b 100644
--- a/pkg/controller/integration/monitor_knative.go
+++ b/pkg/controller/integration/monitor_knative.go
@@ -51,7 +51,7 @@ func (c *knativeServiceController) getPodSpec() 
corev1.PodSpec {
        return c.obj.Spec.Template.Spec.PodSpec
 }
 
-func (c *knativeServiceController) updateReadyCondition(readyPods 
[]corev1.Pod) bool {
+func (c *knativeServiceController) updateReadyCondition(readyPods int) bool {
        ready := kubernetes.GetKnativeServiceCondition(*c.obj, 
servingv1.ServiceConditionReady)
        if ready.IsTrue() {
                c.integration.SetReadyCondition(corev1.ConditionTrue,

Reply via email to