This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 93af946 CAMEL-17228: Route reload issue when using Kamelet EIP.
93af946 is described below
commit 93af946d6927e3f11b6493b5c8fee95c79ff3b9b
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Nov 25 10:45:02 2021 +0100
CAMEL-17228: Route reload issue when using Kamelet EIP.
---
.../apache/camel/component/kamelet/KameletConsumer.java | 8 +++++---
.../src/main/java/org/apache/camel/CamelContext.java | 8 ++++++++
.../org/apache/camel/impl/engine/SimpleCamelContext.java | 5 +++++
.../java/org/apache/camel/impl/DefaultCamelContext.java | 16 ++++++++++++++++
.../main/java/org/apache/camel/impl/DefaultModel.java | 10 ++++++++++
.../apache/camel/impl/lw/LightweightCamelContext.java | 10 ++++++++++
.../camel/impl/lw/LightweightRuntimeCamelContext.java | 5 +++++
.../src/main/java/org/apache/camel/model/Model.java | 8 ++++++++
.../apache/camel/support/RouteWatcherReloadStrategy.java | 3 +++
.../src/main/java/org/apache/camel/main/KameletMain.java | 2 +-
.../src/test/resources/my-camel-k.yaml | 2 --
11 files changed, 71 insertions(+), 6 deletions(-)
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
index 15a02cd..d875701 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
@@ -19,11 +19,13 @@ package org.apache.camel.component.kamelet;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.Suspendable;
+import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.support.DefaultConsumer;
final class KameletConsumer extends DefaultConsumer implements ShutdownAware,
Suspendable {
+ private final InflightRepository inflight;
private final KameletComponent component;
private final String key;
@@ -31,6 +33,7 @@ final class KameletConsumer extends DefaultConsumer
implements ShutdownAware, Su
super(endpoint, processor);
this.component = endpoint.getComponent();
this.key = key;
+ this.inflight = endpoint.getCamelContext().getInflightRepository();
}
@Override
@@ -71,9 +74,8 @@ final class KameletConsumer extends DefaultConsumer
implements ShutdownAware, Su
@Override
public int getPendingExchangesSize() {
- // return 0 as we do not have an internal memory queue with a variable
- // size of inflight messages.
- return 0;
+ // capture the inflight counter from the route
+ return inflight.size(getRouteId());
}
@Override
diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
index 49c5bee..436c270 100644
--- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -635,6 +635,14 @@ public interface CamelContext extends
CamelContextLifecycle, RuntimeConfiguratio
throws Exception;
/**
+ * Removes the route templates matching the pattern
+ *
+ * @param pattern pattern, such as * for all, or foo* to remove all foo
templates
+ * @throws Exception is thrown if error during removing route templates
+ */
+ void removeRouteTemplates(String pattern) throws Exception;
+
+ /**
* Adds the given route policy factory
*
* @param routePolicyFactory the factory
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index 11e0f67..9dee233 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -631,6 +631,11 @@ public class SimpleCamelContext extends
AbstractCamelContext {
}
@Override
+ public void removeRouteTemplates(String pattern) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public String getTestExcludeRoutes() {
throw new UnsupportedOperationException();
}
diff --git
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 5b88933..d66704f 100644
---
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -408,6 +408,14 @@ public class DefaultCamelContext extends
SimpleCamelContext implements ModelCame
}
@Override
+ public void removeRouteTemplateDefinitions(String pattern) throws
Exception {
+ if (model == null && isLightweight()) {
+ throw new IllegalStateException("Access to model not supported in
lightweight mode");
+ }
+ model.removeRouteTemplateDefinitions(pattern);
+ }
+
+ @Override
public void addRouteTemplateDefinitionConverter(String templateIdPattern,
RouteTemplateDefinition.Converter converter) {
if (model == null && isLightweight()) {
throw new IllegalStateException("Access to model not supported in
lightweight mode");
@@ -434,6 +442,14 @@ public class DefaultCamelContext extends
SimpleCamelContext implements ModelCame
}
@Override
+ public void removeRouteTemplates(String pattern) throws Exception {
+ if (model == null && isLightweight()) {
+ throw new IllegalStateException("Access to model not supported in
lightweight mode");
+ }
+ model.removeRouteTemplateDefinitions(pattern);
+ }
+
+ @Override
public List<RestDefinition> getRestDefinitions() {
if (model == null && isLightweight()) {
throw new IllegalStateException("Access to model not supported in
lightweight mode");
diff --git
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
index b658b6b..2ff0362 100644
---
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
+++
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
@@ -65,6 +65,7 @@ import org.apache.camel.spi.RouteTemplateLoaderListener;
import org.apache.camel.spi.RouteTemplateParameterSource;
import org.apache.camel.spi.ScriptingLanguage;
import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.PropertyBindingSupport;
import org.apache.camel.support.RouteTemplateHelper;
import org.apache.camel.support.ScriptHelper;
@@ -214,6 +215,15 @@ public class DefaultModel implements Model {
}
@Override
+ public synchronized void removeRouteTemplateDefinitions(String pattern)
throws Exception {
+ for (RouteTemplateDefinition def : new
ArrayList<>(routeTemplateDefinitions)) {
+ if (PatternHelper.matchPattern(def.getId(), pattern)) {
+ removeRouteTemplateDefinition(def);
+ }
+ }
+ }
+
+ @Override
public synchronized List<RouteDefinition> getRouteDefinitions() {
return routeDefinitions;
}
diff --git
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index d73e066..1ed731c 100644
---
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -1124,6 +1124,11 @@ public class LightweightCamelContext implements
ExtendedCamelContext, CatalogCam
delegate.setAutowiredEnabled(autowiredEnabled);
}
+ @Override
+ public void removeRouteTemplates(String pattern) throws Exception {
+ delegate.removeRouteTemplates(pattern);
+ }
+
//
// ExtendedCamelContext
//
@@ -1796,6 +1801,11 @@ public class LightweightCamelContext implements
ExtendedCamelContext, CatalogCam
}
@Override
+ public void removeRouteTemplateDefinitions(String pattern) throws
Exception {
+ getModelCamelContext().removeRouteTemplateDefinitions(pattern);
+ }
+
+ @Override
public List<RestDefinition> getRestDefinitions() {
return getModelCamelContext().getRestDefinitions();
}
diff --git
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
index d9f7f3b..910c19d 100644
---
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
+++
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
@@ -1941,6 +1941,11 @@ public class LightweightRuntimeCamelContext implements
ExtendedCamelContext, Cat
}
@Override
+ public void removeRouteTemplates(String pattern) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void setLightweight(boolean lightweight) {
throw new UnsupportedOperationException();
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/Model.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/Model.java
index 225965d..6d3ca19 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/Model.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/Model.java
@@ -179,6 +179,14 @@ public interface Model {
void removeRouteTemplateDefinition(RouteTemplateDefinition
routeTemplateDefinition) throws Exception;
/**
+ * Removes the route templates matching the pattern - stopping any
previously running routes if any of them are
+ * actively running
+ *
+ * @param pattern pattern, such as * for all, or foo* to remove all foo
templates
+ */
+ void removeRouteTemplateDefinitions(String pattern) throws Exception;
+
+ /**
* Add a converter to translate a {@link RouteTemplateDefinition} to a
{@link RouteDefinition}.
*
* @param templateIdPattern the route template ut to whom a pattern should
eb applied
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
b/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
index 89e56b6..25443ef 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
@@ -123,6 +123,9 @@ public class RouteWatcherReloadStrategy extends
FileWatcherResourceReloadStrateg
for (Route route : getCamelContext().getRoutes()) {
getCamelContext().removeRoute(route.getRouteId());
}
+ // remove left-over route templates and endpoints, so
we can start on a fresh
+ getCamelContext().removeRouteTemplates("*");
+ getCamelContext().getEndpointRegistry().clear();
}
Set<String> ids
=
getCamelContext().adapt(ExtendedCamelContext.class).getRoutesLoader().updateRoutes(resource);
diff --git
a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
index 93dcc81..bb655ff 100644
---
a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
+++
b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
@@ -171,7 +171,7 @@ public class KameletMain extends MainCommandLineSupport {
* Sets initial properties that are specific to camel-kamelet-main
*/
protected void configureInitialProperties() {
- addInitialProperty("camel.component.kamelet.location",
"classpath:/kamelets,github:apache:camel-kamelets");
+ addInitialProperty("camel.component.kamelet.location",
"classpath:/kamelets,github:apache:camel-kamelets/kamelets");
}
}
diff --git a/dsl/camel-kamelet-main/src/test/resources/my-camel-k.yaml
b/dsl/camel-kamelet-main/src/test/resources/my-camel-k.yaml
index 60b805e..ade286b 100644
--- a/dsl/camel-kamelet-main/src/test/resources/my-camel-k.yaml
+++ b/dsl/camel-kamelet-main/src/test/resources/my-camel-k.yaml
@@ -27,8 +27,6 @@ spec:
message: Hello Camel K
period: 1000
steps:
-# this works
-# - to: "kamelet:log-sink?showHeaders=false"
- kamelet:
name: log-sink
parameters: