This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/main by this push:
     new 356693ea6 feat(kafka): support Kafka binding resource
356693ea6 is described below

commit 356693ea6dadf21373e2b39dc0c90a7b12c5097f
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Tue Mar 26 11:19:39 2024 +0100

    feat(kafka): support Kafka binding resource
    
    * Added logic to serialize a Camel Kafka endpoint uri from a Kafka or a 
KafkaTopic
    * Updated the `.status.listener` type to name, as in v1beta2, the status 
don't report type any longer
    
    Closes #4337
---
 addons/strimzi/duck/v1beta2/duck_types.go |   4 +-
 addons/strimzi/strimzi.go                 | 100 +++++++++++++++++++++------
 addons/strimzi/strimzi_test.go            | 109 ++++++++++++++++++++++++++++--
 3 files changed, 186 insertions(+), 27 deletions(-)

diff --git a/addons/strimzi/duck/v1beta2/duck_types.go 
b/addons/strimzi/duck/v1beta2/duck_types.go
index 936c76b33..620affa00 100644
--- a/addons/strimzi/duck/v1beta2/duck_types.go
+++ b/addons/strimzi/duck/v1beta2/duck_types.go
@@ -28,7 +28,7 @@ const (
 
        StrimziKafkaClusterLabel = "strimzi.io/cluster"
 
-       StrimziListenerTypePlain = "plain"
+       StrimziListenerNamePlain = "plain"
 )
 
 // +genclient
@@ -78,7 +78,7 @@ type KafkaStatus struct {
 // KafkaStatusListener contains listener information.
 type KafkaStatusListener struct {
        BootstrapServers string `json:"bootstrapServers,omitempty"`
-       Type             string `json:"type,omitempty"`
+       Name             string `json:"name,omitempty"`
 }
 
 // +kubebuilder:object:root=true
diff --git a/addons/strimzi/strimzi.go b/addons/strimzi/strimzi.go
index fdd17fcd5..5d5c58895 100644
--- a/addons/strimzi/strimzi.go
+++ b/addons/strimzi/strimzi.go
@@ -32,6 +32,12 @@ import (
        "k8s.io/apimachinery/pkg/runtime/schema"
 )
 
+// camelKafka represent the configuration required by Camel Kafka component.
+type camelKafka struct {
+       topicName  string
+       properties map[string]string
+}
+
 // BindingProvider allows to connect to a Kafka topic via Binding.
 type BindingProvider struct {
        Client internalclientset.Interface
@@ -43,19 +49,77 @@ func (s BindingProvider) ID() string {
 
 func (s BindingProvider) Translate(ctx bindings.BindingContext, _ 
bindings.EndpointContext, endpoint camelv1.Endpoint) (*bindings.Binding, error) 
{
        if endpoint.Ref == nil {
-               // React only on refs
+               // IMPORTANT: just pass through if this provider cannot manage 
the binding. Another provider in the chain may take care or it.
                return nil, nil
        }
        gv, err := schema.ParseGroupVersion(endpoint.Ref.APIVersion)
        if err != nil {
                return nil, err
        }
-
-       if gv.Group != v1beta2.StrimziGroup || endpoint.Ref.Kind != 
v1beta2.StrimziKindTopic {
-               // Only operates on Strimzi Topics
+       if gv.Group != v1beta2.StrimziGroup {
+               // IMPORTANT: just pass through if this provider cannot manage 
the binding. Another provider in the chain may take care or it.
                return nil, nil
        }
 
+       camelKafka, err := s.toCamelKafka(ctx, endpoint)
+       if err != nil {
+               return nil, err
+       }
+       kafkaURI := fmt.Sprintf("kafka:%s", camelKafka.topicName)
+       kafkaURI = uri.AppendParameters(kafkaURI, camelKafka.properties)
+
+       return &bindings.Binding{
+               URI: kafkaURI,
+       }, nil
+}
+
+// toCamelKafka serialize an endpoint to a camelKafka struct.
+func (s BindingProvider) toCamelKafka(ctx bindings.BindingContext, endpoint 
camelv1.Endpoint) (*camelKafka, error) {
+       switch endpoint.Ref.Kind {
+       case v1beta2.StrimziKindKafkaCluster:
+               return s.fromKafkaToCamel(ctx, endpoint)
+       case v1beta2.StrimziKindTopic:
+               return s.fromKafkaTopicToCamel(ctx, endpoint)
+       }
+
+       return nil, fmt.Errorf("invalid endpoint kind. Can only work with %s or 
%s kind", v1beta2.StrimziKindKafkaCluster, v1beta2.StrimziKindTopic)
+}
+
+// Verify and transform a Kafka resource to Camel Kafka endpoint parameters.
+func (s BindingProvider) fromKafkaToCamel(ctx bindings.BindingContext, 
endpoint camelv1.Endpoint) (*camelKafka, error) {
+       props, err := endpoint.Properties.GetPropertyMap()
+       if err != nil {
+               return nil, err
+       }
+       if props == nil || props["topic"] == "" {
+               return nil, fmt.Errorf("invalid endpoint configuration: missing 
topic property")
+       }
+       topicName := props["topic"]
+       delete(props, "topic")
+       if props["brokers"] == "" {
+               // build the client if needed
+               if s.Client == nil {
+                       kafkaClient, err := 
internalclientset.NewForConfig(ctx.Client.GetConfig())
+                       if err != nil {
+                               return nil, err
+                       }
+                       s.Client = kafkaClient
+               }
+               bootstrapServers, err := s.getBootstrapServers(ctx, 
endpoint.Ref.Name)
+               if err != nil {
+                       return nil, err
+               }
+               props["brokers"] = bootstrapServers
+       }
+
+       return &camelKafka{
+               topicName:  topicName,
+               properties: props,
+       }, nil
+}
+
+// Verify and transform a KafkaTopic resource to Camel Kafka endpoint 
parameters.
+func (s BindingProvider) fromKafkaTopicToCamel(ctx bindings.BindingContext, 
endpoint camelv1.Endpoint) (*camelKafka, error) {
        props, err := endpoint.Properties.GetPropertyMap()
        if err != nil {
                return nil, err
@@ -63,7 +127,6 @@ func (s BindingProvider) Translate(ctx 
bindings.BindingContext, _ bindings.Endpo
        if props == nil {
                props = make(map[string]string)
        }
-
        if props["brokers"] == "" {
                // build the client if needed
                if s.Client == nil {
@@ -73,11 +136,11 @@ func (s BindingProvider) Translate(ctx 
bindings.BindingContext, _ bindings.Endpo
                        }
                        s.Client = kafkaClient
                }
-
-               // look them up
+               // look them up: first, look at the CR name
                topic, err := 
s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).Get(ctx.Ctx, 
endpoint.Ref.Name, v1.GetOptions{})
                if err != nil {
                        if k8serrors.IsNotFound(err) {
+                               // if not found, then, look at the 
.status.topicName (it may be autogenerated)
                                topicList, err := 
s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).List(ctx.Ctx, v1.ListOptions{
                                        FieldSelector: "status.topicName=" + 
endpoint.Ref.Name,
                                })
@@ -93,25 +156,20 @@ func (s BindingProvider) Translate(ctx 
bindings.BindingContext, _ bindings.Endpo
                                return nil, err
                        }
                }
-
                clusterName := topic.Labels[v1beta2.StrimziKafkaClusterLabel]
                if clusterName == "" {
                        return nil, fmt.Errorf("no %q label defined on topic 
%s", v1beta2.StrimziKafkaClusterLabel, endpoint.Ref.Name)
                }
-
                bootstrapServers, err := s.getBootstrapServers(ctx, clusterName)
                if err != nil {
                        return nil, err
                }
-
                props["brokers"] = bootstrapServers
        }
 
-       kafkaURI := fmt.Sprintf("kafka:%s", endpoint.Ref.Name)
-       kafkaURI = uri.AppendParameters(kafkaURI, props)
-
-       return &bindings.Binding{
-               URI: kafkaURI,
+       return &camelKafka{
+               topicName:  endpoint.Ref.Name,
+               properties: props,
        }, nil
 }
 
@@ -122,16 +180,16 @@ func (s BindingProvider) getBootstrapServers(ctx 
bindings.BindingContext, cluste
        }
 
        for _, l := range cluster.Status.Listeners {
-               if l.Type == v1beta2.StrimziListenerTypePlain {
+               if l.Name == v1beta2.StrimziListenerNamePlain {
                        if l.BootstrapServers == "" {
-                               return "", fmt.Errorf("cluster %q has no 
bootstrap servers in %q listener", clusterName, 
v1beta2.StrimziListenerTypePlain)
+                               return "", fmt.Errorf("cluster %q has no 
bootstrap servers in %q listener", clusterName, 
v1beta2.StrimziListenerNamePlain)
                        }
 
                        return l.BootstrapServers, nil
                }
        }
 
-       return "", fmt.Errorf("cluster %q has no listeners of type %q", 
clusterName, v1beta2.StrimziListenerTypePlain)
+       return "", fmt.Errorf("cluster %q has no listeners of name %q", 
clusterName, v1beta2.StrimziListenerNamePlain)
 }
 
 // Order --.
@@ -222,16 +280,16 @@ func (s V1alpha1BindingProvider) getBootstrapServers(ctx 
bindings.V1alpha1Bindin
        }
 
        for _, l := range cluster.Status.Listeners {
-               if l.Type == v1beta2.StrimziListenerTypePlain {
+               if l.Name == v1beta2.StrimziListenerNamePlain {
                        if l.BootstrapServers == "" {
-                               return "", fmt.Errorf("cluster %q has no 
bootstrap servers in %q listener", clusterName, 
v1beta2.StrimziListenerTypePlain)
+                               return "", fmt.Errorf("cluster %q has no 
bootstrap servers in %q listener", clusterName, 
v1beta2.StrimziListenerNamePlain)
                        }
 
                        return l.BootstrapServers, nil
                }
        }
 
-       return "", fmt.Errorf("cluster %q has no listeners of type %q", 
clusterName, v1beta2.StrimziListenerTypePlain)
+       return "", fmt.Errorf("cluster %q has no listeners of type %q", 
clusterName, v1beta2.StrimziListenerNamePlain)
 }
 
 // Order --.
diff --git a/addons/strimzi/strimzi_test.go b/addons/strimzi/strimzi_test.go
index 967cf83c4..d3e8a4d98 100644
--- a/addons/strimzi/strimzi_test.go
+++ b/addons/strimzi/strimzi_test.go
@@ -80,11 +80,11 @@ func TestStrimziLookup(t *testing.T) {
                Status: v1beta2.KafkaStatus{
                        Listeners: []v1beta2.KafkaStatusListener{
                                {
-                                       Type: "tls",
+                                       Name: "tls",
                                },
                                {
                                        BootstrapServers: 
"my-clusterx-kafka-bootstrap:9092",
-                                       Type:             "plain",
+                                       Name:             "plain",
                                },
                        },
                },
@@ -150,11 +150,11 @@ func TestStrimziLookupByTopicName(t *testing.T) {
                Status: v1beta2.KafkaStatus{
                        Listeners: []v1beta2.KafkaStatusListener{
                                {
-                                       Type: "tls",
+                                       Name: "tls",
                                },
                                {
                                        BootstrapServers: 
"my-clusterx-kafka-bootstrap:9092",
-                                       Type:             "plain",
+                                       Name:             "plain",
                                },
                        },
                },
@@ -200,3 +200,104 @@ func TestStrimziLookupByTopicName(t *testing.T) {
        assert.Equal(t, 
"kafka:my-topic-name?brokers=my-clusterx-kafka-bootstrap%3A9092", binding.URI)
        assert.Equal(t, camelv1.Traits{}, binding.Traits)
 }
+
+func TestStrimziKafkaCR(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       cluster := v1beta2.Kafka{
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: "test",
+                       Name:      "my-kafka",
+               },
+               Status: v1beta2.KafkaStatus{
+                       Listeners: []v1beta2.KafkaStatusListener{
+                               {
+                                       Name: "tls",
+                               },
+                               {
+                                       BootstrapServers: 
"my-clusterx-kafka-bootstrap:9092",
+                                       Name:             "plain",
+                               },
+                       },
+               },
+       }
+
+       client := fake.NewSimpleClientset(&cluster)
+       provider := BindingProvider{
+               Client: client,
+       }
+
+       bindingContext := bindings.BindingContext{
+               Ctx:       ctx,
+               Namespace: "test",
+               Profile:   camelv1.TraitProfileKubernetes,
+       }
+
+       endpoint := camelv1.Endpoint{
+               Ref: &v1.ObjectReference{
+                       Kind:       "Kafka",
+                       Name:       "my-kafka",
+                       APIVersion: "kafka.strimzi.io/v1beta2",
+               },
+               Properties: asEndpointProperties(map[string]string{
+                       "topic": "my-topic",
+               }),
+       }
+
+       binding, err := provider.Translate(bindingContext, 
bindings.EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       assert.NotNil(t, binding)
+       assert.Equal(t, 
"kafka:my-topic?brokers=my-clusterx-kafka-bootstrap%3A9092", binding.URI)
+       assert.Equal(t, camelv1.Traits{}, binding.Traits)
+}
+
+func TestStrimziPassThrough(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       cluster := v1beta2.Kafka{
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: "test",
+                       Name:      "my-kafka",
+               },
+               Status: v1beta2.KafkaStatus{
+                       Listeners: []v1beta2.KafkaStatusListener{
+                               {
+                                       Name: "tls",
+                               },
+                               {
+                                       BootstrapServers: 
"my-clusterx-kafka-bootstrap:9092",
+                                       Name:             "plain",
+                               },
+                       },
+               },
+       }
+
+       client := fake.NewSimpleClientset(&cluster)
+       provider := BindingProvider{
+               Client: client,
+       }
+
+       bindingContext := bindings.BindingContext{
+               Ctx:       ctx,
+               Namespace: "test",
+               Profile:   camelv1.TraitProfileKubernetes,
+       }
+
+       endpoint := camelv1.Endpoint{
+               Ref: &v1.ObjectReference{
+                       Kind:       "AnotherKind",
+                       Name:       "my-kafka",
+                       APIVersion: "anotherApiVersion",
+               },
+       }
+
+       binding, err := provider.Translate(bindingContext, 
bindings.EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       assert.Nil(t, binding)
+}

Reply via email to