This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new b3fd13c755e CAMEL-20403: Support Knative Broker reference in Pipe YAML
(#13078)
b3fd13c755e is described below
commit b3fd13c755efd97cf7a316c28f9774ac785c6274
Author: Christoph Deppisch <[email protected]>
AuthorDate: Sat Feb 10 10:07:26 2024 +0100
CAMEL-20403: Support Knative Broker reference in Pipe YAML (#13078)
- Pipes may reference a Knative broker as a source/sink
- Properly configure the Knative component endpoint URI on the resulting
route definition
- Make sure to always use Locale.ENGLISH when performing schema validation
in YAML DSL unit tests (avoids assertion errors caused by localized error
messages when tests are run on a machine with a different default Locale,
e.g. GERMAN)
---
.../camel/dsl/yaml/YamlRoutesBuilderLoader.java | 31 +++++-
.../apache/camel/dsl/yaml/PipeLoaderTest.groovy | 123 ++++++++++++++++++++-
.../camel/dsl/yaml/support/YamlTestSupport.groovy | 14 ++-
3 files changed, 158 insertions(+), 10 deletions(-)
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
index 2a9c7ffd224..9b95c5fb9f7 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
@@ -105,7 +105,9 @@ public class YamlRoutesBuilderLoader extends
YamlRoutesBuilderLoaderSupport {
private static final String BINDING_VERSION = "camel.apache.org/v1alpha1";
private static final String PIPE_VERSION = "camel.apache.org/v1";
private static final String STRIMZI_VERSION = "kafka.strimzi.io/v1";
- private static final String KNATIVE_VERSION = "messaging.knative.dev/v1";
+ private static final String KNATIVE_MESSAGING_VERSION =
"messaging.knative.dev/v1";
+ private static final String KNATIVE_EVENTING_VERSION =
"eventing.knative.dev/v1";
+ private static final String KNATIVE_EVENT_TYPE = "org.apache.camel.event";
private final Map<String, Boolean> preparseDone = new
ConcurrentHashMap<>();
@@ -875,11 +877,17 @@ public class YamlRoutesBuilderLoader extends
YamlRoutesBuilderLoaderSupport {
boolean strimzi
= !kamelet && mn != null && anyTupleMatches(mn.getValue(),
"apiVersion", v -> v.startsWith(STRIMZI_VERSION))
&& anyTupleMatches(mn.getValue(), "kind",
"KafkaTopic");
- boolean knative
+ boolean knativeBroker
+ = !kamelet && mn != null
+ && anyTupleMatches(mn.getValue(), "apiVersion", v ->
v.startsWith(KNATIVE_EVENTING_VERSION))
+ && anyTupleMatches(mn.getValue(), "kind", "Broker");
+ boolean knativeChannel
= !kamelet && !strimzi && mn != null
- && anyTupleMatches(mn.getValue(), "apiVersion", v ->
v.startsWith(KNATIVE_VERSION));
+ && anyTupleMatches(mn.getValue(), "apiVersion", v ->
v.startsWith(KNATIVE_MESSAGING_VERSION));
String uri;
- if (kamelet || strimzi || knative) {
+ if (knativeBroker) {
+ uri = KNATIVE_EVENT_TYPE;
+ } else if (kamelet || strimzi || knativeChannel) {
uri = extractTupleValue(mn.getValue(), "name");
} else {
uri = extractTupleValue(node.getValue(), "uri");
@@ -888,6 +896,12 @@ public class YamlRoutesBuilderLoader extends
YamlRoutesBuilderLoaderSupport {
// properties
MappingNode prop = asMappingNode(nodeAt(node, "/properties"));
Map<String, Object> params = asMap(prop);
+
+ if (knativeBroker && params != null && params.containsKey("type")) {
+ // Use explicit event type from properties - remove setting from
params and set as uri
+ uri = params.remove("type").toString();
+ }
+
if (params != null && !params.isEmpty()) {
String query = URISupport.createQueryString(params);
uri = uri + "?" + query;
@@ -897,7 +911,14 @@ public class YamlRoutesBuilderLoader extends
YamlRoutesBuilderLoaderSupport {
return "kamelet:" + uri;
} else if (strimzi) {
return "kafka:" + uri;
- } else if (knative) {
+ } else if (knativeBroker) {
+ if (uri.contains("?")) {
+ uri += "&kind=Broker&name=" + extractTupleValue(mn.getValue(),
"name");
+ } else {
+ uri += "?kind=Broker&name=" + extractTupleValue(mn.getValue(),
"name");
+ }
+ return "knative:event/" + uri;
+ } else if (knativeChannel) {
return "knative:channel/" + uri;
} else {
return uri;
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
index da3cbd39252..e917ed7c719 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
@@ -516,7 +516,7 @@ class PipeLoaderTest extends YamlTestSupport {
}
}
- def "Pipe from kamelet to knative"() {
+ def "Pipe from kamelet to knative channel"() {
when:
// stub knative for testing as it requires to setup connection to a
real knative broker
@@ -555,6 +555,127 @@ class PipeLoaderTest extends YamlTestSupport {
}
}
+ def "Pipe from knative channel to kamelet"() {
+ when:
+
+ // stub knative for testing as it requires to setup connection to a
real knative broker
+ context.removeComponent("knative")
+ context.addComponent("knative", context.getComponent("stub"))
+
+ loadBindings('''
+ apiVersion: camel.apache.org/v1
+ kind: Pipe
+ metadata:
+ name: knative-event-source
+ spec:
+ source:
+ ref:
+ kind: InMemoryChannel
+ apiVersion: messaging.knative.dev/v1
+ name: my-messages
+ sink:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: log-sink
+ properties:
+ showHeaders: true
+ ''')
+ then:
+ context.routeDefinitions.size() == 2
+
+ with (context.routeDefinitions[0]) {
+ routeId == 'knative-event-source'
+ input.endpointUri == 'knative:channel/my-messages'
+ outputs.size() == 1
+ with (outputs[0], ToDefinition) {
+ endpointUri == 'kamelet:log-sink?showHeaders=true'
+ }
+ }
+ }
+
+ def "Pipe from kamelet to knative broker"() {
+ when:
+
+ // stub knative for testing as it requires to setup connection to a
real knative broker
+ context.removeComponent("knative")
+ context.addComponent("knative", context.getComponent("stub"))
+
+ loadBindings('''
+ apiVersion: camel.apache.org/v1
+ kind: Pipe
+ metadata:
+ name: timer-event-source
+ spec:
+ source:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: timer-source
+ properties:
+ message: "Hello world!"
+ sink:
+ ref:
+ kind: Broker
+ apiVersion: eventing.knative.dev/v1
+ name: foo-broker
+ properties:
+ type: org.apache.camel.event.messages
+ ''')
+ then:
+ context.routeDefinitions.size() == 2
+
+ with (context.routeDefinitions[0]) {
+ routeId == 'timer-event-source'
+ input.endpointUri == 'kamelet:timer-source?message=Hello+world%21'
+ outputs.size() == 1
+ with (outputs[0], ToDefinition) {
+ endpointUri ==
'knative:event/org.apache.camel.event.messages?kind=Broker&name=foo-broker'
+ }
+ }
+ }
+
+ def "Pipe from knative broker to kamelet"() {
+ when:
+
+ // stub knative for testing as it requires to setup connection to a
real knative broker
+ context.removeComponent("knative")
+ context.addComponent("knative", context.getComponent("stub"))
+
+ loadBindings('''
+ apiVersion: camel.apache.org/v1
+ kind: Pipe
+ metadata:
+ name: knative-event-source
+ spec:
+ source:
+ ref:
+ kind: Broker
+ apiVersion: eventing.knative.dev/v1
+ name: foo-broker
+ properties:
+ type: org.apache.camel.event.messages
+ sink:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: log-sink
+ properties:
+ showHeaders: true
+ ''')
+ then:
+ context.routeDefinitions.size() == 2
+
+ with (context.routeDefinitions[0]) {
+ routeId == 'knative-event-source'
+ input.endpointUri ==
'knative:event/org.apache.camel.event.messages?kind=Broker&name=foo-broker'
+ outputs.size() == 1
+ with (outputs[0], ToDefinition) {
+ endpointUri == 'kamelet:log-sink?showHeaders=true'
+ }
+ }
+ }
+
def "kamelet start route"() {
when:
loadBindings('''
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
index c98f84813f4..0114841fe4d 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
@@ -18,7 +18,8 @@ package org.apache.camel.dsl.yaml.support
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.JsonSchemaFactory
+import com.networknt.schema.SchemaValidatorsConfig
import com.networknt.schema.SpecVersionDetector
import groovy.util.logging.Slf4j
import org.apache.camel.CamelContext
@@ -40,9 +41,14 @@ import java.nio.charset.StandardCharsets
@Slf4j
class YamlTestSupport extends Specification implements HasCamelContext {
static def MAPPER = new ObjectMapper(new YAMLFactory())
- static def SCHEMA_NODE =
MAPPER.readTree(ResourceHelper.getResourceAsStream('/schema/camelYamlDsl.json'));
- static def FACTORY =
JsonSchemaFactory.getInstance(SpecVersionDetector.detect(SCHEMA_NODE));
- static def SCHEMA = FACTORY.getSchema(SCHEMA_NODE);
+ static def SCHEMA_NODE =
MAPPER.readTree(ResourceHelper.getResourceAsStream('/schema/camelYamlDsl.json'))
+ static def FACTORY =
JsonSchemaFactory.getInstance(SpecVersionDetector.detect(SCHEMA_NODE))
+ static def SCHEMA_VALIDATORS_CONFIG = {
+ SchemaValidatorsConfig config = new SchemaValidatorsConfig()
+ config.setLocale(Locale.ENGLISH)
+ return config
+ }()
+ static def SCHEMA = FACTORY.getSchema(SCHEMA_NODE,
SCHEMA_VALIDATORS_CONFIG)
@AutoCleanup
def context = new DefaultCamelContext()