This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push:
new 356693ea6 feat(kafka): support Kafka binding resource
356693ea6 is described below
commit 356693ea6dadf21373e2b39dc0c90a7b12c5097f
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Tue Mar 26 11:19:39 2024 +0100
feat(kafka): support Kafka binding resource
* Added logic to serialize a Camel Kafka endpoint URI from a Kafka or a
KafkaTopic resource
* Updated the `.status.listener` field from type to name: as of v1beta2, the
status no longer reports the type
Closes #4337
---
addons/strimzi/duck/v1beta2/duck_types.go | 4 +-
addons/strimzi/strimzi.go | 100 +++++++++++++++++++++------
addons/strimzi/strimzi_test.go | 109 ++++++++++++++++++++++++++++--
3 files changed, 186 insertions(+), 27 deletions(-)
diff --git a/addons/strimzi/duck/v1beta2/duck_types.go
b/addons/strimzi/duck/v1beta2/duck_types.go
index 936c76b33..620affa00 100644
--- a/addons/strimzi/duck/v1beta2/duck_types.go
+++ b/addons/strimzi/duck/v1beta2/duck_types.go
@@ -28,7 +28,7 @@ const (
StrimziKafkaClusterLabel = "strimzi.io/cluster"
- StrimziListenerTypePlain = "plain"
+ StrimziListenerNamePlain = "plain"
)
// +genclient
@@ -78,7 +78,7 @@ type KafkaStatus struct {
// KafkaStatusListener contains listener information.
type KafkaStatusListener struct {
BootstrapServers string `json:"bootstrapServers,omitempty"`
- Type string `json:"type,omitempty"`
+ Name string `json:"name,omitempty"`
}
// +kubebuilder:object:root=true
diff --git a/addons/strimzi/strimzi.go b/addons/strimzi/strimzi.go
index fdd17fcd5..5d5c58895 100644
--- a/addons/strimzi/strimzi.go
+++ b/addons/strimzi/strimzi.go
@@ -32,6 +32,12 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
)
+// camelKafka represents the configuration required by the Camel Kafka component.
+type camelKafka struct {
+ topicName string
+ properties map[string]string
+}
+
// BindingProvider allows to connect to a Kafka topic via Binding.
type BindingProvider struct {
Client internalclientset.Interface
@@ -43,19 +49,77 @@ func (s BindingProvider) ID() string {
func (s BindingProvider) Translate(ctx bindings.BindingContext, _
bindings.EndpointContext, endpoint camelv1.Endpoint) (*bindings.Binding, error)
{
if endpoint.Ref == nil {
- // React only on refs
+ // IMPORTANT: just pass through if this provider cannot manage
the binding. Another provider in the chain may take care of it.
return nil, nil
}
gv, err := schema.ParseGroupVersion(endpoint.Ref.APIVersion)
if err != nil {
return nil, err
}
-
- if gv.Group != v1beta2.StrimziGroup || endpoint.Ref.Kind !=
v1beta2.StrimziKindTopic {
- // Only operates on Strimzi Topics
+ if gv.Group != v1beta2.StrimziGroup {
+ // IMPORTANT: just pass through if this provider cannot manage
the binding. Another provider in the chain may take care of it.
return nil, nil
}
+ camelKafka, err := s.toCamelKafka(ctx, endpoint)
+ if err != nil {
+ return nil, err
+ }
+ kafkaURI := fmt.Sprintf("kafka:%s", camelKafka.topicName)
+ kafkaURI = uri.AppendParameters(kafkaURI, camelKafka.properties)
+
+ return &bindings.Binding{
+ URI: kafkaURI,
+ }, nil
+}
+
+// toCamelKafka serializes an endpoint to a camelKafka struct.
+func (s BindingProvider) toCamelKafka(ctx bindings.BindingContext, endpoint
camelv1.Endpoint) (*camelKafka, error) {
+ switch endpoint.Ref.Kind {
+ case v1beta2.StrimziKindKafkaCluster:
+ return s.fromKafkaToCamel(ctx, endpoint)
+ case v1beta2.StrimziKindTopic:
+ return s.fromKafkaTopicToCamel(ctx, endpoint)
+ }
+
+ return nil, fmt.Errorf("invalid endpoint kind. Can only work with %s or
%s kind", v1beta2.StrimziKindKafkaCluster, v1beta2.StrimziKindTopic)
+}
+
+// fromKafkaToCamel verifies and transforms a Kafka resource into Camel Kafka endpoint parameters.
+func (s BindingProvider) fromKafkaToCamel(ctx bindings.BindingContext,
endpoint camelv1.Endpoint) (*camelKafka, error) {
+ props, err := endpoint.Properties.GetPropertyMap()
+ if err != nil {
+ return nil, err
+ }
+ if props == nil || props["topic"] == "" {
+ return nil, fmt.Errorf("invalid endpoint configuration: missing
topic property")
+ }
+ topicName := props["topic"]
+ delete(props, "topic")
+ if props["brokers"] == "" {
+ // build the client if needed
+ if s.Client == nil {
+ kafkaClient, err :=
internalclientset.NewForConfig(ctx.Client.GetConfig())
+ if err != nil {
+ return nil, err
+ }
+ s.Client = kafkaClient
+ }
+ bootstrapServers, err := s.getBootstrapServers(ctx,
endpoint.Ref.Name)
+ if err != nil {
+ return nil, err
+ }
+ props["brokers"] = bootstrapServers
+ }
+
+ return &camelKafka{
+ topicName: topicName,
+ properties: props,
+ }, nil
+}
+
+// fromKafkaTopicToCamel verifies and transforms a KafkaTopic resource into Camel Kafka endpoint
parameters.
+func (s BindingProvider) fromKafkaTopicToCamel(ctx bindings.BindingContext,
endpoint camelv1.Endpoint) (*camelKafka, error) {
props, err := endpoint.Properties.GetPropertyMap()
if err != nil {
return nil, err
@@ -63,7 +127,6 @@ func (s BindingProvider) Translate(ctx
bindings.BindingContext, _ bindings.Endpo
if props == nil {
props = make(map[string]string)
}
-
if props["brokers"] == "" {
// build the client if needed
if s.Client == nil {
@@ -73,11 +136,11 @@ func (s BindingProvider) Translate(ctx
bindings.BindingContext, _ bindings.Endpo
}
s.Client = kafkaClient
}
-
- // look them up
+ // look them up: first, look at the CR name
topic, err :=
s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).Get(ctx.Ctx,
endpoint.Ref.Name, v1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
+ // if not found, then, look at the
.status.topicName (it may be autogenerated)
topicList, err :=
s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).List(ctx.Ctx, v1.ListOptions{
FieldSelector: "status.topicName=" +
endpoint.Ref.Name,
})
@@ -93,25 +156,20 @@ func (s BindingProvider) Translate(ctx
bindings.BindingContext, _ bindings.Endpo
return nil, err
}
}
-
clusterName := topic.Labels[v1beta2.StrimziKafkaClusterLabel]
if clusterName == "" {
return nil, fmt.Errorf("no %q label defined on topic
%s", v1beta2.StrimziKafkaClusterLabel, endpoint.Ref.Name)
}
-
bootstrapServers, err := s.getBootstrapServers(ctx, clusterName)
if err != nil {
return nil, err
}
-
props["brokers"] = bootstrapServers
}
- kafkaURI := fmt.Sprintf("kafka:%s", endpoint.Ref.Name)
- kafkaURI = uri.AppendParameters(kafkaURI, props)
-
- return &bindings.Binding{
- URI: kafkaURI,
+ return &camelKafka{
+ topicName: endpoint.Ref.Name,
+ properties: props,
}, nil
}
@@ -122,16 +180,16 @@ func (s BindingProvider) getBootstrapServers(ctx
bindings.BindingContext, cluste
}
for _, l := range cluster.Status.Listeners {
- if l.Type == v1beta2.StrimziListenerTypePlain {
+ if l.Name == v1beta2.StrimziListenerNamePlain {
if l.BootstrapServers == "" {
- return "", fmt.Errorf("cluster %q has no
bootstrap servers in %q listener", clusterName,
v1beta2.StrimziListenerTypePlain)
+ return "", fmt.Errorf("cluster %q has no
bootstrap servers in %q listener", clusterName,
v1beta2.StrimziListenerNamePlain)
}
return l.BootstrapServers, nil
}
}
- return "", fmt.Errorf("cluster %q has no listeners of type %q",
clusterName, v1beta2.StrimziListenerTypePlain)
+ return "", fmt.Errorf("cluster %q has no listeners of name %q",
clusterName, v1beta2.StrimziListenerNamePlain)
}
// Order --.
@@ -222,16 +280,16 @@ func (s V1alpha1BindingProvider) getBootstrapServers(ctx
bindings.V1alpha1Bindin
}
for _, l := range cluster.Status.Listeners {
- if l.Type == v1beta2.StrimziListenerTypePlain {
+ if l.Name == v1beta2.StrimziListenerNamePlain {
if l.BootstrapServers == "" {
- return "", fmt.Errorf("cluster %q has no
bootstrap servers in %q listener", clusterName,
v1beta2.StrimziListenerTypePlain)
+ return "", fmt.Errorf("cluster %q has no
bootstrap servers in %q listener", clusterName,
v1beta2.StrimziListenerNamePlain)
}
return l.BootstrapServers, nil
}
}
- return "", fmt.Errorf("cluster %q has no listeners of type %q",
clusterName, v1beta2.StrimziListenerTypePlain)
+ return "", fmt.Errorf("cluster %q has no listeners of type %q",
clusterName, v1beta2.StrimziListenerNamePlain)
}
// Order --.
diff --git a/addons/strimzi/strimzi_test.go b/addons/strimzi/strimzi_test.go
index 967cf83c4..d3e8a4d98 100644
--- a/addons/strimzi/strimzi_test.go
+++ b/addons/strimzi/strimzi_test.go
@@ -80,11 +80,11 @@ func TestStrimziLookup(t *testing.T) {
Status: v1beta2.KafkaStatus{
Listeners: []v1beta2.KafkaStatusListener{
{
- Type: "tls",
+ Name: "tls",
},
{
BootstrapServers:
"my-clusterx-kafka-bootstrap:9092",
- Type: "plain",
+ Name: "plain",
},
},
},
@@ -150,11 +150,11 @@ func TestStrimziLookupByTopicName(t *testing.T) {
Status: v1beta2.KafkaStatus{
Listeners: []v1beta2.KafkaStatusListener{
{
- Type: "tls",
+ Name: "tls",
},
{
BootstrapServers:
"my-clusterx-kafka-bootstrap:9092",
- Type: "plain",
+ Name: "plain",
},
},
},
@@ -200,3 +200,104 @@ func TestStrimziLookupByTopicName(t *testing.T) {
assert.Equal(t,
"kafka:my-topic-name?brokers=my-clusterx-kafka-bootstrap%3A9092", binding.URI)
assert.Equal(t, camelv1.Traits{}, binding.Traits)
}
+
+func TestStrimziKafkaCR(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ cluster := v1beta2.Kafka{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test",
+ Name: "my-kafka",
+ },
+ Status: v1beta2.KafkaStatus{
+ Listeners: []v1beta2.KafkaStatusListener{
+ {
+ Name: "tls",
+ },
+ {
+ BootstrapServers:
"my-clusterx-kafka-bootstrap:9092",
+ Name: "plain",
+ },
+ },
+ },
+ }
+
+ client := fake.NewSimpleClientset(&cluster)
+ provider := BindingProvider{
+ Client: client,
+ }
+
+ bindingContext := bindings.BindingContext{
+ Ctx: ctx,
+ Namespace: "test",
+ Profile: camelv1.TraitProfileKubernetes,
+ }
+
+ endpoint := camelv1.Endpoint{
+ Ref: &v1.ObjectReference{
+ Kind: "Kafka",
+ Name: "my-kafka",
+ APIVersion: "kafka.strimzi.io/v1beta2",
+ },
+ Properties: asEndpointProperties(map[string]string{
+ "topic": "my-topic",
+ }),
+ }
+
+ binding, err := provider.Translate(bindingContext,
bindings.EndpointContext{
+ Type: camelv1.EndpointTypeSink,
+ }, endpoint)
+ require.NoError(t, err)
+ assert.NotNil(t, binding)
+ assert.Equal(t,
"kafka:my-topic?brokers=my-clusterx-kafka-bootstrap%3A9092", binding.URI)
+ assert.Equal(t, camelv1.Traits{}, binding.Traits)
+}
+
+func TestStrimziPassThrough(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ cluster := v1beta2.Kafka{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test",
+ Name: "my-kafka",
+ },
+ Status: v1beta2.KafkaStatus{
+ Listeners: []v1beta2.KafkaStatusListener{
+ {
+ Name: "tls",
+ },
+ {
+ BootstrapServers:
"my-clusterx-kafka-bootstrap:9092",
+ Name: "plain",
+ },
+ },
+ },
+ }
+
+ client := fake.NewSimpleClientset(&cluster)
+ provider := BindingProvider{
+ Client: client,
+ }
+
+ bindingContext := bindings.BindingContext{
+ Ctx: ctx,
+ Namespace: "test",
+ Profile: camelv1.TraitProfileKubernetes,
+ }
+
+ endpoint := camelv1.Endpoint{
+ Ref: &v1.ObjectReference{
+ Kind: "AnotherKind",
+ Name: "my-kafka",
+ APIVersion: "anotherApiVersion",
+ },
+ }
+
+ binding, err := provider.Translate(bindingContext,
bindings.EndpointContext{
+ Type: camelv1.EndpointTypeSink,
+ }, endpoint)
+ require.NoError(t, err)
+ assert.Nil(t, binding)
+}