This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1d78fb2 CAMEL-17065: Fix cluster view release on all routes stop. Add autoStartup check before starting route. (#6238)
1d78fb2 is described below
commit 1d78fb2958dd8e60662d5726de67eaa20a6053cf
Author: iliya-gr <[email protected]>
AuthorDate: Tue Oct 12 07:16:57 2021 +0300
CAMEL-17065: Fix cluster view release on all routes stop. Add autoStartup check before starting route. (#6238)
---
.../camel/impl/cluster/ClusteredRoutePolicy.java | 17 ++++-
.../camel/cluster/ClusteredRoutePolicyTest.java | 27 +++++++
...redRoutePolicyUnmanagedClusterServiceTest.java} | 89 ++++------------------
3 files changed, 58 insertions(+), 75 deletions(-)
diff --git a/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java b/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
index 9d57b71..bcbcf05 100644
--- a/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
+++ b/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
@@ -53,6 +53,7 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca
private static final Logger LOG =
LoggerFactory.getLogger(ClusteredRoutePolicy.class);
private final AtomicBoolean leader;
+ private final Set<Route> autoStartupRoutes;
private final Set<Route> startedRoutes;
private final Set<Route> stoppedRoutes;
private final ReferenceCount refCount;
@@ -84,6 +85,7 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca
this.stoppedRoutes = new HashSet<>();
this.startedRoutes = new HashSet<>();
+ this.autoStartupRoutes = new HashSet<>();
this.leader = new AtomicBoolean();
this.contextStarted = new AtomicBoolean();
this.initialDelay = Duration.ofMillis(0);
@@ -178,6 +180,10 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca
this.refCount.retain();
+ if (route.isAutoStartup()) {
+ autoStartupRoutes.add(route);
+ }
+
if (camelContext.isStarted() && isLeader()) {
// when camel context is already started, and we add new routes
// then let the route controller start the route as usual (no need to mark as auto startup false)
@@ -214,7 +220,12 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca
}
@Override
- public void doShutdown() throws Exception {
+ public void onRemove(Route route) {
+ autoStartupRoutes.remove(route);
+ }
+
+ @Override
+ public void onStop(Route route) {
this.refCount.release();
}
@@ -266,7 +277,9 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca
try {
for (Route route : stoppedRoutes) {
ServiceStatus status = getStatus(route);
- if (status != null && status.isStartable()) {
+ boolean autostart = autoStartupRoutes.contains(route);
+
+ if (status != null && status.isStartable() && autostart) {
LOG.debug("Starting route '{}'", route.getId());
camelContext.getRouteController().startRoute(route.getId());
diff --git a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
index e8b8fa4..97c92d8 100644
--- a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
@@ -31,6 +31,7 @@ import org.apache.camel.support.cluster.AbstractCamelClusterView;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
public class ClusteredRoutePolicyTest extends ContextTestSupport {
@@ -67,6 +68,23 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
}
@Test
+ public void testClusteredRoutePolicyStopAllRoutes() throws Exception {
+ cs.getView().setLeader(true);
+
+ context.getRouteController().stopRoute("foo");
+ context.getRouteController().stopRoute("baz");
+
+ assertFalse(cs.getView().isRunning());
+ }
+
+ @Test
+ public void testClusteredRoutePolicyDontStartAutoStartFalseRoutes() throws
Exception {
+ cs.getView().setLeader(true);
+
+ assertEquals(ServiceStatus.Stopped,
context.getRouteController().getRouteStatus("baz"));
+ }
+
+ @Test
public void testClusteredRoutePolicyAddRoute() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
@@ -129,6 +147,8 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
public void configure() throws Exception {
from("seda:foo").routeId("foo").routePolicy(policy)
.to("mock:foo");
+
from("seda:baz").autoStartup(false).routeId("baz").routePolicy(policy)
+ .to("mock:baz");
}
};
}
@@ -139,6 +159,7 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
private static class TestClusterView extends AbstractCamelClusterView {
private boolean leader;
+ private boolean running;
public TestClusterView(CamelClusterService cluster, String namespace) {
super(cluster, namespace);
@@ -176,10 +197,12 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
@Override
protected void doStart() throws Exception {
+ running = true;
}
@Override
protected void doStop() throws Exception {
+ running = false;
}
public boolean isLeader() {
@@ -193,6 +216,10 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
fireLeadershipChangedEvent(getLeader());
}
}
+
+ public boolean isRunning() {
+ return running;
+ }
}
private static class TestClusterService extends
AbstractCamelClusterService<TestClusterView> {
diff --git a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyUnmanagedClusterServiceTest.java
similarity index 58%
copy from core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
copy to core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyUnmanagedClusterServiceTest.java
index e8b8fa4..0003baa 100644
--- a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyUnmanagedClusterServiceTest.java
@@ -22,17 +22,15 @@ import java.util.Optional;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.cluster.ClusteredRoutePolicy;
import org.apache.camel.support.cluster.AbstractCamelClusterService;
import org.apache.camel.support.cluster.AbstractCamelClusterView;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
-public class ClusteredRoutePolicyTest extends ContextTestSupport {
+public class ClusteredRoutePolicyUnmanagedClusterServiceTest extends
ContextTestSupport {
private ClusteredRoutePolicy policy;
private TestClusterService cs;
@@ -42,84 +40,20 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
CamelContext context = super.createCamelContext();
cs = new TestClusterService("my-cluster-service");
- context.addService(cs);
+ cs.start();
- policy = ClusteredRoutePolicy.forNamespace("my-ns");
+ policy = ClusteredRoutePolicy.forNamespace(cs, "my-ns");
return context;
}
@Test
- public void testClusteredRoutePolicy() throws Exception {
- // route is stopped as we are not leader yet
- assertEquals(ServiceStatus.Stopped,
context.getRouteController().getRouteStatus("foo"));
-
- MockEndpoint mock = getMockEndpoint("mock:foo");
- mock.expectedBodiesReceived("Hello Foo");
-
+ public void testClusteredRoutePolicyReleaseViewOnCamelContextStop() {
cs.getView().setLeader(true);
- template.sendBody("seda:foo", "Hello Foo");
-
- assertMockEndpointsSatisfied();
+ context.stop();
- assertEquals(ServiceStatus.Started,
context.getRouteController().getRouteStatus("foo"));
- }
-
- @Test
- public void testClusteredRoutePolicyAddRoute() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("seda:bar").routeId("bar").routePolicy(policy)
- .to("mock:bar");
- }
- });
-
- // route is stopped as we are not leader yet
- assertEquals(ServiceStatus.Stopped,
context.getRouteController().getRouteStatus("foo"));
- assertEquals(ServiceStatus.Stopped,
context.getRouteController().getRouteStatus("bar"));
-
- getMockEndpoint("mock:foo").expectedBodiesReceived("Hello Foo");
- getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Bar");
-
- cs.getView().setLeader(true);
-
- template.sendBody("seda:foo", "Hello Foo");
- template.sendBody("seda:bar", "Hello Bar");
-
- assertMockEndpointsSatisfied();
-
- assertEquals(ServiceStatus.Started,
context.getRouteController().getRouteStatus("foo"));
- assertEquals(ServiceStatus.Started,
context.getRouteController().getRouteStatus("bar"));
- }
-
- @Test
- public void testClusteredRoutePolicyAddRouteAlreadyLeader() throws
Exception {
- cs.getView().setLeader(true);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("seda:bar").routeId("bar").routePolicy(policy)
- .to("mock:bar");
- }
- });
-
- // route is started as we are leader
- assertEquals(ServiceStatus.Started,
context.getRouteController().getRouteStatus("foo"));
- assertEquals(ServiceStatus.Started,
context.getRouteController().getRouteStatus("bar"));
-
- getMockEndpoint("mock:foo").expectedBodiesReceived("Hello Foo");
- getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Bar");
-
- template.sendBody("seda:foo", "Hello Foo");
- template.sendBody("seda:bar", "Hello Bar");
-
- assertMockEndpointsSatisfied();
-
- assertEquals(ServiceStatus.Started,
context.getRouteController().getRouteStatus("foo"));
- assertEquals(ServiceStatus.Started,
context.getRouteController().getRouteStatus("bar"));
+ assertFalse(cs.getView().isRunning());
}
@Override
@@ -129,6 +63,8 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
public void configure() throws Exception {
from("seda:foo").routeId("foo").routePolicy(policy)
.to("mock:foo");
+ from("seda:bar").routeId("bar").routePolicy(policy)
+ .to("mock:bar");
}
};
}
@@ -139,6 +75,7 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
private static class TestClusterView extends AbstractCamelClusterView {
private boolean leader;
+ private boolean running;
public TestClusterView(CamelClusterService cluster, String namespace) {
super(cluster, namespace);
@@ -176,10 +113,12 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
@Override
protected void doStart() throws Exception {
+ running = true;
}
@Override
protected void doStop() throws Exception {
+ running = false;
}
public boolean isLeader() {
@@ -193,6 +132,10 @@ public class ClusteredRoutePolicyTest extends ContextTestSupport {
fireLeadershipChangedEvent(getLeader());
}
}
+
+ public boolean isRunning() {
+ return running;
+ }
}
private static class TestClusterService extends
AbstractCamelClusterService<TestClusterView> {