This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d78fb2  CAMEL-17065: Fix cluster view release on all routes stop. Add 
autoStartup check before starting route. (#6238)
1d78fb2 is described below

commit 1d78fb2958dd8e60662d5726de67eaa20a6053cf
Author: iliya-gr <[email protected]>
AuthorDate: Tue Oct 12 07:16:57 2021 +0300

    CAMEL-17065: Fix cluster view release on all routes stop. Add autoStartup 
check before starting route. (#6238)
---
 .../camel/impl/cluster/ClusteredRoutePolicy.java   | 17 ++++-
 .../camel/cluster/ClusteredRoutePolicyTest.java    | 27 +++++++
 ...redRoutePolicyUnmanagedClusterServiceTest.java} | 89 ++++------------------
 3 files changed, 58 insertions(+), 75 deletions(-)

diff --git 
a/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
 
b/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
index 9d57b71..bcbcf05 100644
--- 
a/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
+++ 
b/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
@@ -53,6 +53,7 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
     private static final Logger LOG = 
LoggerFactory.getLogger(ClusteredRoutePolicy.class);
 
     private final AtomicBoolean leader;
+    private final Set<Route> autoStartupRoutes;
     private final Set<Route> startedRoutes;
     private final Set<Route> stoppedRoutes;
     private final ReferenceCount refCount;
@@ -84,6 +85,7 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
 
         this.stoppedRoutes = new HashSet<>();
         this.startedRoutes = new HashSet<>();
+        this.autoStartupRoutes = new HashSet<>();
         this.leader = new AtomicBoolean();
         this.contextStarted = new AtomicBoolean();
         this.initialDelay = Duration.ofMillis(0);
@@ -178,6 +180,10 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
 
         this.refCount.retain();
 
+        if (route.isAutoStartup()) {
+            autoStartupRoutes.add(route);
+        }
+
         if (camelContext.isStarted() && isLeader()) {
             // when camel context is already started, and we add new routes
             // then let the route controller start the route as usual (no need 
to mark as auto startup false)
@@ -214,7 +220,12 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
     }
 
     @Override
-    public void doShutdown() throws Exception {
+    public void onRemove(Route route) {
+        autoStartupRoutes.remove(route);
+    }
+
+    @Override
+    public void onStop(Route route) {
         this.refCount.release();
     }
 
@@ -266,7 +277,9 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
         try {
             for (Route route : stoppedRoutes) {
                 ServiceStatus status = getStatus(route);
-                if (status != null && status.isStartable()) {
+                boolean autostart = autoStartupRoutes.contains(route);
+
+                if (status != null && status.isStartable() && autostart) {
                     LOG.debug("Starting route '{}'", route.getId());
                     
camelContext.getRouteController().startRoute(route.getId());
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
index e8b8fa4..97c92d8 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
@@ -31,6 +31,7 @@ import 
org.apache.camel.support.cluster.AbstractCamelClusterView;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 public class ClusteredRoutePolicyTest extends ContextTestSupport {
 
@@ -67,6 +68,23 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
     }
 
     @Test
+    public void testClusteredRoutePolicyStopAllRoutes() throws Exception {
+        cs.getView().setLeader(true);
+
+        context.getRouteController().stopRoute("foo");
+        context.getRouteController().stopRoute("baz");
+
+        assertFalse(cs.getView().isRunning());
+    }
+
+    @Test
+    public void testClusteredRoutePolicyDontStartAutoStartFalseRoutes() throws 
Exception {
+        cs.getView().setLeader(true);
+
+        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("baz"));
+    }
+
+    @Test
     public void testClusteredRoutePolicyAddRoute() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
@@ -129,6 +147,8 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
             public void configure() throws Exception {
                 from("seda:foo").routeId("foo").routePolicy(policy)
                         .to("mock:foo");
+                
from("seda:baz").autoStartup(false).routeId("baz").routePolicy(policy)
+                        .to("mock:baz");
             }
         };
     }
@@ -139,6 +159,7 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
 
     private static class TestClusterView extends AbstractCamelClusterView {
         private boolean leader;
+        private boolean running;
 
         public TestClusterView(CamelClusterService cluster, String namespace) {
             super(cluster, namespace);
@@ -176,10 +197,12 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
 
         @Override
         protected void doStart() throws Exception {
+            running = true;
         }
 
         @Override
         protected void doStop() throws Exception {
+            running = false;
         }
 
         public boolean isLeader() {
@@ -193,6 +216,10 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
                 fireLeadershipChangedEvent(getLeader());
             }
         }
+
+        public boolean isRunning() {
+            return running;
+        }
     }
 
     private static class TestClusterService extends 
AbstractCamelClusterService<TestClusterView> {
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyUnmanagedClusterServiceTest.java
similarity index 58%
copy from 
core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
copy to 
core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyUnmanagedClusterServiceTest.java
index e8b8fa4..0003baa 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyUnmanagedClusterServiceTest.java
@@ -22,17 +22,15 @@ import java.util.Optional;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.cluster.ClusteredRoutePolicy;
 import org.apache.camel.support.cluster.AbstractCamelClusterService;
 import org.apache.camel.support.cluster.AbstractCamelClusterView;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
-public class ClusteredRoutePolicyTest extends ContextTestSupport {
+public class ClusteredRoutePolicyUnmanagedClusterServiceTest extends 
ContextTestSupport {
 
     private ClusteredRoutePolicy policy;
     private TestClusterService cs;
@@ -42,84 +40,20 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
         CamelContext context = super.createCamelContext();
 
         cs = new TestClusterService("my-cluster-service");
-        context.addService(cs);
+        cs.start();
 
-        policy = ClusteredRoutePolicy.forNamespace("my-ns");
+        policy = ClusteredRoutePolicy.forNamespace(cs, "my-ns");
 
         return context;
     }
 
     @Test
-    public void testClusteredRoutePolicy() throws Exception {
-        // route is stopped as we are not leader yet
-        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("foo"));
-
-        MockEndpoint mock = getMockEndpoint("mock:foo");
-        mock.expectedBodiesReceived("Hello Foo");
-
+    public void testClusteredRoutePolicyReleaseViewOnCamelContextStop() {
         cs.getView().setLeader(true);
 
-        template.sendBody("seda:foo", "Hello Foo");
-
-        assertMockEndpointsSatisfied();
+        context.stop();
 
-        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("foo"));
-    }
-
-    @Test
-    public void testClusteredRoutePolicyAddRoute() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("seda:bar").routeId("bar").routePolicy(policy)
-                        .to("mock:bar");
-            }
-        });
-
-        // route is stopped as we are not leader yet
-        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("foo"));
-        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("bar"));
-
-        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello Foo");
-        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Bar");
-
-        cs.getView().setLeader(true);
-
-        template.sendBody("seda:foo", "Hello Foo");
-        template.sendBody("seda:bar", "Hello Bar");
-
-        assertMockEndpointsSatisfied();
-
-        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("foo"));
-        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("bar"));
-    }
-
-    @Test
-    public void testClusteredRoutePolicyAddRouteAlreadyLeader() throws 
Exception {
-        cs.getView().setLeader(true);
-
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("seda:bar").routeId("bar").routePolicy(policy)
-                        .to("mock:bar");
-            }
-        });
-
-        // route is started as we are leader
-        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("foo"));
-        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("bar"));
-
-        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello Foo");
-        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Bar");
-
-        template.sendBody("seda:foo", "Hello Foo");
-        template.sendBody("seda:bar", "Hello Bar");
-
-        assertMockEndpointsSatisfied();
-
-        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("foo"));
-        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("bar"));
+        assertFalse(cs.getView().isRunning());
     }
 
     @Override
@@ -129,6 +63,8 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
             public void configure() throws Exception {
                 from("seda:foo").routeId("foo").routePolicy(policy)
                         .to("mock:foo");
+                from("seda:bar").routeId("bar").routePolicy(policy)
+                        .to("mock:bar");
             }
         };
     }
@@ -139,6 +75,7 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
 
     private static class TestClusterView extends AbstractCamelClusterView {
         private boolean leader;
+        private boolean running;
 
         public TestClusterView(CamelClusterService cluster, String namespace) {
             super(cluster, namespace);
@@ -176,10 +113,12 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
 
         @Override
         protected void doStart() throws Exception {
+            running = true;
         }
 
         @Override
         protected void doStop() throws Exception {
+            running = false;
         }
 
         public boolean isLeader() {
@@ -193,6 +132,10 @@ public class ClusteredRoutePolicyTest extends 
ContextTestSupport {
                 fireLeadershipChangedEvent(getLeader());
             }
         }
+
+        public boolean isRunning() {
+            return running;
+        }
     }
 
     private static class TestClusterService extends 
AbstractCamelClusterService<TestClusterView> {

Reply via email to