This is an automated email from the ASF dual-hosted git repository.
ricardozanini pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git
The following commit(s) were added to refs/heads/main by this push:
new 913d221a5a2 [sonataflow-operator] ]incubator-kie-tools-3471: Implement
nativelly PDB support for SonataFlow workflows and Data Index service
deployments (#3475)
913d221a5a2 is described below
commit 913d221a5a26cd072521492851b26a544aee978c
Author: Walter Medvedeo <[email protected]>
AuthorDate: Thu Mar 5 17:32:52 2026 +0100
[sonataflow-operator] ]incubator-kie-tools-3471: Implement nativelly PDB
support for SonataFlow workflows and Data Index service deployments (#3475)
---
packages/sonataflow-operator/Makefile | 14 +-
.../api/v1alpha08/podtemplate_types.go | 32 ++-
.../api/v1alpha08/sonataflow_types.go | 5 +
.../api/v1alpha08/zz_generated.deepcopy.go | 36 ++++
packages/sonataflow-operator/config/rbac/role.yaml | 12 ++
.../internal/controller/platform/autoscaling.go | 73 -------
.../internal/controller/platform/k8s.go | 80 ++++++--
.../controller/platform/services/services.go | 22 +++
.../controller/profiles/common/object_creators.go | 21 ++
.../profiles/preview/deployment_handler.go | 9 +
.../profiles/preview/disruption_budget_handler.go | 105 ++++++++++
.../internal/controller/sonataflow_controller.go | 55 +++++-
.../controller/sonataflowplatform_controller.go | 75 +++++---
.../internal/controller/workflowdef/utils.go | 12 ++
packages/sonataflow-operator/operator.yaml | 93 +++++++++
packages/sonataflow-operator/test/e2e/helpers.go | 124 +++++++++++-
.../sonataflow-operator/test/e2e/platform_test.go | 141 ++++++++++++++
.../01-postgres.yaml | 89 +++++++++
.../02-sonataflow_platform.yaml | 66 +++++++
.../kustomization.yaml | 34 ++++
.../01-postgres.yaml | 89 +++++++++
.../02-sonataflow_platform.yaml | 65 +++++++
.../03-data-index-service-hpa.yaml | 41 ++++
.../kustomization.yaml | 35 ++++
.../sonataflow-operator/test/e2e/workflow_test.go | 214 +++++++++++++++++++++
.../utils/kubernetes/autoscaling.go | 126 ++++++++++++
.../utils/kubernetes/deployment.go | 12 ++
.../utils/kubernetes/disruption_budget.go | 69 +++++++
28 files changed, 1628 insertions(+), 121 deletions(-)
diff --git a/packages/sonataflow-operator/Makefile
b/packages/sonataflow-operator/Makefile
index 306887d378d..bf7ac63d3a7 100644
--- a/packages/sonataflow-operator/Makefile
+++ b/packages/sonataflow-operator/Makefile
@@ -423,7 +423,7 @@ generate-all: generate generate-deploy bundle
@$(MAKE) fmt
.PHONY: test-e2e # You will need to have a Minikube/Kind cluster up and
running to run this target, and run container-builder before the test
-label = "flows-ephemeral" # possible values are flows-ephemeral,
flows-persistence, flows-monitoring, flows-hpa, platform, cluster
+label = "flows-ephemeral" # possible values are flows-ephemeral,
flows-persistence, flows-monitoring, flows-hpa, flows-pdb-with-hpa, flows-pdb,
platform, cluster
test-e2e:
ifeq ($(label), cluster)
@echo "🌐 Running e2e tests for cluster..."
@@ -455,8 +455,18 @@ else ifeq ($(label), flows-hpa)
go test ./test/e2e/e2e_suite_test.go ./test/e2e/helpers.go
./test/e2e/workflow_test.go \
-v -ginkgo.v -ginkgo.no-color -ginkgo.github-output
-ginkgo.label-filter=$(label) \
-ginkgo.junit-report=./e2e-test-report-workflow_test.xml -timeout 60m
KUSTOMIZE=$(KUSTOMIZE);
+else ifeq ($(label), flows-pdb)
+ @echo "🔁 Running e2e tests for flows-pdb..."
+ go test ./test/e2e/e2e_suite_test.go ./test/e2e/helpers.go
./test/e2e/workflow_test.go \
+ -v -ginkgo.v -ginkgo.no-color -ginkgo.github-output
-ginkgo.label-filter=$(label) \
+ -ginkgo.junit-report=./e2e-test-report-workflow_test.xml -timeout 60m
KUSTOMIZE=$(KUSTOMIZE);
+else ifeq ($(label), flows-pdb-with-hpa)
+ @echo "🔁 Running e2e tests for flows-pdb-with-hpa..."
+ go test ./test/e2e/e2e_suite_test.go ./test/e2e/helpers.go
./test/e2e/workflow_test.go \
+ -v -ginkgo.v -ginkgo.no-color -ginkgo.github-output
-ginkgo.label-filter=$(label) \
+ -ginkgo.junit-report=./e2e-test-report-workflow_test.xml -timeout 60m
KUSTOMIZE=$(KUSTOMIZE);
else
- @echo "❌ Invalid label. Please use one of: cluster, platform,
flows-ephemeral, flows-persistence, flows-monitoring, flows-hpa"
+ @echo "❌ Invalid label. Please use one of: cluster, platform,
flows-ephemeral, flows-persistence, flows-monitoring, flows-hpa, flows-pdb,
flows-pdb-with-hpa"
endif
.PHONY: full-test-e2e
diff --git a/packages/sonataflow-operator/api/v1alpha08/podtemplate_types.go
b/packages/sonataflow-operator/api/v1alpha08/podtemplate_types.go
index 99e7f9335e3..574dbedbc2f 100644
--- a/packages/sonataflow-operator/api/v1alpha08/podtemplate_types.go
+++ b/packages/sonataflow-operator/api/v1alpha08/podtemplate_types.go
@@ -17,7 +17,10 @@
package v1alpha08
-import corev1 "k8s.io/api/core/v1"
+import (
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/util/intstr"
+)
// ContainerSpec is the container for the internal deployments based on the
default Kubernetes Container API
type ContainerSpec struct {
@@ -532,9 +535,27 @@ func (f *PodSpec) ToPodSpec() corev1.PodSpec {
}
}
-// PodTemplateSpec describes the desired custom Kubernetes PodTemplate
definition for the deployed flow or service.
+// PodDisruptionBudgetSpec describes the Kubernetes pod disruption budget
configuration for the SonataFlow and supporting
+// services pods.
+type PodDisruptionBudgetSpec struct {
+ // An eviction is allowed if at least "minAvailable" pods selected by
+ // "selector" will still be available after the eviction, i.e. even in
the
+ // absence of the evicted pod. So for example you can prevent all
voluntary
+ // evictions by specifying "100%". This is a mutually exclusive setting
with "maxUnavailable".
+ // +optional
+ MinAvailable *intstr.IntOrString `json:"minAvailable,omitempty"
protobuf:"bytes,1,opt,name=minAvailable"`
+
+ // An eviction is allowed if at most "maxUnavailable" pods selected by
+ // "selector" are unavailable after the eviction, i.e. even in absence
of
+ // the evicted pod. For example, one can prevent all voluntary evictions
+ // by specifying 0. This is a mutually exclusive setting with
"minAvailable".
+ // +optional
+ MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"
protobuf:"bytes,3,opt,name=maxUnavailable"`
+}
+
+// PodTemplateSpec describes the desired custom Kubernetes PodTemplate
definition for a service.
//
-// The ContainerSpec describes the container where the actual flow or service
is running. It will override any default definitions.
+// The ContainerSpec describes the container where the service is running. It
will override any default definitions.
// For example, to override the image one can use
`.spec.podTemplate.container.image = my/image:tag`.
type PodTemplateSpec struct {
// Container is the Kubernetes container where the application should
run.
@@ -545,4 +566,9 @@ type PodTemplateSpec struct {
PodSpec `json:",inline"`
// +optional
Replicas *int32 `json:"replicas,omitempty"`
+ // Defines the Kubernetes PodDisruptionBudgetSpec for this service.
When configured, the SonataFlowPlatform controller
+ // will automatically create a PodDisruptionBudget based on this
specification that targets the service Deployment.
+ // Currently only apply for the Data Index.
+ // +optional
+ PodDisruptionBudget *PodDisruptionBudgetSpec
`json:"podDisruptionBudget,omitempty"`
}
diff --git a/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
b/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
index eb204d1553d..ca959549d29 100644
--- a/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
+++ b/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
@@ -56,6 +56,11 @@ type FlowPodTemplateSpec struct {
// Defines the kind of deployment model for this pod spec. In dev
profile, only "kubernetes" is valid.
// +optional
DeploymentModel DeploymentModel `json:"deploymentModel,omitempty"`
+ // Defines the Kubernetes PodDisruptionBudgetSpec for this workflow.
When configured, the SonataFlow controller will
+ // automatically create a PodDisruptionBudget based on this
specification that targets the workflow Deployment.
+ // Ignored in "knative" deployment model, and dev profile workflows.
+ // +optional
+ PodDisruptionBudget *PodDisruptionBudgetSpec
`json:"podDisruptionBudget,omitempty"`
}
// Flow describes the contents of the Workflow definition following the CNCF
Serverless Workflow Specification.
diff --git
a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
index 22faee96338..bf8c458e524 100644
--- a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
+++ b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
@@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/intstr"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)
@@ -354,6 +355,11 @@ func (in *FlowPodTemplateSpec) DeepCopyInto(out
*FlowPodTemplateSpec) {
*out = new(int32)
**out = **in
}
+ if in.PodDisruptionBudget != nil {
+ in, out := &in.PodDisruptionBudget, &out.PodDisruptionBudget
+ *out = new(PodDisruptionBudgetSpec)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new FlowPodTemplateSpec.
@@ -549,6 +555,31 @@ func (in *PlatformServicesStatus) DeepCopy()
*PlatformServicesStatus {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *PodDisruptionBudgetSpec) DeepCopyInto(out *PodDisruptionBudgetSpec) {
+ *out = *in
+ if in.MinAvailable != nil {
+ in, out := &in.MinAvailable, &out.MinAvailable
+ *out = new(intstr.IntOrString)
+ **out = **in
+ }
+ if in.MaxUnavailable != nil {
+ in, out := &in.MaxUnavailable, &out.MaxUnavailable
+ *out = new(intstr.IntOrString)
+ **out = **in
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new PodDisruptionBudgetSpec.
+func (in *PodDisruptionBudgetSpec) DeepCopy() *PodDisruptionBudgetSpec {
+ if in == nil {
+ return nil
+ }
+ out := new(PodDisruptionBudgetSpec)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
func (in *PodSpec) DeepCopyInto(out *PodSpec) {
*out = *in
@@ -722,6 +753,11 @@ func (in *PodTemplateSpec) DeepCopyInto(out
*PodTemplateSpec) {
*out = new(int32)
**out = **in
}
+ if in.PodDisruptionBudget != nil {
+ in, out := &in.PodDisruptionBudget, &out.PodDisruptionBudget
+ *out = new(PodDisruptionBudgetSpec)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new PodTemplateSpec.
diff --git a/packages/sonataflow-operator/config/rbac/role.yaml
b/packages/sonataflow-operator/config/rbac/role.yaml
index f5bc2d75e7b..845fe910a07 100644
--- a/packages/sonataflow-operator/config/rbac/role.yaml
+++ b/packages/sonataflow-operator/config/rbac/role.yaml
@@ -52,6 +52,18 @@ rules:
- list
- update
- watch
+ - apiGroups:
+ - policy
+ resources:
+ - poddisruptionbudgets
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - patch
+ - update
+ - watch
- apiGroups:
- serving.knative.dev
resources:
diff --git
a/packages/sonataflow-operator/internal/controller/platform/autoscaling.go
b/packages/sonataflow-operator/internal/controller/platform/autoscaling.go
deleted file mode 100644
index 2a4dad03109..00000000000
--- a/packages/sonataflow-operator/internal/controller/platform/autoscaling.go
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package platform
-
-import (
- "context"
-
- v1 "k8s.io/api/core/v1"
- "k8s.io/klog/v2"
-
- autoscalingv2 "k8s.io/api/autoscaling/v2"
- "sigs.k8s.io/controller-runtime/pkg/client"
-
- "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
-)
-
-// findHPAForDeployment returns the HorizontalPodAutoscaler targeting a
deployment in a given namespace, or nil if it
-// doesn't exist.
-// Note: By k8s definition, the HorizontalPodAutoscaler must belong to the
same namespace as the managed deployment.
-func findHPAForDeployment(ctx context.Context, c client.Client, namespace
string, name string) (*autoscalingv2.HorizontalPodAutoscaler, error) {
- klog.V(log.D).Infof("Querying HorizontalPodAutoscalers in namespace:
%s", namespace)
- var hpaList autoscalingv2.HorizontalPodAutoscalerList
- if err := c.List(ctx, &hpaList, client.InNamespace(namespace)); err !=
nil {
- return nil, err
- }
- klog.V(log.D).Infof("Total number of returned HorizontalPodAutoscalers
is: %d", len(hpaList.Items))
- for _, hpa := range hpaList.Items {
- ref := hpa.Spec.ScaleTargetRef
- klog.V(log.D).Infof("Checking if HorizontalPodAutoscaler name:
%s, ref.Kind: %s, ref.Name: %s, ref.APIVersion: %s, targets deployment: %s.",
hpa.Name, ref.Kind, ref.Name, ref.APIVersion, name)
- if ref.Kind == "Deployment" && ref.Name == name &&
ref.APIVersion == "apps/v1" {
- klog.V(log.D).Infof("HorizontalPodAutoscaler name: %s
targets deployment: %s.", hpa.Name, name)
- return &hpa, nil
- }
- }
- klog.V(log.D).Infof("No HorizontalPodAutoscaler targets deployment: %s
in namespace: %s.", name, namespace)
- return nil, nil
-}
-
-// hpaIsActive returns true if the HorizontalPodAutoscaler is active.
-func hpaIsActive(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
- klog.V(log.D).Infof("Checking if HorizontalPodAutoscaler is Active.")
- for _, cond := range hpa.Status.Conditions {
- klog.V(log.D).Infof("Checking Status condition type: %s, %s.",
cond.Type, cond.Status)
- if cond.Type == autoscalingv2.ScalingActive {
- return cond.Status == v1.ConditionTrue
- }
- }
- return false
-}
-
-// hpaIsWorking returns true if the HorizontalPodAutoscaler has started to
take care of the scaling for the
-// corresponding target ref. At this point, our controllers must transfer the
control to the HorizontalPodAutoscaler
-// and let it manage the replicas.
-func hpaIsWorking(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
- return hpaIsActive(hpa) || hpa.Status.DesiredReplicas > 0
-}
diff --git a/packages/sonataflow-operator/internal/controller/platform/k8s.go
b/packages/sonataflow-operator/internal/controller/platform/k8s.go
index 71a65e08146..753416d9553 100644
--- a/packages/sonataflow-operator/internal/controller/platform/k8s.go
+++ b/packages/sonataflow-operator/internal/controller/platform/k8s.go
@@ -24,6 +24,7 @@ import (
"fmt"
v2 "k8s.io/api/autoscaling/v2"
+ policyv1 "k8s.io/api/policy/v1"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/version"
@@ -103,19 +104,25 @@ func (action *serviceAction) Handle(ctx context.Context,
platform *operatorapi.S
}
func createOrUpdateServiceComponents(ctx context.Context, client
client.Client, platform *operatorapi.SonataFlowPlatform, psh
services.PlatformServiceHandler) (*corev1.Event, error) {
- if err := createOrUpdateConfigMap(ctx, client, platform, psh); err !=
nil {
+ var deployment *appsv1.Deployment
+ var hpa *v2.HorizontalPodAutoscaler
+ var err error
+ if err = createOrUpdateConfigMap(ctx, client, platform, psh); err !=
nil {
return nil, err
}
- if err := createOrUpdateDeployment(ctx, client, platform, psh); err !=
nil {
+ if deployment, hpa, err = createOrUpdateDeployment(ctx, client,
platform, psh); err != nil {
return nil, err
}
- if err := createOrUpdateService(ctx, client, platform, psh); err != nil
{
+ if err = createOrUpdatePDB(ctx, client, platform, psh, deployment,
hpa); err != nil {
+ return nil, err
+ }
+ if err = createOrUpdateService(ctx, client, platform, psh); err != nil {
return nil, err
}
return createOrUpdateKnativeResources(ctx, client, platform, psh)
}
-func createOrUpdateDeployment(ctx context.Context, client client.Client,
platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler)
error {
+func createOrUpdateDeployment(ctx context.Context, client client.Client,
platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler)
(*appsv1.Deployment, *v2.HorizontalPodAutoscaler, error) {
readyProbe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
@@ -157,7 +164,7 @@ func createOrUpdateDeployment(ctx context.Context, client
client.Client, platfor
serviceContainer = psh.ConfigurePersistence(serviceContainer)
serviceContainer, err := psh.MergeContainerSpec(serviceContainer)
if err != nil {
- return err
+ return nil, nil, err
}
// immutable
@@ -165,15 +172,15 @@ func createOrUpdateDeployment(ctx context.Context, client
client.Client, platfor
var hpa *v2.HorizontalPodAutoscaler = nil
if psh.AcceptsHPA() {
- hpa, err = findHPAForDeployment(ctx, utils.GetClient(),
platform.Namespace, psh.GetServiceName())
+ hpa, err = kubeutil.FindHPAForDeployment(ctx,
utils.GetClient(), platform.Namespace, psh.GetServiceName())
if err != nil {
- return fmt.Errorf("failed to find a potential
HorizontalPodAutoscaler for deployment %s/%s: %v", platform.Namespace,
psh.GetServiceName(), err)
+ return nil, nil, fmt.Errorf("failed to find a potential
HorizontalPodAutoscaler for deployment %s/%s: %v", platform.Namespace,
psh.GetServiceName(), err)
}
klog.V(log.D).Infof("HorizontalPodAutoscaler exists for
deployment %s/%s: %t.", platform.Namespace, psh.GetServiceName(), hpa != nil)
}
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
- return nil
+ return nil, nil, err
}
lbl, selectorLbl := getLabels(platform, psh)
serviceDeploymentSpec := appsv1.DeploymentSpec{
@@ -204,7 +211,7 @@ func createOrUpdateDeployment(ctx context.Context, client
client.Client, platfor
serviceDeploymentSpec.Template.Spec, err =
psh.MergePodSpec(serviceDeploymentSpec.Template.Spec)
if err != nil {
- return err
+ return nil, nil, err
}
kubeutil.AddOrReplaceContainer(serviceContainer.Name,
*serviceContainer, &serviceDeploymentSpec.Template.Spec)
@@ -215,7 +222,7 @@ func createOrUpdateDeployment(ctx context.Context, client
client.Client, platfor
Labels: lbl,
}}
if err := controllerutil.SetControllerReference(platform,
serviceDeployment, client.Scheme()); err != nil {
- return err
+ return nil, nil, err
}
// Create or Update the deployment
@@ -224,7 +231,7 @@ func createOrUpdateDeployment(ctx context.Context, client
client.Client, platfor
err := mergo.Merge(&(serviceDeployment.Spec),
serviceDeploymentSpec, mergo.WithOverride)
// mergo.Merge algorithm is not setting the
serviceDeployment.Spec.Replicas when the
// *serviceDeploymentSpec.Replicas is 0. Making impossible to
scale to zero. Ensure the value.
- if hpa == nil || !hpaIsWorking(hpa) || psh.GetReplicaCount() ==
0 {
+ if hpa == nil || !kubeutil.HPAIsWorking(hpa) ||
psh.GetReplicaCount() == 0 {
// Only when no HorizontalPodAutoscaler was created for
current deployment, we should manage the replicas.
// Or, when the existing one did not wake up from a
previous inactive period due to a replicas set to 0.
// In this last case, we should still let the
controller the chance to set the replicas to wake up the
HorizontalPodAutoscaler.
@@ -240,10 +247,59 @@ func createOrUpdateDeployment(ctx context.Context, client
client.Client, platfor
}
return nil
}); err != nil {
- return err
+ return nil, nil, err
} else {
klog.V(log.I).InfoS("Deployment successfully reconciled",
"operation", op)
}
+ return serviceDeployment, hpa, nil
+}
+
+func createOrUpdatePDB(ctx context.Context, c client.Client, platform
*operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler,
deployment *appsv1.Deployment, hpa *v2.HorizontalPodAutoscaler) error {
+ if psh.AcceptsPDB() {
+ createOrUpdate := false
+ pdbSpec := psh.GetPDBSpec()
+ if !kubeutil.IsEmptyPodDisruptionBudgetSpec(pdbSpec) {
+ if hpa != nil {
+ // The HPA determines the replicas. Be sure
that the service can't be later downscaled to a number of replicas that blocks
a drain.
+ // And also, that the user didn't voluntary
scaled the service to 0.
+ createOrUpdate =
kubeutil.HPAMinReplicasIsGreaterThan(hpa, int32(1)) &&
!kubeutil.DeploymentIsScaledToZero(deployment)
+ } else {
+ // The just reconciled deployment replicas were
already configured properly, we can rely on this number.
+ // Be sure that the number of replicas don't
block a drain.
+ createOrUpdate =
kubeutil.DeploymentReplicasIsGreaterThan(deployment, int32(1))
+ }
+ }
+
+ if createOrUpdate {
+ lbl, selectorLbl := getLabels(platform, psh)
+ podDisruptionBudget := &policyv1.PodDisruptionBudget{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: psh.GetServiceName(),
+ Namespace: platform.Namespace,
+ Labels: lbl,
+ },
+ }
+ if err :=
controllerutil.SetControllerReference(platform, podDisruptionBudget,
c.Scheme()); err != nil {
+ return err
+ }
+ if op, err := controllerutil.CreateOrUpdate(ctx, c,
podDisruptionBudget, func() error {
+
kubeutil.ApplyPodDisruptionBudgetSpec(podDisruptionBudget, pdbSpec)
+ podDisruptionBudget.Spec.Selector =
&metav1.LabelSelector{
+ MatchLabels: selectorLbl,
+ }
+ return nil
+ }); err != nil {
+ return err
+ } else {
+ klog.V(log.I).Infof("PodDisruptionBudget %s/%s
successfully reconciled, op: %s.", podDisruptionBudget.Namespace,
podDisruptionBudget.Name, op)
+ }
+ } else {
+ // Delete a potentially existing PDB.
+ if err := kubeutil.SafeDeletePodDisruptionBudget(ctx,
c, platform.Namespace, psh.GetServiceName()); err != nil {
+ return err
+ }
+ }
+ }
return nil
}
diff --git
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
index f596821f335..e0732bdc3dd 100644
---
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
+++
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
@@ -125,6 +125,12 @@ type PlatformServiceHandler interface {
// AcceptsHPA returns true if the service accepts an external HPA
configuration.
AcceptsHPA() bool
+
+ // AcceptsPDB returns true if the service accepts the operator managed
PDB generation.
+ AcceptsPDB() bool
+
+ // GetPDBSpec returns the configured PodDisruptionBudgetSpec for the
given service.
+ GetPDBSpec() *operatorapi.PodDisruptionBudgetSpec
}
type DataIndexHandler struct {
@@ -135,6 +141,14 @@ func (d *DataIndexHandler) AcceptsHPA() bool {
return true
}
+func (d *DataIndexHandler) AcceptsPDB() bool {
+ return true
+}
+
+func (d *DataIndexHandler) GetPDBSpec() *operatorapi.PodDisruptionBudgetSpec {
+ return
d.platform.Spec.Services.DataIndex.ServiceSpec.PodTemplate.PodDisruptionBudget
+}
+
// GetDBMigrationStrategy returns DB migration approach
func (d *DataIndexHandler) GetDBMigrationStrategy()
operatorapi.DBMigrationStrategyType {
return
GetDBMigrationStrategy(d.platform.Spec.Services.DataIndex.Persistence)
@@ -354,6 +368,14 @@ func (d *JobServiceHandler) AcceptsHPA() bool {
return false
}
+func (d *JobServiceHandler) AcceptsPDB() bool {
+ return false
+}
+
+func (d *JobServiceHandler) GetPDBSpec() *operatorapi.PodDisruptionBudgetSpec {
+ return nil
+}
+
// GetDBMigrationStrategy returns db migration approach otherwise
func (j *JobServiceHandler) GetDBMigrationStrategy()
operatorapi.DBMigrationStrategyType {
return
GetDBMigrationStrategy(j.platform.Spec.Services.JobService.Persistence)
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
index 2a88206ac31..c082b7960bf 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
@@ -24,6 +24,8 @@ import (
"fmt"
"strings"
+ policyv1 "k8s.io/api/policy/v1"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
@@ -465,3 +467,22 @@ func ServiceMonitorCreator(workflow
*operatorapi.SonataFlow) (client.Object, err
}
return serviceMonitor, nil
}
+
+// PodDisruptionBudgetCreator creates a PodDisruptionBudget for workflow.
+func PodDisruptionBudgetCreator(workflow *operatorapi.SonataFlow)
(client.Object, error) {
+ lbl := workflowproj.GetMergedLabels(workflow)
+ podDisruptionBudget := &policyv1.PodDisruptionBudget{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: workflow.Name,
+ Namespace: workflow.Namespace,
+ Labels: lbl,
+ },
+ Spec: policyv1.PodDisruptionBudgetSpec{
+ Selector: &metav1.LabelSelector{
+ MatchLabels:
workflowproj.GetSelectorLabels(workflow),
+ },
+ },
+ }
+ kubeutil.ApplyPodDisruptionBudgetSpec(podDisruptionBudget,
workflow.Spec.PodTemplate.PodDisruptionBudget)
+ return podDisruptionBudget, nil
+}
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
index 80031116d77..865242c7c6c 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
@@ -149,6 +149,15 @@ func (d *DeploymentReconciler) ensureObjects(ctx
context.Context, workflow *oper
}
objs := []client.Object{deployment, managedPropsCM, service}
+
+ podDisruptionBudget, err :=
NewPodDisruptionBudgetHandler(d.StateSupport).Ensure(ctx, workflow)
+ if err != nil {
+ return reconcile.Result{}, nil, err
+ }
+ if podDisruptionBudget != nil {
+ objs = append(objs, podDisruptionBudget)
+ }
+
eventingObjs, err := common.NewKnativeEventingHandler(d.StateSupport,
pl).Ensure(ctx, workflow)
if err != nil {
return reconcile.Result{}, nil, err
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/preview/disruption_budget_handler.go
b/packages/sonataflow-operator/internal/controller/profiles/preview/disruption_budget_handler.go
new file mode 100644
index 00000000000..dac01581ba9
--- /dev/null
+++
b/packages/sonataflow-operator/internal/controller/profiles/preview/disruption_budget_handler.go
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package preview
+
+import (
+ "context"
+ "fmt"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/workflowproj"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/workflowdef"
+
+ "k8s.io/klog/v2"
+
+ "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+
+ policyv1 "k8s.io/api/policy/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils/kubernetes"
+
+ operatorapi
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common"
+)
+
+type podDisruptionBudgetHandler struct {
+ stateSupport *common.StateSupport
+ podDisruptionBudget common.ObjectEnsurer
+}
+
+type PodDisruptionBudgetHandler interface {
+ Ensure(ctx context.Context, workflow *operatorapi.SonataFlow)
(client.Object, error)
+}
+
+func NewPodDisruptionBudgetHandler(support *common.StateSupport)
PodDisruptionBudgetHandler {
+ return podDisruptionBudgetHandler{
+ stateSupport: support,
+ podDisruptionBudget: common.NewObjectEnsurer(support.C,
common.PodDisruptionBudgetCreator),
+ }
+}
+
+func (h podDisruptionBudgetHandler) Ensure(ctx context.Context, workflow
*operatorapi.SonataFlow) (client.Object, error) {
+ if workflow.Spec.PodTemplate.DeploymentModel ==
operatorapi.KnativeDeploymentModel {
+ return nil, nil
+ }
+
+ createOrUpdate := false
+ if
!kubernetes.IsEmptyPodDisruptionBudgetSpec(workflow.Spec.PodTemplate.PodDisruptionBudget)
{
+ klog.V(log.D).Infof("Finding HPA for workflow: %s/%s",
workflow.Namespace, workflow.Name)
+ hpa, err := kubernetes.FindHPAForWorkflow(ctx,
h.stateSupport.C, workflow.Namespace, workflow.Name)
+ if err != nil {
+ return nil, fmt.Errorf("failed to find a potential
HorizontalPodAutoscaler for workflow: %s/%s: %v", workflow.Namespace,
workflow.Name, err)
+ }
+ if hpa != nil {
+ klog.V(log.D).Infof("HPA %s/%s was found for workflow
%s/%s", hpa.Namespace, hpa.Name, workflow.Namespace, workflow.Name)
+ // The HPA determines the replicas. Be sure that the
workflow can't be later downscaled to a number of replicas that blocks a drain.
+ // And, also that the user didn't voluntary scaled the
workflow to 0.
+ createOrUpdate =
kubernetes.HPAMinReplicasIsGreaterThan(hpa, int32(1)) &&
!workflowdef.IsScaledToZero(workflow)
+ } else {
+ // The replicas are determined from the workflow spec.
Be sure that the number of replicas don't block a drain.
+ createOrUpdate =
workflowdef.ReplicasIsGreaterThan(workflow, int32(1))
+ }
+ }
+
+ if createOrUpdate {
+ pdb, _, err := h.podDisruptionBudget.Ensure(ctx, workflow,
func(object client.Object) controllerutil.MutateFn {
+ return func() error {
+ targetPdb :=
object.(*policyv1.PodDisruptionBudget)
+ targetPdb.Spec.Selector = &metav1.LabelSelector{
+ MatchLabels:
workflowproj.GetSelectorLabels(workflow),
+ }
+
kubernetes.ApplyPodDisruptionBudgetSpec(targetPdb,
workflow.Spec.PodTemplate.PodDisruptionBudget)
+ return nil
+ }
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to create or update
PodDiscruptionBudget for workflow's deployment %s/%s: %v", workflow.Namespace,
workflow.Name, err)
+ }
+ return pdb, nil
+ } else {
+ // Remove a potential previously created PDB if any.
+ if err := kubernetes.SafeDeletePodDisruptionBudget(ctx,
h.stateSupport.C, workflow.Namespace, workflow.Name); err != nil {
+ return nil, err
+ }
+ return nil, nil
+ }
+}
diff --git
a/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
b/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
index 65a3d184aa4..800b8c8191a 100644
--- a/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
+++ b/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
@@ -24,6 +24,14 @@ import (
"fmt"
"time"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils/kubernetes"
+
+ autoscalingv2 "k8s.io/api/autoscaling/v2"
+ pkgbuilder "sigs.k8s.io/controller-runtime/pkg/builder"
+ "sigs.k8s.io/controller-runtime/pkg/predicate"
+
+ v1 "k8s.io/api/policy/v1"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
@@ -72,6 +80,8 @@ import (
operatorapi
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform"
+
+ ctrlevent "sigs.k8s.io/controller-runtime/pkg/event"
)
// SonataFlowReconciler reconciles a SonataFlow object
@@ -88,6 +98,7 @@ type SonataFlowReconciler struct {
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/finalizers,verbs=update
//+kubebuilder:rbac:groups="monitoring.coreos.com",resources=servicemonitors,verbs=get;list;watch;create;update;delete
//+kubebuilder:rbac:groups="serving.knative.dev",resources=revisions,verbs=list;watch;delete
+//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -374,6 +385,7 @@ func (r *SonataFlowReconciler) SetupWithManager(mgr
ctrl.Manager) error {
builder := ctrl.NewControllerManagedBy(mgr).
For(&operatorapi.SonataFlow{}).
Owns(&appsv1.Deployment{}).
+ Owns(&v1.PodDisruptionBudget{}).
Owns(&corev1.Service{}).
Owns(&corev1.ConfigMap{}).
Owns(&operatorapi.SonataFlowBuild{}).
@@ -392,7 +404,8 @@ func (r *SonataFlowReconciler) SetupWithManager(mgr
ctrl.Manager) error {
return []reconcile.Request{}
}
return buildEnqueueRequestsFromMapFunc(mgr.GetClient(),
build)
- }))
+ })).
+ Watches(&autoscalingv2.HorizontalPodAutoscaler{},
handler.EnqueueRequestsFromMapFunc(r.mapHPAToSonataFlowRequests),
pkgbuilder.WithPredicates(hpaToSonataFlowPredicate()))
knativeAvail, err := knative.GetKnativeAvailability(mgr.GetConfig())
if err != nil {
@@ -416,3 +429,43 @@ func (r *SonataFlowReconciler) SetupWithManager(mgr
ctrl.Manager) error {
return builder.Complete(r)
}
+
+// hpaToSonataFlowPredicate filters the HorizontalPodAutoscaler events that
might require attention by the SonataFlow
+// controller, i.e., those HorizontalPodAutoscalers that points to a
SonataFlow and has relevant changes rather than
+// status updates. Note that changes in such HorizontalPodAutoscaler might for
example have impact on the potentially
+// generated PodDisruptionBudget.
+func hpaToSonataFlowPredicate() predicate.Funcs {
+ return predicate.Funcs{
+ CreateFunc: func(e ctrlevent.CreateEvent) bool {
+ return
kubernetes.IsHPAndTargetsASonataFlowAsBool(e.Object)
+ },
+ UpdateFunc: func(e ctrlevent.UpdateEvent) bool {
+ oldHpa, oldHpaOk :=
kubernetes.IsHPAndTargetsASonataFlow(e.ObjectOld)
+ newHpa, newHpaOK :=
kubernetes.IsHPAndTargetsASonataFlow(e.ObjectNew)
+ if oldHpaOk || newHpaOK {
+ return !kubernetes.HPAEqualsBySpec(oldHpa,
newHpa)
+ }
+ return false
+ },
+ DeleteFunc: func(e ctrlevent.DeleteEvent) bool {
+ return
kubernetes.IsHPAndTargetsASonataFlowAsBool(e.Object)
+ },
+ GenericFunc: func(e ctrlevent.GenericEvent) bool {
+ return
kubernetes.IsHPAndTargetsASonataFlowAsBool(e.Object)
+ },
+ }
+}
+
+// mapHPAToSonataFlowRequests given a HorizontalPodAutoscaler that targets a
SonataFlow, returns the recon request to
+// that workflow.
+func (r *SonataFlowReconciler) mapHPAToSonataFlowRequests(ctx context.Context,
object client.Object) []reconcile.Request {
+ hpa := object.(*autoscalingv2.HorizontalPodAutoscaler)
+ return []reconcile.Request{
+ {
+ NamespacedName: types.NamespacedName{
+ Namespace: hpa.Namespace,
+ Name: hpa.Spec.ScaleTargetRef.Name,
+ },
+ },
+ }
+}
diff --git
a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go
b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go
index ecddba47527..e554c7cbde2 100644
---
a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go
+++
b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go
@@ -22,6 +22,7 @@ package controller
import (
"context"
"fmt"
+ "strings"
"time"
autoscalingv2 "k8s.io/api/autoscaling/v2"
@@ -59,6 +60,8 @@ import (
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+
+ kubeutil
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils/kubernetes"
)
// SonataFlowPlatformReconciler reconciles a SonataFlowPlatform object
@@ -78,6 +81,7 @@ type SonataFlowPlatformReconciler struct {
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowplatforms/finalizers,verbs=update
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch
+//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -277,21 +281,6 @@ func (r *SonataFlowPlatformReconciler)
updateIfActiveClusterPlatformExists(ctx c
// SetupWithManager sets up the controller with the Manager.
func (r *SonataFlowPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager)
error {
- pred := predicate.Funcs{
- CreateFunc: func(e ctrlevent.CreateEvent) bool {
- return false
- },
- UpdateFunc: func(e ctrlevent.UpdateEvent) bool {
- return false
- },
- DeleteFunc: func(e ctrlevent.DeleteEvent) bool {
- return true
- },
- GenericFunc: func(e ctrlevent.GenericEvent) bool {
- return false
- },
- }
-
builder := ctrlrun.NewControllerManagedBy(mgr).
For(&operatorapi.SonataFlowPlatform{}).
Owns(&appsv1.Deployment{}).
@@ -299,7 +288,7 @@ func (r *SonataFlowPlatformReconciler) SetupWithManager(mgr
ctrlrun.Manager) err
Owns(&corev1.ConfigMap{}).
Watches(&operatorapi.SonataFlowPlatform{},
handler.EnqueueRequestsFromMapFunc(r.mapPlatformToPlatformRequests)).
Watches(&operatorapi.SonataFlowClusterPlatform{},
handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToPlatformRequests)).
- Watches(&autoscalingv2.HorizontalPodAutoscaler{},
handler.EnqueueRequestsFromMapFunc(r.mapHPAToPlatformRequests),
pkgbuilder.WithPredicates(pred))
+ Watches(&autoscalingv2.HorizontalPodAutoscaler{},
handler.EnqueueRequestsFromMapFunc(r.mapHPAToPlatformRequests),
pkgbuilder.WithPredicates(hpaToSonataFlowPlatformServicePredicate()))
knativeAvail, err := knative.GetKnativeAvailability(mgr.GetConfig())
if err != nil {
@@ -369,15 +358,15 @@ func contains(slice []operatorapi.WorkFlowCapability, s
operatorapi.WorkFlowCapa
return false
}
-// mapHPAToPlatformRequests determines if the referred HorizontalPodAutoscaler
targets a DI service in given namespace,
-// if any. Analyses if that namespace has an SFP that manges the DI service.
If that is the case, enqueues a Request to
-// the SFP to give the chance to take an action if needed. We normally want to
do that in situations where the
-// HorizontalPodAutoscaler was removed (DeleteEvent) to let the SFP restore
back the DI replicas. In other situations
-// the given HorizontalPodAutoscaler will take the needed actions if needed.
+// mapHPAToPlatformRequests determines if the referred HorizontalPodAutoscaler
targets a DI service in the given namespace,
+// if any. Analyses if that namespace has an SPF that manges the DI service.
If that is the case, enqueues a Request to
+// the SPF to give the chance to take an action if needed. We normally want to
do that in situations where the
+// HorizontalPodAutoscaler was removed or has changed to let the SFP
controller restore back the DI replicas or manage
+// a potential PodDisruptionBudget configuration accordingly.
func (r *SonataFlowPlatformReconciler) mapHPAToPlatformRequests(ctx
context.Context, object client.Object) []reconcile.Request {
- hpa := object.(*autoscalingv2.HorizontalPodAutoscaler)
- klog.V(log.D).Infof("HorizontalPodAutoscaler %s/%s with
spec.ScaleTargetRef.Kind Kind: %s was deleted.", hpa.Namespace, hpa.Name,
hpa.Spec.ScaleTargetRef.Kind)
- if hpa.Spec.ScaleTargetRef.Kind == "Deployment" {
+ hpa, ok := kubeutil.IsHPAndTargetsADeployment(object)
+ klog.V(log.D).Infof("HorizontalPodAutoscaler %s/%s with
spec.ScaleTargetRef.Kind Kind: %s requires SFP controller attention.",
hpa.Namespace, hpa.Name, hpa.Spec.ScaleTargetRef.Kind)
+ if ok {
sfp, err := platform.GetActivePlatform(ctx, r.Client,
hpa.Namespace, false)
if err != nil {
klog.V(log.D).Infof("failed to get active platform in
namespace: %s, %v", hpa.Namespace, err)
@@ -401,3 +390,41 @@ func (r *SonataFlowPlatformReconciler)
mapHPAToPlatformRequests(ctx context.Cont
}
return nil
}
+
+// hpaToSonataFlowPlatformServicePredicate returns the predicate functions to
filter HorizontalPodAutoscaler events
+// that might require attention from the SFP controller.
+func hpaToSonataFlowPlatformServicePredicate() predicate.Funcs {
+ return predicate.Funcs{
+ CreateFunc: func(e ctrlevent.CreateEvent) bool {
+ return
isHPAndCandidateToTargetADataIndexDeploymentAsBool(e.Object)
+ },
+ UpdateFunc: func(e ctrlevent.UpdateEvent) bool {
+ oldHpa, oldHpaOk :=
isHPAndCandidateToTargetADataIndexDeployment(e.ObjectOld)
+ newHpa, newHpaOK :=
isHPAndCandidateToTargetADataIndexDeployment(e.ObjectNew)
+ if oldHpaOk || newHpaOK {
+ return !kubeutil.HPAEqualsBySpec(oldHpa, newHpa)
+ }
+ return false
+ },
+ DeleteFunc: func(e ctrlevent.DeleteEvent) bool {
+ return
isHPAndCandidateToTargetADataIndexDeploymentAsBool(e.Object)
+ },
+ GenericFunc: func(e ctrlevent.GenericEvent) bool {
+ return
isHPAndCandidateToTargetADataIndexDeploymentAsBool(e.Object)
+ },
+ }
+}
+
+func isHPAndCandidateToTargetADataIndexDeployment(obj client.Object)
(*autoscalingv2.HorizontalPodAutoscaler, bool) {
+ if hpa, ok := kubeutil.IsHPAndTargetsADeployment(obj); ok {
+ if strings.HasSuffix(hpa.Spec.ScaleTargetRef.Name,
constants.DataIndexServiceName) {
+ return hpa, true
+ }
+ }
+ return nil, false
+}
+
+func isHPAndCandidateToTargetADataIndexDeploymentAsBool(obj client.Object)
bool {
+ _, ok := isHPAndCandidateToTargetADataIndexDeployment(obj)
+ return ok
+}
diff --git
a/packages/sonataflow-operator/internal/controller/workflowdef/utils.go
b/packages/sonataflow-operator/internal/controller/workflowdef/utils.go
index 17a0eb06bc7..d4b97b5bd07 100644
--- a/packages/sonataflow-operator/internal/controller/workflowdef/utils.go
+++ b/packages/sonataflow-operator/internal/controller/workflowdef/utils.go
@@ -127,3 +127,15 @@ func GenerateScaleSelector(workflow
*operatorapi.SonataFlow) string {
labelSet := labels.Set(workflowproj.GetSelectorLabels(workflow))
return labelSet.AsSelector().String()
}
+
+// IsScaledToZero returns true if the workflow has been explicitly scaled to
zero, i.e., by setting
+// the spec.podTemplate.replicas == 0. False in any other case, including when
spec.podTemplate.replicas == nil.
+func IsScaledToZero(workflow *operatorapi.SonataFlow) bool {
+ return workflow.Spec.PodTemplate.Replicas != nil &&
*workflow.Spec.PodTemplate.Replicas == int32(0)
+}
+
+// ReplicasIsGreaterThan returns true if the workflow configured
Spec.PodTemplate.Replicas is != nil, and greater
+// than the given value. False in any other case.
+func ReplicasIsGreaterThan(workflow *operatorapi.SonataFlow, value int32) bool
{
+ return workflow.Spec.PodTemplate.Replicas != nil &&
*workflow.Spec.PodTemplate.Replicas > value
+}
diff --git a/packages/sonataflow-operator/operator.yaml
b/packages/sonataflow-operator/operator.yaml
index eee14236f37..86f720fea0b 100644
--- a/packages/sonataflow-operator/operator.yaml
+++ b/packages/sonataflow-operator/operator.yaml
@@ -6764,6 +6764,33 @@ spec:
defined in the corresponding RuntimeClass,
otherwise it will remain unset and treated as zero.
More info:
https://git.k8s.io/enhancements/keps/sig-node/688-pod-overhead/README.md
type: object
+ podDisruptionBudget:
+ description: |-
+ Defines the Kubernetes PodDisruptionBudgetSpec
for this service. When configured, the SonataFlowPlatform controller
+ will automatically create a
PodDisruptionBudget based on this specification that targets the service
Deployment.
+ Currently only apply for the Data Index.
+ properties:
+ maxUnavailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at most
"maxUnavailable" pods selected by
+ "selector" are unavailable after the
eviction, i.e. even in absence of
+ the evicted pod. For example, one can
prevent all voluntary evictions
+ by specifying 0. This is a mutually
exclusive setting with "minAvailable".
+ x-kubernetes-int-or-string: true
+ minAvailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at least
"minAvailable" pods selected by
+ "selector" will still be available after
the eviction, i.e. even in the
+ absence of the evicted pod. So for
example you can prevent all voluntary
+ evictions by specifying "100%". This is a
mutually exclusive setting with "maxUnavailable".
+ x-kubernetes-int-or-string: true
+ type: object
preemptionPolicy:
description: |-
PreemptionPolicy is the Policy for preempting
pods with lower priority.
@@ -14797,6 +14824,33 @@ spec:
defined in the corresponding RuntimeClass,
otherwise it will remain unset and treated as zero.
More info:
https://git.k8s.io/enhancements/keps/sig-node/688-pod-overhead/README.md
type: object
+ podDisruptionBudget:
+ description: |-
+ Defines the Kubernetes PodDisruptionBudgetSpec
for this service. When configured, the SonataFlowPlatform controller
+ will automatically create a
PodDisruptionBudget based on this specification that targets the service
Deployment.
+ Currently only apply for the Data Index.
+ properties:
+ maxUnavailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at most
"maxUnavailable" pods selected by
+ "selector" are unavailable after the
eviction, i.e. even in absence of
+ the evicted pod. For example, one can
prevent all voluntary evictions
+ by specifying 0. This is a mutually
exclusive setting with "minAvailable".
+ x-kubernetes-int-or-string: true
+ minAvailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at least
"minAvailable" pods selected by
+ "selector" will still be available after
the eviction, i.e. even in the
+ absence of the evicted pod. So for
example you can prevent all voluntary
+ evictions by specifying "100%". This is a
mutually exclusive setting with "maxUnavailable".
+ x-kubernetes-int-or-string: true
+ type: object
preemptionPolicy:
description: |-
PreemptionPolicy is the Policy for preempting
pods with lower priority.
@@ -24894,6 +24948,33 @@ spec:
defined in the corresponding RuntimeClass, otherwise
it will remain unset and treated as zero.
More info:
https://git.k8s.io/enhancements/keps/sig-node/688-pod-overhead/README.md
type: object
+ podDisruptionBudget:
+ description: |-
+ Defines the Kubernetes PodDisruptionBudgetSpec for
this workflow. When configured, the SonataFlow controller will
+ automatically create a PodDisruptionBudget based on
this specification that targets the workflow Deployment.
+ Ignored in "knative" deployment model, and dev profile
workflows.
+ properties:
+ maxUnavailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at most "maxUnavailable"
pods selected by
+ "selector" are unavailable after the eviction,
i.e. even in absence of
+ the evicted pod. For example, one can prevent all
voluntary evictions
+ by specifying 0. This is a mutually exclusive
setting with "minAvailable".
+ x-kubernetes-int-or-string: true
+ minAvailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at least "minAvailable"
pods selected by
+ "selector" will still be available after the
eviction, i.e. even in the
+ absence of the evicted pod. So for example you
can prevent all voluntary
+ evictions by specifying "100%". This is a mutually
exclusive setting with "maxUnavailable".
+ x-kubernetes-int-or-string: true
+ type: object
preemptionPolicy:
description: |-
PreemptionPolicy is the Policy for preempting pods
with lower priority.
@@ -27833,6 +27914,18 @@ rules:
- list
- update
- watch
+ - apiGroups:
+ - policy
+ resources:
+ - poddisruptionbudgets
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - patch
+ - update
+ - watch
- apiGroups:
- serving.knative.dev
resources:
diff --git a/packages/sonataflow-operator/test/e2e/helpers.go
b/packages/sonataflow-operator/test/e2e/helpers.go
index a92c1173bb9..e467c5c7f87 100644
--- a/packages/sonataflow-operator/test/e2e/helpers.go
+++ b/packages/sonataflow-operator/test/e2e/helpers.go
@@ -30,6 +30,8 @@ import (
"strings"
"time"
+ "k8s.io/apimachinery/pkg/util/intstr"
+
"sigs.k8s.io/yaml"
operatorapi
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
@@ -250,7 +252,7 @@ func verifyObjectReplicasFromPath(name string, ns string,
objetType string, subR
return false
}
GinkgoWriter.Println(fmt.Sprintf("Got response %s", response))
- replicas, err := extractReplicasFromResponse(response)
+ replicas, err := extractInt32FromResponse(response)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to get scale replicas
from response for object: %s -> %s/%s, subResource: %s, and replicasPath: %s,
%v", objetType, ns, name, subResource, replicasPath, err))
return false
@@ -258,15 +260,12 @@ func verifyObjectReplicasFromPath(name string, ns string,
objetType string, subR
return replicas == expectedReplicas
}
-func extractReplicasFromResponse(response []byte) (int32, error) {
+func extractInt32FromResponse(response []byte) (int32, error) {
strResponse := strings.ToLower(string(response))
if strings.Contains(strResponse, "error") ||
strings.Contains(strResponse, "not found") {
return -1, fmt.Errorf("%s", response)
}
- strResponse = strings.TrimSpace(strings.ReplaceAll(strResponse, "'",
""))
- if len(strResponse) == 0 {
- return -1, fmt.Errorf("%s", response)
- }
+ strResponse = extractFromSingleQuotedResponse(strResponse)
replicas, err := strconv.ParseInt(strResponse, 10, 32)
if err != nil {
return -1, err
@@ -274,6 +273,51 @@ func extractReplicasFromResponse(response []byte) (int32,
error) {
return int32(replicas), nil
}
+func verifyResourceExists(name string, resourceType string, ns string) bool {
+ cmd := exec.Command("kubectl", "get", resourceType, name, "-n", ns,
"-o", "jsonpath='{.metadata.name}'")
+ response, err := utils.Run(cmd)
+ if err != nil {
+ GinkgoWriter.Println(fmt.Errorf("failed to check if resource
exists, name: %s, resourceType: %s, ns: %s, %v", name, resourceType, ns, err))
+ return false
+ }
+ strResponse := string(response)
+ if strings.Contains(strResponse, "NotFound") ||
strings.Contains(strResponse, "not found") {
+ return false
+ }
+ return name == extractFromSingleQuotedResponse(strResponse)
+}
+
+func verifyPodDisruptionBudgetConditionHasStatus(name string, ns string,
condition string, status string) bool {
+ jsonPath :=
fmt.Sprintf("jsonpath='{.status.conditions[?(@.type==\"%s\")].status}'",
condition)
+ cmd := exec.Command("kubectl", "get", "pdb", name, "-n", ns, "-o",
jsonPath)
+ response, err := utils.Run(cmd)
+ if err != nil {
+ GinkgoWriter.Println(fmt.Errorf("failed to get status value for
condition: %s from PodDisruptionBudget: %s/%s, %v", condition, ns, name, err))
+ return false
+ }
+ return status == extractFromSingleQuotedResponse(string(response))
+}
+
+func verifyPodDisruptionBudgetAllowsDisruptionNumber(name string, ns string,
expectedAllowedDisruptions int32) bool {
+ cmd := exec.Command("kubectl", "get", "pdb", name, "-n", ns, "-o",
"jsonpath='{.status.disruptionsAllowed}'")
+ response, err := utils.Run(cmd)
+ if err != nil {
+ GinkgoWriter.Println(fmt.Errorf("failed to get the number of
disruptionsAllowed from PodDisruptionBudget: %s/%s, %v", ns, name, err))
+ return false
+ }
+ allowedDisruptions, err := extractInt32FromResponse(response)
+ if err != nil {
+ return false
+ }
+ return allowedDisruptions == expectedAllowedDisruptions
+}
+
+func extractFromSingleQuotedResponse(response string) string {
+ result := strings.TrimSpace(response)
+ result = strings.TrimPrefix(response, "'")
+ return strings.TrimSuffix(result, "'")
+}
+
func createTmpCopy(srcPath string) string {
tmpDir := GinkgoT().TempDir()
fileName := filepath.Base(srcPath)
@@ -290,12 +334,40 @@ func createTmpCopy(srcPath string) string {
return dstPath
}
+func setReplicasOrFail(workflowFile string, replicas int32) {
+ if err := setReplicas(workflowFile, replicas); err != nil {
+ GinkgoT().Fatal(err)
+ }
+}
+
func setImageAndReplicasOrFail(workflowFile, newImage string, replicas int32) {
if err := setImageAndReplicas(workflowFile, newImage, replicas); err !=
nil {
GinkgoT().Fatal(err)
}
}
+func setPodDisruptionBudgetOrFail(workflowFile string, minAvailable
*intstr.IntOrString, maxUnavailable *intstr.IntOrString) {
+ err := applyWorkflowTransform(workflowFile, func(workflow
*operatorapi.SonataFlow) {
+ if minAvailable == nil && maxUnavailable == nil {
+ workflow.Spec.PodTemplate.PodDisruptionBudget = nil
+ } else {
+ workflow.Spec.PodTemplate.PodDisruptionBudget =
&operatorapi.PodDisruptionBudgetSpec{
+ MinAvailable: minAvailable,
+ MaxUnavailable: maxUnavailable,
+ }
+ }
+ })
+ if err != nil {
+ GinkgoT().Fatal(err)
+ }
+}
+
+func setReplicas(workflowFile string, replicas int32) error {
+ return applyWorkflowTransform(workflowFile, func(workflow
*operatorapi.SonataFlow) {
+ workflow.Spec.PodTemplate.Replicas = &replicas
+ })
+}
+
func setImageAndReplicas(workflowFile, newImage string, replicas int32) error {
return applyWorkflowTransform(workflowFile, func(workflow
*operatorapi.SonataFlow) {
workflow.Spec.PodTemplate.Container.Image = newImage
@@ -620,3 +692,43 @@ func getWorkflowDefinitionStatus(podName string,
containerName string, namespace
}
return available.(string), true, nil
}
+
+// patchSFPDataIndexReplicas executes a patch command for the dataIndex
replicas in the given platform.
+func patchSFPDataIndexReplicas(sfpName string, ns string, replicas string)
error {
+ return patchSFPServiceReplicas(sfpName, ns, "dataIndex", replicas)
+}
+
+// patchSFPDataIndexReplicas executes a patch command for the jobService
replicas in the given platform.
+func patchSFPJobServiceReplicas(sfpName string, ns string, replicas string)
error {
+ return patchSFPServiceReplicas(sfpName, ns, "jobService", replicas)
+}
+
+// patchSFPServiceReplicas use any of the patchSFPDataIndexReplicas or
patchSFPJobServiceReplicas variants.
+func patchSFPServiceReplicas(sfpName string, ns string, serviceField string,
replicas string) error {
+ patch :=
fmt.Sprintf(`{"spec":{"services":{"%s":{"podTemplate":{"replicas":%s}}}}}`,
serviceField, replicas)
+ cmd := exec.Command("kubectl", "patch", "sonataflowplatform", sfpName,
"-n", ns, "--type=merge", "-p", patch)
+ response, err := utils.Run(cmd)
+ if err != nil {
+ return fmt.Errorf("failed to patch the replicas the service: %s
in SonataFlowPlatform %s/%s, %v", serviceField, ns, sfpName, err)
+ }
+ strResponse := extractFromSingleQuotedResponse(string(response))
+ if !strings.HasPrefix(strResponse,
fmt.Sprintf("sonataflowplatform.sonataflow.org/%s patched", sfpName)) {
+ return fmt.Errorf("failed to patch the replicas for the
service: %s in SonataFlowPlatform %s/%s, verify that the platform exists in the
namespace: %s, %v", serviceField, ns, sfpName, ns, strResponse)
+ }
+ return nil
+}
+
+// patchHPAMinReplicas executes a patch command for the
HorizontalPodAutoscaler minReplicas.
+func patchHPAMinReplicas(name string, ns string, replicas string) error {
+ patch := fmt.Sprintf(`{"spec":{"minReplicas":%s}}`, replicas)
+ cmd := exec.Command("kubectl", "patch", "horizontalpodautoscaler",
name, "-n", ns, "--type=merge", "-p", patch)
+ response, err := utils.Run(cmd)
+ if err != nil {
+ return fmt.Errorf("failed to patch the minReplicas for the HPA:
%s/%s, %v", ns, name, err)
+ }
+ strResponse := extractFromSingleQuotedResponse(string(response))
+ if !strings.HasPrefix(strResponse,
fmt.Sprintf("horizontalpodautoscaler.autoscaling/%s patched", name)) {
+ return fmt.Errorf("failed to patch the minReplicas for the HPA
%s/%s, verify that the HPA exists in the namespace: %s, %s", ns, name, ns,
strResponse)
+ }
+ return nil
+}
diff --git a/packages/sonataflow-operator/test/e2e/platform_test.go
b/packages/sonataflow-operator/test/e2e/platform_test.go
index 01615e1e331..251f17652d8 100644
--- a/packages/sonataflow-operator/test/e2e/platform_test.go
+++ b/packages/sonataflow-operator/test/e2e/platform_test.go
@@ -24,6 +24,7 @@ import (
"math/rand"
"os/exec"
"path/filepath"
+ "strconv"
"strings"
"time"
@@ -526,6 +527,146 @@ var _ = Describe("Platform Use Cases :: ",
Label("platform"), Ordered, func() {
Entry("and the HPA is created after the platform",
test.GetPathFromE2EDirectory("platform", "persistence",
"generic_from_platform_cr"), test.GetPathFromE2EDirectory("platform", "hpa",
"generic-data-index-service-hpa.yaml"), false),
Entry("and the HPA is created before the platform",
test.GetPathFromE2EDirectory("platform", "persistence",
"generic_from_platform_cr"), test.GetPathFromE2EDirectory("platform", "hpa",
"generic-data-index-service-hpa.yaml"), true),
)
+
+ DescribeTable("when deploying a SonataFlowPlatform with PDB configured
for the Data Index", Label("data-index-pdb"), func(testcaseDir string,
dataIndexPatchReplicas string, shouldHavePDBAfterPatch bool,
shouldHaveDisruptionsAllowedAfterPatch string,
shouldHaveDisruptionsAllowedNumberAfterPatch int32) {
+ sfpName := "sonataflow-platform"
+ dataIndexServiceName := "sonataflow-platform-data-index-service"
+ expectedPDB := dataIndexServiceName
+
+ By("Deploy the platform and database CRs")
+ EventuallyWithOffset(1, func() error {
+ return kubectlApplyKustomizeOnCluster(testcaseDir,
targetNamespace)
+ }, 2*time.Minute, time.Second).Should(Succeed())
+
+ By("Wait for SonataFlowPlatform CR to complete the deployments")
+ EventuallyWithOffset(1, func() error {
+ return
verifyDataIndexAndJobsServiceAreReady(targetNamespace, "5s")
+ }, 10*time.Minute, 5).Should(Succeed())
+
+ By("Evaluate status of all service's health endpoint")
+ verifyDataIndexAndJobsServiceAreHealthy(targetNamespace)
+
+ By("Check that the Data Index deployment has 3 replicas as
expected")
+ EventuallyWithOffset(1, func() bool {
+ return verifyDeploymentReplicas(dataIndexServiceName,
targetNamespace, int32(3))
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget for the Data Index was
created")
+ EventuallyWithOffset(1, func() bool {
+ return verifyResourceExists(dataIndexServiceName,
"pdb", targetNamespace)
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget's DisruptionAllowed
condition is True")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetConditionHasStatus(expectedPDB, targetNamespace,
"DisruptionAllowed", "True")
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget is allowing 2
disruptions")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetAllowsDisruptionNumber(expectedPDB, targetNamespace,
int32(2))
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By(fmt.Sprintf("scale the Data Index service to Replicas = %s",
dataIndexPatchReplicas))
+ EventuallyWithOffset(1, func() error {
+ return patchSFPDataIndexReplicas(sfpName,
targetNamespace, dataIndexPatchReplicas)
+ }, 3*time.Minute, 30*time.Second).Should(Succeed())
+
+ if shouldHavePDBAfterPatch {
+ By("check that the PodDisruptionBudget for the Data
Index still exists")
+ } else {
+ By("check that the PodDisruptionBudget for the Data
Index was removed")
+ }
+ EventuallyWithOffset(1, func() bool {
+ return verifyResourceExists(dataIndexServiceName,
"pdb", targetNamespace)
+ }, 3*time.Minute,
30*time.Second).Should(Equal(shouldHavePDBAfterPatch))
+
+ if shouldHavePDBAfterPatch {
+ By(fmt.Sprintf("check that the PodDisruptionBudget's
DisruptionAllowed condition in the expected status = %s",
shouldHaveDisruptionsAllowedAfterPatch))
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetConditionHasStatus(expectedPDB, targetNamespace,
"DisruptionAllowed", shouldHaveDisruptionsAllowedAfterPatch)
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By(fmt.Sprintf("check that the PodDisruptionBudget is
allowing the disruptions number = %s",
strconv.Itoa(int(shouldHaveDisruptionsAllowedNumberAfterPatch))))
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetAllowsDisruptionNumber(expectedPDB, targetNamespace,
shouldHaveDisruptionsAllowedNumberAfterPatch)
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+ }
+ },
+
+ Entry("and the Data Index is rescaled to 4 replicas",
test.GetPathFromE2EDirectory("platform", "persistence",
"generic_from_platform_cr_with_pdb"), "4", true, "True", int32(3)),
+ Entry("and the Data Index is rescaled to 2 replicas",
test.GetPathFromE2EDirectory("platform", "persistence",
"generic_from_platform_cr_with_pdb"), "2", true, "True", int32(1)),
+ Entry("and the Data Index is rescaled to 1 replicas",
test.GetPathFromE2EDirectory("platform", "persistence",
"generic_from_platform_cr_with_pdb"), "1", false, "UnUsed", int32(-1)),
+ )
+
+ DescribeTable("when deploying a SonataFlowPlatform with PDB configured
for the Data Index combined with HPA", Label("data-index-pdb-with-hpa"),
func(testcaseDir string, hpaPatchReplicas string, shouldHavePDBAfterPatch bool,
shouldHaveDisruptionsAllowedAfterPatch string,
shouldHaveDisruptionsAllowedNumberAfterPatch int32) {
+ dataIndexServiceName := "sonataflow-platform-data-index-service"
+ expectedPDB := dataIndexServiceName
+
+ By("Deploy the platform and database CRs")
+ EventuallyWithOffset(1, func() error {
+ return kubectlApplyKustomizeOnCluster(testcaseDir,
targetNamespace)
+ }, 2*time.Minute, time.Second).Should(Succeed())
+
+ By("Wait for SonataFlowPlatform CR to complete the deployments")
+ EventuallyWithOffset(1, func() error {
+ return
verifyDataIndexAndJobsServiceAreReady(targetNamespace, "5s")
+ }, 10*time.Minute, 5).Should(Succeed())
+
+ By("Evaluate status of all service's health endpoint")
+ verifyDataIndexAndJobsServiceAreHealthy(targetNamespace)
+
+ By("Check that the Data Index deployment has 2 replicas as
expected from the initial HPA configuration")
+ EventuallyWithOffset(1, func() bool {
+ return verifyDeploymentReplicas(dataIndexServiceName,
targetNamespace, int32(2))
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget for the Data Index was
created")
+ EventuallyWithOffset(1, func() bool {
+ return verifyResourceExists(dataIndexServiceName,
"pdb", targetNamespace)
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget's DisruptionAllowed
condition is True")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetConditionHasStatus(expectedPDB, targetNamespace,
"DisruptionAllowed", "True")
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget is allowing 1
disruption")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetAllowsDisruptionNumber(expectedPDB, targetNamespace,
int32(1))
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By(fmt.Sprintf("patch the HPA minReplicas to: %s",
hpaPatchReplicas))
+ EventuallyWithOffset(1, func() error {
+ return
patchHPAMinReplicas("sonataflow-platform-data-index-service-hpa",
targetNamespace, hpaPatchReplicas)
+ }, 3*time.Minute, 30*time.Second).Should(Succeed())
+
+ if shouldHavePDBAfterPatch {
+ By("check that the PodDisruptionBudget for the Data
Index still exists after patching the HPA")
+ } else {
+ By("check that the PodDisruptionBudget for the Data
Index was removed after patching the HPA")
+ }
+
+ EventuallyWithOffset(1, func() bool {
+ return verifyResourceExists(dataIndexServiceName,
"pdb", targetNamespace)
+ }, 3*time.Minute,
30*time.Second).Should(Equal(shouldHavePDBAfterPatch))
+
+ if shouldHavePDBAfterPatch {
+ By(fmt.Sprintf("check that the PodDisruptionBudget's
DisruptionAllowed condition in the expected status = %s",
shouldHaveDisruptionsAllowedAfterPatch))
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetConditionHasStatus(expectedPDB, targetNamespace,
"DisruptionAllowed", shouldHaveDisruptionsAllowedAfterPatch)
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By(fmt.Sprintf("check that the PodDisruptionBudget is
allowing the disruptions number = %s",
strconv.Itoa(int(shouldHaveDisruptionsAllowedNumberAfterPatch))))
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetAllowsDisruptionNumber(expectedPDB, targetNamespace,
shouldHaveDisruptionsAllowedNumberAfterPatch)
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+ }
+ },
+
+ Entry("and the HPA minReplicas is configured with 4",
test.GetPathFromE2EDirectory("platform", "persistence",
"generic_from_platform_cr_with_pdb_and_hpa"), "4", true, "True", int32(3)),
+ Entry("and the HPA minReplicas is configured with 3",
test.GetPathFromE2EDirectory("platform", "persistence",
"generic_from_platform_cr_with_pdb_and_hpa"), "3", true, "True", int32(2)),
+ Entry("and the HPA minReplicas is configured with 1",
test.GetPathFromE2EDirectory("platform", "persistence",
"generic_from_platform_cr_with_pdb_and_hpa"), "1", false, "UnUsed", int32(-1)),
+ )
})
func verifyDataIndexAndJobsServiceAreReady(namespace string, timeout string)
error {
diff --git
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/01-postgres.yaml
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/01-postgres.yaml
new file mode 100644
index 00000000000..89f6e6fc81d
--- /dev/null
+++
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/01-postgres.yaml
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ name: postgres-pvc
+spec:
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 1Gi
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ name: postgres
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app.kubernetes.io/name: postgres
+ template:
+ metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ spec:
+ containers:
+ - name: postgres
+ image: postgres:13.2-alpine
+ imagePullPolicy: "IfNotPresent"
+ ports:
+ - containerPort: 5432
+ volumeMounts:
+ - name: storage
+ mountPath: /var/lib/postgresql/data
+ envFrom:
+ - secretRef:
+ name: postgres-secrets
+ readinessProbe:
+ exec:
+ command: ["pg_isready"]
+ initialDelaySeconds: 15
+ timeoutSeconds: 2
+ livenessProbe:
+ exec:
+ command: ["pg_isready"]
+ initialDelaySeconds: 15
+ timeoutSeconds: 2
+ resources:
+ limits:
+ memory: "256Mi"
+ cpu: "500m"
+ volumes:
+ - name: storage
+ persistentVolumeClaim:
+ claimName: postgres-pvc
+---
+apiVersion: v1
+kind: Service
+metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ name: postgres
+spec:
+ selector:
+ app.kubernetes.io/name: postgres
+ ports:
+ - port: 5432
diff --git
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/02-sonataflow_platform.yaml
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/02-sonataflow_platform.yaml
new file mode 100644
index 00000000000..d43db8d7cba
--- /dev/null
+++
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/02-sonataflow_platform.yaml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlowPlatform
+metadata:
+ name: sonataflow-platform
+spec:
+ persistence:
+ postgresql:
+ secretRef:
+ name: postgres-secrets
+ userKey: POSTGRES_USER
+ passwordKey: POSTGRES_PASSWORD
+ serviceRef:
+ name: postgres
+ port: 5432
+ databaseName: sonataflow
+ services:
+ dataIndex:
+ enabled: false
+ persistence:
+ dbMigrationStrategy: service
+ podTemplate:
+ replicas: 3
+ podDisruptionBudget:
+ minAvailable: 1
+ initContainers:
+ - name: init-postgres
+ image: registry.access.redhat.com/ubi9/ubi-micro:latest
+ imagePullPolicy: IfNotPresent
+ command:
+ [
+ "sh",
+ "-c",
+ 'until (echo 1 > /dev/tcp/postgres.$(cat
/var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432)
>/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;',
+ ]
+ jobService:
+ enabled: false
+ persistence:
+ dbMigrationStrategy: service
+ podTemplate:
+ initContainers:
+ - name: init-postgres
+ image: registry.access.redhat.com/ubi9/ubi-micro:latest
+ imagePullPolicy: IfNotPresent
+ command:
+ [
+ "sh",
+ "-c",
+ 'until (echo 1 > /dev/tcp/postgres.$(cat
/var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432)
>/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;',
+ ]
diff --git
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/kustomization.yaml
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/kustomization.yaml
new file mode 100644
index 00000000000..48d72cbd0b8
--- /dev/null
+++
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/kustomization.yaml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+resources:
+ - 01-postgres.yaml
+ - 02-sonataflow_platform.yaml
+
+generatorOptions:
+ disableNameSuffixHash: true
+
+secretGenerator:
+ - name: postgres-secrets
+ literals:
+ - POSTGRES_USER=sonataflow
+ - POSTGRES_PASSWORD=sonataflow
+ - POSTGRES_DATABASE=sonataflow
+ - PGDATA=/var/lib/pgsql/data/userdata
+
+sortOptions:
+ order: fifo
diff --git
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/01-postgres.yaml
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/01-postgres.yaml
new file mode 100644
index 00000000000..89f6e6fc81d
--- /dev/null
+++
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/01-postgres.yaml
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ name: postgres-pvc
+spec:
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 1Gi
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ name: postgres
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app.kubernetes.io/name: postgres
+ template:
+ metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ spec:
+ containers:
+ - name: postgres
+ image: postgres:13.2-alpine
+ imagePullPolicy: "IfNotPresent"
+ ports:
+ - containerPort: 5432
+ volumeMounts:
+ - name: storage
+ mountPath: /var/lib/postgresql/data
+ envFrom:
+ - secretRef:
+ name: postgres-secrets
+ readinessProbe:
+ exec:
+ command: ["pg_isready"]
+ initialDelaySeconds: 15
+ timeoutSeconds: 2
+ livenessProbe:
+ exec:
+ command: ["pg_isready"]
+ initialDelaySeconds: 15
+ timeoutSeconds: 2
+ resources:
+ limits:
+ memory: "256Mi"
+ cpu: "500m"
+ volumes:
+ - name: storage
+ persistentVolumeClaim:
+ claimName: postgres-pvc
+---
+apiVersion: v1
+kind: Service
+metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ name: postgres
+spec:
+ selector:
+ app.kubernetes.io/name: postgres
+ ports:
+ - port: 5432
diff --git
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/02-sonataflow_platform.yaml
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/02-sonataflow_platform.yaml
new file mode 100644
index 00000000000..799c6554e02
--- /dev/null
+++
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/02-sonataflow_platform.yaml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlowPlatform
+metadata:
+ name: sonataflow-platform
+spec:
+ persistence:
+ postgresql:
+ secretRef:
+ name: postgres-secrets
+ userKey: POSTGRES_USER
+ passwordKey: POSTGRES_PASSWORD
+ serviceRef:
+ name: postgres
+ port: 5432
+ databaseName: sonataflow
+ services:
+ dataIndex:
+ enabled: false
+ persistence:
+ dbMigrationStrategy: service
+ podTemplate:
+ podDisruptionBudget:
+ minAvailable: 1
+ initContainers:
+ - name: init-postgres
+ image: registry.access.redhat.com/ubi9/ubi-micro:latest
+ imagePullPolicy: IfNotPresent
+ command:
+ [
+ "sh",
+ "-c",
+ 'until (echo 1 > /dev/tcp/postgres.$(cat
/var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432)
>/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;',
+ ]
+ jobService:
+ enabled: false
+ persistence:
+ dbMigrationStrategy: service
+ podTemplate:
+ initContainers:
+ - name: init-postgres
+ image: registry.access.redhat.com/ubi9/ubi-micro:latest
+ imagePullPolicy: IfNotPresent
+ command:
+ [
+ "sh",
+ "-c",
+ 'until (echo 1 > /dev/tcp/postgres.$(cat
/var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432)
>/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;',
+ ]
diff --git
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/03-data-index-service-hpa.yaml
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/03-data-index-service-hpa.yaml
new file mode 100644
index 00000000000..27c2408dc2f
--- /dev/null
+++
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/03-data-index-service-hpa.yaml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+apiVersion: autoscaling/v2
+kind: HorizontalPodAutoscaler
+metadata:
+ name: sonataflow-platform-data-index-service-hpa
+spec:
+ scaleTargetRef:
+ apiVersion: apps/v1
+ kind: Deployment
+ name: sonataflow-platform-data-index-service
+ minReplicas: 2
+ maxReplicas: 5
+ metrics:
+ - type: Resource
+ resource:
+ name: cpu
+ target:
+ type: Utilization
+ averageUtilization: 70
+ - type: Resource
+ resource:
+ name: memory
+ target:
+ type: Utilization
+ averageUtilization: 80
diff --git
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/kustomization.yaml
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/kustomization.yaml
new file mode 100644
index 00000000000..c0b08ff093c
--- /dev/null
+++
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/kustomization.yaml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+resources:
+ - 01-postgres.yaml
+ - 02-sonataflow_platform.yaml
+ - 03-data-index-service-hpa.yaml
+
+generatorOptions:
+ disableNameSuffixHash: true
+
+secretGenerator:
+ - name: postgres-secrets
+ literals:
+ - POSTGRES_USER=sonataflow
+ - POSTGRES_PASSWORD=sonataflow
+ - POSTGRES_DATABASE=sonataflow
+ - PGDATA=/var/lib/pgsql/data/userdata
+
+sortOptions:
+ order: fifo
diff --git a/packages/sonataflow-operator/test/e2e/workflow_test.go
b/packages/sonataflow-operator/test/e2e/workflow_test.go
index b160b653ff8..620c377e001 100644
--- a/packages/sonataflow-operator/test/e2e/workflow_test.go
+++ b/packages/sonataflow-operator/test/e2e/workflow_test.go
@@ -27,6 +27,8 @@ import (
"strings"
"time"
+ "k8s.io/apimachinery/pkg/util/intstr"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/test"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/test/utils"
@@ -504,3 +506,215 @@ var _ = Describe("Workflow HorizontalPodAutoscaling Use
Cases :: ", Label("flows
})
})
})
+
+var _ = Describe("Workflow PodDisruptionBudget Use Cases :: ",
Label("flows-pdb"), Ordered, func() {
+ var targetNamespace string
+ BeforeEach(func() {
+ targetNamespace = fmt.Sprintf("test-%d",
rand.Intn(randomIntRange)+1)
+ err := kubectlCreateNamespace(targetNamespace)
+ Expect(err).NotTo(HaveOccurred())
+ })
+ AfterEach(func() {
+ // Remove resources in test namespace
+ if !CurrentSpecReport().Failed() && len(targetNamespace) > 0 {
+ cmd := exec.Command("kubectl", "delete", "sonataflow",
"--all", "-n", targetNamespace, "--wait")
+ _, err := utils.Run(cmd)
+ Expect(err).NotTo(HaveOccurred())
+ err = kubectlDeleteNamespace(targetNamespace)
+ Expect(err).NotTo(HaveOccurred())
+ }
+ })
+
+ DescribeTable("Ensure PDB works by workflow profile", func(profile
string, usePercent bool) {
+
+ By("create the template file")
+ sonataFlowCrFileTemplate :=
test.GetPathFromE2EDirectory("workflows", prebuiltWorkflows.Greetings.Name,
"01-sonataflow.org_v1alpha08_sonataflow.yaml")
+ sonataFlowCrFile := createTmpCopy(sonataFlowCrFileTemplate)
+
+ if profile == "gitops" {
+ By("setup the workflow file for the gitops profile")
+ setImageAndReplicasOrFail(sonataFlowCrFile,
prebuiltWorkflows.Greetings.Tag, int32(4))
+ setProfileOrFail(sonataFlowCrFile, "gitops")
+ } else {
+ By("setup the workflow file for the preview profile")
+ setImageAndReplicasOrFail(sonataFlowCrFile, "",
int32(4))
+ setProfileOrFail(sonataFlowCrFile, "preview")
+ }
+
+ var minReplicas intstr.IntOrString
+ if usePercent {
+ By("setup the workflow podDisruptionBudget using 50%
minReplicas")
+ minReplicas = intstr.FromString("50%")
+ } else {
+ By("setup the workflow podDisruptionBudget using 2
minReplicas")
+ minReplicas = intstr.FromInt32(int32(2))
+ }
+ setPodDisruptionBudgetOrFail(sonataFlowCrFile, &minReplicas,
nil)
+
+ By("deploy the workflow")
+ EventuallyWithOffset(1, func() error {
+ return kubectlApplyFileOnCluster(sonataFlowCrFile,
targetNamespace)
+ }, 3*time.Minute, time.Second).Should(Succeed())
+
+ By("check the workflow is in running state")
+ EventuallyWithOffset(1, func() bool { return
verifyWorkflowIsInRunningState(prebuiltWorkflows.Greetings.Name,
targetNamespace) }, 15*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the workflow has 4 replica")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace,
int32(4))
+ }, 2*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget was created")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget's DisruptionAllowed
condition is True")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetConditionHasStatus(prebuiltWorkflows.Greetings.Name,
targetNamespace, "DisruptionAllowed", "True")
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget is allowing 2
disruptions")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetAllowsDisruptionNumber(prebuiltWorkflows.Greetings.Name,
targetNamespace, int32(2))
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("scale the workflow to 1")
+ setReplicasOrFail(sonataFlowCrFile, int32(1))
+ EventuallyWithOffset(1, func() error {
+ return kubectlApplyFileOnCluster(sonataFlowCrFile,
targetNamespace)
+ }, 3*time.Minute, time.Second).Should(Succeed())
+
+ By("check that the workflow has 1 replica")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace,
int32(1))
+ }, 2*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget is removed after scaling
the workflow to 1")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
+ }, 3*time.Minute, 30*time.Second).Should(BeFalse())
+ },
+ Entry("Workflow with gitops profile and disruption budget with
minReplicas: 50%", "gitops", true),
+ Entry("Workflow with gitops profile and disruption budget with
minReplicas: 2", "gitops", false),
+ Entry("Workflow with preview profile and disruption budget with
minReplicas: 50%", "preview", true),
+ Entry("Workflow with preview profile and disruption budget with
minReplicas: 2", "preview", false),
+ )
+})
+
+var _ = Describe("Workflow PodDisruptionBudget + HorizontalPodAutoscaler Use
Cases :: ", Label("flows-pdb-with-hpa"), Ordered, func() {
+ var targetNamespace string
+ BeforeEach(func() {
+ targetNamespace = fmt.Sprintf("test-%d",
rand.Intn(randomIntRange)+1)
+ err := kubectlCreateNamespace(targetNamespace)
+ Expect(err).NotTo(HaveOccurred())
+ })
+ AfterEach(func() {
+ // Remove resources in test namespace
+ if !CurrentSpecReport().Failed() && len(targetNamespace) > 0 {
+ cmd := exec.Command("kubectl", "delete", "sonataflow",
"--all", "-n", targetNamespace, "--wait")
+ _, err := utils.Run(cmd)
+ Expect(err).NotTo(HaveOccurred())
+ err = kubectlDeleteNamespace(targetNamespace)
+ Expect(err).NotTo(HaveOccurred())
+ }
+ })
+
+ DescribeTable("Ensure PDB works by workflow profile when using a HPA",
func(profile string, usePercent bool) {
+
+ By("create the template file")
+ sonataFlowCrFileTemplate :=
test.GetPathFromE2EDirectory("workflows", prebuiltWorkflows.Greetings.Name,
"01-sonataflow.org_v1alpha08_sonataflow.yaml")
+ sonataFlowCrFile := createTmpCopy(sonataFlowCrFileTemplate)
+
+ if profile == "gitops" {
+ By("setup the workflow file for the gitops profile and
1 replica")
+ setImageAndReplicasOrFail(sonataFlowCrFile,
prebuiltWorkflows.Greetings.Tag, int32(1))
+ setProfileOrFail(sonataFlowCrFile, "gitops")
+ } else {
+ By("setup the workflow file for the preview profile and
1 replica")
+ setImageAndReplicasOrFail(sonataFlowCrFile, "",
int32(1))
+ setProfileOrFail(sonataFlowCrFile, "preview")
+ }
+
+ var minReplicas intstr.IntOrString
+ if usePercent {
+ By("setup the workflow podDisruptionBudget using 30%
minReplicas")
+ minReplicas = intstr.FromString("30%")
+ } else {
+ By("setup the workflow podDisruptionBudget using 1
minReplicas")
+ minReplicas = intstr.FromInt32(int32(1))
+ }
+ setPodDisruptionBudgetOrFail(sonataFlowCrFile, &minReplicas,
nil)
+
+ By("deploy the workflow")
+ EventuallyWithOffset(1, func() error {
+ return kubectlApplyFileOnCluster(sonataFlowCrFile,
targetNamespace)
+ }, 3*time.Minute, time.Second).Should(Succeed())
+
+ By("check the workflow is in running state")
+ EventuallyWithOffset(1, func() bool { return
verifyWorkflowIsInRunningState(prebuiltWorkflows.Greetings.Name,
targetNamespace) }, 15*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the workflow has 1 replica")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace,
int32(1))
+ }, 2*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that no PodDisruptionBudget was created since the
workflow has 1 replica")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
+ }, 3*time.Minute, 30*time.Second).Should(BeFalse())
+
+ By("create the HPA to manage the workflow replicas and
configured to ensure 3 min replicas")
+ sonataFlowCrFileHPA :=
test.GetPathFromE2EDirectory("workflows", prebuiltWorkflows.Greetings.Name,
"01-sonataflow.org_v1alpha08_sonataflow_hpa.yaml")
+ EventuallyWithOffset(1, func() error {
+ return kubectlApplyFileOnCluster(sonataFlowCrFileHPA,
targetNamespace)
+ }, 3*time.Minute, time.Second).Should(Succeed())
+
+ By("check that the workflow has 3 replicas after creating the
HPA")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace,
int32(3))
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget was created after
creating the HPA")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget's DisruptionAllowed
condition is True")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetConditionHasStatus(prebuiltWorkflows.Greetings.Name,
targetNamespace, "DisruptionAllowed", "True")
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget is allowing 2
disruptions")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyPodDisruptionBudgetAllowsDisruptionNumber(prebuiltWorkflows.Greetings.Name,
targetNamespace, int32(2))
+ }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("delete the HPA")
+ EventuallyWithOffset(1, func() error {
+ return kubectlDeleteFileOnCluster(sonataFlowCrFileHPA,
targetNamespace)
+ }, 3*time.Minute, time.Second).Should(Succeed())
+
+ By("scale the workflow to 1")
+ setReplicasOrFail(sonataFlowCrFile, int32(1))
+ EventuallyWithOffset(1, func() error {
+ return kubectlApplyFileOnCluster(sonataFlowCrFile,
targetNamespace)
+ }, 3*time.Minute, time.Second).Should(Succeed())
+
+ By("check that the workflow has 1 replica")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace,
int32(1))
+ }, 2*time.Minute, 30*time.Second).Should(BeTrue())
+
+ By("check that the PodDisruptionBudget is removed after scaling
the workflow to 1")
+ EventuallyWithOffset(1, func() bool {
+ return
verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
+ }, 3*time.Minute, 30*time.Second).Should(BeFalse())
+
+ },
+ Entry("Workflow with gitops profile and disruption budget with
minReplicas: 30%", "gitops", true),
+ Entry("Workflow with gitops profile and disruption budget with
minReplicas: 1", "gitops", false),
+ Entry("Workflow with preview profile and disruption budget with
minReplicas: 30%", "preview", true),
+ Entry("Workflow with preview profile and disruption budget with
minReplicas: 1", "preview", false),
+ )
+})
diff --git a/packages/sonataflow-operator/utils/kubernetes/autoscaling.go
b/packages/sonataflow-operator/utils/kubernetes/autoscaling.go
new file mode 100644
index 00000000000..d5274b26dca
--- /dev/null
+++ b/packages/sonataflow-operator/utils/kubernetes/autoscaling.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package kubernetes
+
+import (
+ "context"
+ "reflect"
+
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/klog/v2"
+
+ autoscalingv2 "k8s.io/api/autoscaling/v2"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
+// FindHPAForDeployment returns the HorizontalPodAutoscaler targeting a
deployment in a given namespace, or nil if it
+// doesn't exist.
+// Note: By k8s definition, the HorizontalPodAutoscaler must belong to the
same namespace as the managed deployment.
+func FindHPAForDeployment(ctx context.Context, c client.Client, namespace
string, name string) (*autoscalingv2.HorizontalPodAutoscaler, error) {
+ return findHPAForTarget(ctx, c, namespace, "apps/v1", "Deployment",
name)
+}
+
+// FindHPAForWorkflow returns the HorizontalPodAutoscaler targeting a workflow
in a given namespace, or nil if it
+// doesn't exist.
+// Note: By k8s definition, the HorizontalPodAutoscaler must belong to the
same namespace as the managed workflow.
+func FindHPAForWorkflow(ctx context.Context, c client.Client, namespace
string, name string) (*autoscalingv2.HorizontalPodAutoscaler, error) {
+ return findHPAForTarget(ctx, c, namespace, "sonataflow.org/v1alpha08",
"SonataFlow", name)
+}
+
+func findHPAForTarget(ctx context.Context, c client.Client, namespace string,
apiVersion string, kind string, name string)
(*autoscalingv2.HorizontalPodAutoscaler, error) {
+ klog.V(log.D).Infof("Querying HorizontalPodAutoscalers in namespace:
%s.", namespace)
+ var hpaList autoscalingv2.HorizontalPodAutoscalerList
+ if err := c.List(ctx, &hpaList, client.InNamespace(namespace)); err !=
nil {
+ return nil, err
+ }
+ klog.V(log.D).Infof("Total number of returned HorizontalPodAutoscalers
is: %d.", len(hpaList.Items))
+ for _, hpa := range hpaList.Items {
+ ref := hpa.Spec.ScaleTargetRef
+ klog.V(log.D).Infof("Checking if HorizontalPodAutoscaler name:
%s, ref.APIVersion: %s, ref.Kind: %s, ref.Name: %s, targets apiVersion: %s,
kind: %s, name: %s.", hpa.Name, ref.APIVersion, ref.Kind, ref.Name, apiVersion,
kind, name)
+ if ref.Kind == kind && ref.Name == name && ref.APIVersion ==
apiVersion {
+ klog.V(log.D).Infof("HorizontalPodAutoscaler name: %s
targets apiVersion: %s, kind: %s, name: %s.", hpa.Name, apiVersion, kind, name)
+ return &hpa, nil
+ }
+ }
+ klog.V(log.D).Infof("No HorizontalPodAutoscaler targets apiVersion: %s,
kind: %s, name: %s.", apiVersion, kind, name)
+ return nil, nil
+}
+
+// HPAIsActive returns true if the HorizontalPodAutoscaler is active.
+func HPAIsActive(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
+ klog.V(log.D).Infof("Checking if HorizontalPodAutoscaler is Active.")
+ for _, cond := range hpa.Status.Conditions {
+ klog.V(log.D).Infof("Checking Status condition type: %s, %s.",
cond.Type, cond.Status)
+ if cond.Type == autoscalingv2.ScalingActive {
+ return cond.Status == v1.ConditionTrue
+ }
+ }
+ return false
+}
+
+// HPAIsWorking returns true if the HorizontalPodAutoscaler has started to
take care of the scaling for the
+// corresponding target ref. At this point, our controllers must transfer the
control to the HorizontalPodAutoscaler
+// and let it manage the replicas.
+func HPAIsWorking(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
+ return HPAIsActive(hpa) || hpa.Status.DesiredReplicas > 0
+}
+
+// HPAEqualsBySpec returns true if to HorizontalPodAutoscaler has the same
Spec, false in any other case.
+func HPAEqualsBySpec(hpa1, hpa2 *autoscalingv2.HorizontalPodAutoscaler) bool {
+ return reflect.DeepEqual(hpa1, hpa2)
+}
+
+// IsHPAndTargetsAKind returns (*autoscalingv2.HorizontalPodAutoscaler, true)
if the object is a HorizontalPodAutoscaler
+// and targets a given kind, (nil, false) in other cases.
+func IsHPAndTargetsAKind(obj client.Object, kind string)
(*autoscalingv2.HorizontalPodAutoscaler, bool) {
+ if hpa, ok := obj.(*autoscalingv2.HorizontalPodAutoscaler); ok {
+ if hpa != nil && hpa.Spec.ScaleTargetRef.Kind == kind {
+ return hpa, true
+ }
+ }
+ return nil, false
+}
+
+// IsHPAndTargetsADeployment returns (*autoscalingv2.HorizontalPodAutoscaler,
true) if the object is a HorizontalPodAutoscaler
+// and targets a Deployment, (nil, false) in other cases.
+func IsHPAndTargetsADeployment(obj client.Object)
(*autoscalingv2.HorizontalPodAutoscaler, bool) {
+ return IsHPAndTargetsAKind(obj, "Deployment")
+}
+
+// IsHPAndTargetsASonataFlow returns (*autoscalingv2.HorizontalPodAutoscaler,
true) if the object is a HorizontalPodAutoscaler
+// and targets a SonataFlow, (nil, false) in other cases.
+func IsHPAndTargetsASonataFlow(obj client.Object)
(*autoscalingv2.HorizontalPodAutoscaler, bool) {
+ return IsHPAndTargetsAKind(obj, "SonataFlow")
+}
+
+// IsHPAndTargetsASonataFlowAsBool returns true if the object is a
HorizontalPodAutoscaler and targets a SonataFlow,
+// false in other cases.
+func IsHPAndTargetsASonataFlowAsBool(obj client.Object) bool {
+ _, ok := IsHPAndTargetsAKind(obj, "SonataFlow")
+ return ok
+}
+
+// HPAMinReplicasIsGreaterThan returns true if the HorizontalPodAutoscaler
configured minReplicas is != nil, and greater
+// than the given value. False in any other case.
+func HPAMinReplicasIsGreaterThan(hpa *autoscalingv2.HorizontalPodAutoscaler,
value int32) bool {
+ return hpa.Spec.MinReplicas != nil && *hpa.Spec.MinReplicas > value
+}
diff --git a/packages/sonataflow-operator/utils/kubernetes/deployment.go
b/packages/sonataflow-operator/utils/kubernetes/deployment.go
index 775a35e8b81..08c04b9612a 100644
--- a/packages/sonataflow-operator/utils/kubernetes/deployment.go
+++ b/packages/sonataflow-operator/utils/kubernetes/deployment.go
@@ -204,3 +204,15 @@ func AddOrReplaceContainer(containerName string, container
v1.Container, podSpec
podSpec.Containers[idx] = container
}
}
+
+// DeploymentReplicasIsGreaterThan returns true if the Deployment configured
replicas is != nil, and greater than
+// the given value. False in any other case.
+func DeploymentReplicasIsGreaterThan(deployment *appsv1.Deployment, value
int32) bool {
+ return deployment.Spec.Replicas != nil && *deployment.Spec.Replicas >
value
+}
+
+// DeploymentIsScaledToZero returns true if the Deployment has been explicitly
scaled to zero, i.e., by setting
+// the replicas == 0. False in any other case, including when replicas == nil.
+func DeploymentIsScaledToZero(deployment *appsv1.Deployment) bool {
+ return deployment.Spec.Replicas != nil && *deployment.Spec.Replicas ==
int32(0)
+}
diff --git a/packages/sonataflow-operator/utils/kubernetes/disruption_budget.go
b/packages/sonataflow-operator/utils/kubernetes/disruption_budget.go
new file mode 100644
index 00000000000..348e256596a
--- /dev/null
+++ b/packages/sonataflow-operator/utils/kubernetes/disruption_budget.go
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kubernetes
+
+import (
+ "context"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ operatorapi
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+
+ policyv1 "k8s.io/api/policy/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
+// IsEmptyPodDisruptionBudgetSpec returns true if the PodDisruptionBudgetSpec
is nil or has no configured values at all,
+// false in any other case.
+func IsEmptyPodDisruptionBudgetSpec(spec *operatorapi.PodDisruptionBudgetSpec)
bool {
+ return spec == nil || (spec.MinAvailable == nil && spec.MaxUnavailable
== nil)
+}
+
+// ApplyPodDisruptionBudgetSpec applies an operatorapi.PodDisruptionBudgetSpec
to the PodDisruptionBudget.
+func ApplyPodDisruptionBudgetSpec(pdb *policyv1.PodDisruptionBudget, spec
*operatorapi.PodDisruptionBudgetSpec) {
+ if spec.MinAvailable != nil {
+ pdb.Spec.MinAvailable = spec.MinAvailable
+ pdb.Spec.MaxUnavailable = nil
+ } else {
+ pdb.Spec.MaxUnavailable = spec.MaxUnavailable
+ pdb.Spec.MinAvailable = nil
+ }
+}
+
+// SafeDeletePodDisruptionBudget deletes a potentially existing
PodDisruptionBudget, ignoring the not existing error.
+func SafeDeletePodDisruptionBudget(ctx context.Context, c client.Client,
namespace, name string) error {
+ err := c.Delete(ctx, &policyv1.PodDisruptionBudget{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ Name: name,
+ },
+ })
+ if err != nil {
+ if errors.IsNotFound(err) {
+ klog.V(log.D).Infof("PodDisruptionBudget %s/%s was
already deleted or never existed.", namespace, name)
+ return nil
+ } else {
+ return err
+ }
+ }
+ return nil
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]