This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 24fbc3c CAMEL-13580: Remove zookeeper route policy as there are better alternatives.
24fbc3c is described below
commit 24fbc3c6c4db4868b571e712170a7abcb3be2e6d
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jun 3 20:46:03 2019 +0200
CAMEL-13580: Remove zookeeper route policy as there are better alternatives.
---
MIGRATION.md | 2 +
.../src/main/docs/zookeeper-component.adoc | 67 ---
.../zookeeper/policy/CuratorLeaderElection.java | 161 -------
.../zookeeper/policy/CuratorLeaderRoutePolicy.java | 170 --------
.../policy/CuratorMultiMasterLeaderElection.java | 171 --------
.../CuratorMultiMasterLeaderRoutePolicy.java | 191 ---------
.../zookeeper/policy/ElectionWatcher.java | 27 --
.../zookeeper/policy/ZooKeeperElection.java | 253 -----------
.../zookeeper/policy/ZooKeeperRoutePolicy.java | 170 --------
.../FailoverCuratorLeaderRoutePolicyTest.java | 141 -------
.../zookeeper/policy/FailoverRoutePolicyTest.java | 118 ------
.../MultiMasterCuratorLeaderRoutePolicyTest.java | 461 ---------------------
.../ZookeeperDoubleRouteAndDoublePolicyTest.java | 57 ---
.../zookeeper/policy/ZookeeperElectionTest.java | 148 -------
.../zookeeper/policy/ZookeeperRoutePolicyTest.java | 48 ---
15 files changed, 2 insertions(+), 2183 deletions(-)
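
Note for readers of this commit: the documentation removed below describes how `ZooKeeperRoutePolicy` decided whether to start a route. Each instance writes its id under a shared znode, ZooKeeper orders the writes, and an instance starts its route only if its position in that listing is less than the configured number of enabled instances. The sketch below illustrates only that position rule in plain Java. It is not the removed Camel code, and the class and method names (`ElectionPositionSketch`, `positionOf`, `shouldStartRoute`) are hypothetical; the listing is passed in as an array, standing in for what would be read from ZooKeeper.

```java
// Hypothetical illustration of the position rule described in the removed docs.
// This is NOT the removed Camel implementation and does not talk to ZooKeeper.
final class ElectionPositionSketch {

    private ElectionPositionSketch() {
    }

    /**
     * Returns the zero-based position of candidateId in the ordered listing,
     * or -1 if the candidate is not present.
     */
    static int positionOf(String[] orderedIds, String candidateId) {
        for (int i = 0; i < orderedIds.length; i++) {
            if (orderedIds[i].equals(candidateId)) {
                return i;
            }
        }
        return -1;
    }

    /**
     * A candidate may start its route when it is listed and its position
     * is less than the number of instances allowed to be active.
     */
    static boolean shouldStartRoute(String[] orderedIds, String candidateId, int enabledCount) {
        int position = positionOf(orderedIds, candidateId);
        return position >= 0 && position < enabledCount;
    }
}
```

With an enabled count of 1 only the first entry in the listing qualifies, which is the master-slave case the removed documentation mentions; a larger count allows that many entries from the top of the listing to run.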
diff --git a/MIGRATION.md b/MIGRATION.md
index 05e0a66..0431a17 100644
--- a/MIGRATION.md
+++ b/MIGRATION.md
@@ -88,6 +88,8 @@ We have removed all deprecated components from Camel 2.x, also including the old
We removed `camel-jibx` component which wasn't working on JDK 8.
+The `camel-zookeeper` has its route policy functionality removed, instead use `ZooKeeperClusterService` or the `camel-zookeeper-master` component.
+
### Renamed components
The `test` component has been renamed to `dataset-test` and moved out of `camel-core` into `camel-dataset` JAR.
diff --git a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
index 1351b68..92720fb 100644
--- a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
+++ b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
@@ -12,9 +12,6 @@ following features to Camel:
being set must be convertible to `byte[]`).
3. Create and retrieve the list the child nodes attached to a
particular node.
-4. A Distributed `RoutePolicy` that leverages a
-Leader election coordinated by ZooKeeper to determine if exchanges
-should get processed.
Maven users will need to add the following dependency to their `pom.xml`
for this component:
@@ -306,67 +303,3 @@ Object testPayload = ...
template.sendBodyAndHeader("direct:create-and-write-to-persistent-znode", testPayload, "CamelZooKeeperCreateMode", "PERSISTENT");
----
-### ZooKeeper enabled Route policies
-
-ZooKeeper allows for very simple and effective leader election out of
-the box. This component exploits this election capability in a
-`RoutePolicy` to control when and how routes are
-enabled. This policy would typically be used in fail-over scenarios, to
-control identical instances of a route across a cluster of Camel based
-servers. A very common scenario is a simple 'Master-Slave' setup where
-there are multiple instances of a route distributed across a cluster but
-only one of them, that of the master, should be running at a time. If
-the master fails, a new master should be elected from the available
-slaves and the route in this new master should be started.
-
-The policy uses a common _znode_ path across all instances of the
-`RoutePolicy` that will be involved in the election. Each policy writes
-its id into this node and Zookeeper will order the writes in the order
-it received them. The policy then reads the listing of the node to see
-what position of its id; this position is used to determine if the route
-should be started or not. The policy is configured at startup with the
-number of route instances that should be started across the cluster and
-if its position in the list is less than this value then its route will
-be started. For a Master-slave scenario, the route is configured with 1
-route instance and only the first entry in the listing will start its
-route. All policies watch for updates to the listing and if the listing
-changes they recalculate if their route should be started. For more info
-on Zookeeper's leader election capability see
-http://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection[this
-page].
-
-The following example uses the node `/someapplication/somepolicy` for
-the election and is set up to start only the top '1' entries in the node
-listing i.e. elect a master:
-
-[source,java]
-----
-ZooKeeperRoutePolicy policy = new ZooKeeperRoutePolicy("zookeeper:localhost:39913/someapp/somepolicy", 1);
-from("direct:policy-controlled")
- .routePolicy(policy)
- .to("mock:controlled");
-----
-
-There are currently 3 policies defined in the component, with different SLAs:
-
-* `ZooKeeperRoutePolicy`
-* `CuratorLeaderRoutePolicy` (since *2.19*)
-* `MultiMasterCuratorLeaderRoutePolicy` (since *2.19*)
-
-*ZooKeeperRoutePolicy* supports multiple active nodes, but it's activation kicks in only after a Camel component and its correspondent Consumer have already been started,
- this introduces, depending on your routes definition, the risk that you component can already start consuming events and producing `Exchange`s, before the policy could estabilish
- that the node should not be activated.
-
-*CuratorLeaderRoutePolicy* supports only a single active node, but it's bound to a different `CamelContext` lifecycle method; this Policy kicks in before any route or consumer is started
- thus you can be sure that no even is processed before the Policy takes its decision.
-
-*MultiMasterCuratorLeaderRoutePolicy* support multiple active nodes, and it's bound to the same lifecycle method as `CuratorLeaderRoutePolicy`; this Policy kicks in before any route or consumer is started
- thus you can be sure that no even is processed before the Policy takes its decision.
-
-
-### See Also
-
-* Configuring Camel
-* Component
-* Endpoint
-* Getting Started
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderElection.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderElection.java
deleted file mode 100644
index 5a4d35c..0000000
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderElection.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.StatefulService;
-import org.apache.camel.impl.engine.JavaUuidGenerator;
-import org.apache.camel.spi.UuidGenerator;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <code>CuratorLeaderElection</code> uses the leader election capabilities of a
- * ZooKeeper cluster to control which nodes are enabled. It is typically used in
- * fail-over scenarios controlling identical instances of an application across
- * a cluster of Camel based servers. <p> The election is configured with a single
- * server that should be marked as master.
- * <p> All instances of the election must also be configured with the same path on the ZooKeeper
- * cluster where the election will be carried out. It is good practice for this
- * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the election. <p> See <a
- * href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
- * for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class CuratorLeaderElection {
-
- private static final Logger LOG = LoggerFactory.getLogger(CuratorLeaderElection.class);
- private final CamelContext camelContext;
- private final String uri;
-
- private final String candidateName;
- private final Lock lock = new ReentrantLock();
- private final CountDownLatch electionComplete = new CountDownLatch(1);
- private final List<ElectionWatcher> watchers = new ArrayList<>();
- private AtomicBoolean masterNode = new AtomicBoolean(false);
- private volatile boolean isCandidateCreated;
- private int enabledCount = 1;
- private UuidGenerator uuidGenerator = new JavaUuidGenerator();
- private LeaderSelector leaderSelector;
- private CuratorFramework client;
-
- public CuratorLeaderElection(CamelContext camelContext, String uri) {
- this.camelContext = camelContext;
- this.uri = uri;
- this.candidateName = createCandidateName();
-
- String connectionString = uri.substring(1 + uri.indexOf(':')).split("/")[0];
- String protocol = uri.substring(0, uri.indexOf(':'));
- String path = uri.replace(protocol + ":" + connectionString, "");
- client = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3));
- client.start();
-
- leaderSelector = new LeaderSelector(client, path, new CamelLeaderElectionListener());
- leaderSelector.start();
- }
-
- // stolen from org/apache/camel/processor/CamelInternalProcessor
- public static boolean isCamelStopping(CamelContext context) {
- if (context instanceof StatefulService) {
- StatefulService ss = (StatefulService) context;
- return ss.isStopping() || ss.isStopped();
- }
- return false;
- }
-
- public void shutdownClients() {
- try {
- leaderSelector.close();
- } finally {
- client.close();
- }
- }
-
- public boolean isMaster() {
- return masterNode.get();
- }
-
- private String createCandidateName() {
- StringBuilder builder = new StringBuilder();
- try {
- /* UUID would be enough, also using hostname for human readability */
- builder.append(InetAddress.getLocalHost().getCanonicalHostName());
- } catch (UnknownHostException ex) {
- LOG.warn("Failed to get the local hostname.", ex);
- builder.append("unknown-host");
- }
- builder.append("-").append(uuidGenerator.generateUuid());
- return builder.toString();
- }
-
- public String getCandidateName() {
- return candidateName;
- }
-
- private void notifyElectionWatchers() {
- for (ElectionWatcher watcher : watchers) {
- try {
- watcher.electionResultChanged();
- } catch (Exception e) {
- LOG.warn("Election watcher " + watcher + " of type " + watcher.getClass() + " threw an exception.", e);
- }
- }
- }
-
- public boolean addElectionWatcher(ElectionWatcher e) {
- return watchers.add(e);
- }
-
- class CamelLeaderElectionListener extends LeaderSelectorListenerAdapter {
-
- @Override
- public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
- masterNode.set(true);
- LOG.info("{} is now leader", getCandidateName());
- notifyElectionWatchers();
-
- // this is supposed to never return as long as it wants to keep its own leader status
- while (!isCamelStopping(camelContext)) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- Thread.interrupted();
- break;
- }
- }
- masterNode.set(false);
- LOG.info("{} has given up its own leadership", getCandidateName());
- notifyElectionWatchers();
- }
- }
-}
-
-
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderRoutePolicy.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderRoutePolicy.java
deleted file mode 100644
index cf3af30..0000000
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderRoutePolicy.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.NonManagedService;
-import org.apache.camel.Route;
-import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.support.RoutePolicySupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-
-/**
- * <code>CuratorLeaderRoutePolicy</code> uses Apache Curator LeaderElection recipe to implement the behavior of having
- * at max 1 instance of a route, controlled by a specific policy, running. It is typically used in
- * fail-over scenarios controlling identical instances of a route across a
- * cluster of Camel based servers.
- * <p>
- * The policy affects the normal startup lifecycle of CamelContext and Routes, automatically set autoStart property of
- * routes controlled by this policy to false.
- * After Curator recipe identifies the current Policy instance as the Leader between a set of clients that are
- * competing for the role, it will start the route, and only at that moment the route will start its business.
- * This specific behavior is designed to avoid scenarios where such a policy would kick in only after a route had
- * already been started, with the risk, for consumers for example, that some source event might have already been
- * consumed.
- * <p>
- * All instances of the policy must also be configured with the same path on the
- * ZooKeeper cluster where the election will be carried out. It is good practice
- * for this to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the policy.
- * <p>
- * See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
- * for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class CuratorLeaderRoutePolicy extends RoutePolicySupport implements ElectionWatcher, NonManagedService {
-
- private static final Logger LOG = LoggerFactory.getLogger(CuratorLeaderRoutePolicy.class);
- private final String uri;
- private final Lock lock = new ReentrantLock();
- private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<>();
- private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
- private volatile boolean shouldStopRoute = true;
-
- private final Lock electionLock = new ReentrantLock();
- private CuratorLeaderElection election;
-
- public CuratorLeaderRoutePolicy(String uri) {
- this.uri = uri;
- }
-
- public CuratorLeaderRoutePolicy(CuratorLeaderElection election) {
- this.election = election;
- this.uri = null;
- }
-
- @Override
- public void onInit(Route route) {
- ensureElectionIsCreated(route);
- LOG.info("Route managed by {}. Setting route {} AutoStartup flag to false.", this.getClass(), route.getId());
- route.getRouteContext().setAutoStartup(false);
- ensureElectionIsCreated(route);
-
- if (election.isMaster()) {
- if (shouldStopRoute) {
- startManagedRoute(route);
- }
- } else {
- if (shouldStopRoute) {
- stopManagedRoute(route);
- }
- }
- }
-
- private void ensureElectionIsCreated(Route route) {
- if (election == null) {
- electionLock.lock();
- try {
- if (election == null) { // re-test
- election = new CuratorLeaderElection(route.getCamelContext(), uri);
- election.addElectionWatcher(this);
- }
- } finally {
- electionLock.unlock();
- }
- }
- }
-
- private void startManagedRoute(Route route) {
- try {
- lock.lock();
- if (suspendedRoutes.contains(route)) {
- startRoute(route);
- suspendedRoutes.remove(route);
- }
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- private void stopManagedRoute(Route route) {
- try {
- lock.lock();
- // check that we should still suspend once the lock is acquired
- if (!suspendedRoutes.contains(route) && !shouldProcessExchanges.get()) {
- stopRoute(route);
- suspendedRoutes.add(route);
- }
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void electionResultChanged() {
- if (election.isMaster()) {
- startAllStoppedRoutes();
- }
- }
-
- private void startAllStoppedRoutes() {
- try {
- lock.lock();
- if (!suspendedRoutes.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("{} route(s) have been stopped previously by policy, restarting.", suspendedRoutes.size());
- }
- for (Route suspended : suspendedRoutes) {
- log.debug("Starting route {}.", suspended.getId());
- startRoute(suspended);
- }
- suspendedRoutes.clear();
- }
-
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- protected void doShutdown() throws Exception {
- election.shutdownClients();
- }
-}
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
deleted file mode 100644
index d027015..0000000
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.StatefulService;
-import org.apache.camel.impl.engine.JavaUuidGenerator;
-import org.apache.camel.spi.UuidGenerator;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
-import org.apache.curator.framework.recipes.locks.Lease;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <code>CuratorMultiMasterLeaderElection</code> uses the leader election capabilities of a
- * ZooKeeper cluster to control which nodes are enabled. It is typically used in
- * fail-over scenarios controlling identical instances of an application across
- * a cluster of Camel based servers. <p> The election is configured providing the number of instances that are required
- * to be active..
- * <p> All instances of the election must also be configured with the same path on the ZooKeeper
- * cluster where the election will be carried out. It is good practice for this
- * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the election. <p> See <a
- * href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
- * for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class CuratorMultiMasterLeaderElection implements ConnectionStateListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(CuratorMultiMasterLeaderElection.class);
-
- private final String candidateName;
- private final List<ElectionWatcher> watchers = new ArrayList<>();
- private final int desiredActiveNodes;
- private AtomicBoolean activeNode = new AtomicBoolean(false);
- private UuidGenerator uuidGenerator = new JavaUuidGenerator();
- private InterProcessSemaphoreV2 leaderSelector;
- private CuratorFramework client;
- private Lease lease;
-
- public CuratorMultiMasterLeaderElection(String uri, int desiredActiveNodes) {
- this.candidateName = createCandidateName();
- this.desiredActiveNodes = desiredActiveNodes;
-
- String connectionString = uri.substring(1 + uri.indexOf(':')).split("/")[0];
- String protocol = uri.substring(0, uri.indexOf(':'));
- String path = uri.replace(protocol + ":" + connectionString, "");
- client = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3));
- client.getConnectionStateListenable().addListener(this);
- leaderSelector = new InterProcessSemaphoreV2(client, path, this.desiredActiveNodes);
- client.start();
-
-
- }
-
- // stolen from org/apache/camel/processor/CamelInternalProcessor
- public static boolean isCamelStopping(CamelContext context) {
- if (context instanceof StatefulService) {
- StatefulService ss = (StatefulService) context;
- return ss.isStopping() || ss.isStopped();
- }
- return false;
- }
-
- public void shutdownClients() {
- try {
- leaderSelector.returnLease(lease);
- } finally {
- client.close();
- }
- }
-
- /*
- * Blocking method
- */
- public void requestResource() {
- LOG.info("Requested to become active from {}", candidateName);
- try {
- lease = leaderSelector.acquire();
- } catch (Exception e) {
- throw new RuntimeException("Unable to obtain access to become a leader node.");
- }
- LOG.info("{} is now active", candidateName);
- activeNode.set(true);
- notifyElectionWatchers();
- }
-
- public boolean isMaster() {
- return activeNode.get();
- }
-
- private String createCandidateName() {
- StringBuilder builder = new StringBuilder();
- try {
- /* UUID would be enough, also using hostname for human readability */
- builder.append(InetAddress.getLocalHost().getCanonicalHostName());
- } catch (UnknownHostException ex) {
- LOG.warn("Failed to get the local hostname.", ex);
- builder.append("unknown-host");
- }
- builder.append("-").append(uuidGenerator.generateUuid());
- return builder.toString();
- }
-
- public String getCandidateName() {
- return candidateName;
- }
-
- private void notifyElectionWatchers() {
- for (ElectionWatcher watcher : watchers) {
- try {
- watcher.electionResultChanged();
- } catch (Exception e) {
- LOG.warn("Election watcher " + watcher + " of type " + watcher.getClass() + " threw an exception.", e);
- }
- }
- }
-
- public boolean addElectionWatcher(ElectionWatcher e) {
- return watchers.add(e);
- }
-
- @Override
- public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
- switch (connectionState) {
- case SUSPENDED:
- case LOST:
- LOG.info("Received {} state from connection. Giving up lock.", connectionState);
-
- try {
- leaderSelector.returnLease(lease);
- } finally {
- this.activeNode.set(false);
- notifyElectionWatchers();
- }
-
- break;
- default:
- LOG.info("Connection state changed: {}", connectionState);
- requestResource();
-
- }
- }
-
-}
-
-
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
deleted file mode 100644
index cfe952c..0000000
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.NonManagedService;
-import org.apache.camel.Route;
-import org.apache.camel.support.RoutePolicySupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-
-/**
- * <code>CuratorMultiMasterLeaderRoutePolicy</code> uses Apache Curator InterProcessSemaphoreV2 receipe to implement the behavior of having
- * at multiple active instances of a route, controlled by a specific policy, running. It is typically used in
- * fail-over scenarios controlling identical instances of a route across a cluster of Camel based servers.
- * <p>
- * The policy affects the normal startup lifecycle of CamelContext and Routes, automatically set autoStart property of
- * routes controlled by this policy to false.
- * After Curator receipe identifies the current Policy instance as the Leader between a set of clients that are
- * competing for the role, it will start the route, and only at that moment the route will start its business.
- * This specific behavior is designed to avoid scenarios where such a policy would kick in only after a route had
- * already been started, with the risk, for consumers for example, that some source event might have already been
- * consumed.
- * <p>
- * All instances of the policy must also be configured with the same path on the
- * ZooKeeper cluster where the election will be carried out. It is good practice
- * for this to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the policy.
- * <p>
- * See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
- * for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class CuratorMultiMasterLeaderRoutePolicy extends RoutePolicySupport implements ElectionWatcher, NonManagedService {
-
- private static final Logger LOG = LoggerFactory.getLogger(CuratorMultiMasterLeaderRoutePolicy.class);
- private final String uri;
- private final Lock lock = new ReentrantLock();
- private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<>();
- private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
- private volatile boolean shouldStopRoute = true;
- private final int enabledCount;
-
-
- private final Lock electionLock = new ReentrantLock();
-
- private CuratorMultiMasterLeaderElection election;
-
- public CuratorMultiMasterLeaderRoutePolicy(String uri, int enabledCount) {
- this.uri = uri;
- this.enabledCount = enabledCount;
- }
- public CuratorMultiMasterLeaderRoutePolicy(String uri) {
- this(uri, 1);
- }
-
- @Override
- public void onInit(Route route) {
- ensureElectionIsCreated();
- LOG.info("Route managed by {}. Setting route [{}] AutoStartup flag to false.", this.getClass(), route.getId());
- route.getRouteContext().setAutoStartup(false);
-
-
- if (election.isMaster()) {
- if (shouldStopRoute) {
- startManagedRoute(route);
- }
- } else {
- if (shouldStopRoute) {
- stopManagedRoute(route);
- }
- }
-
- }
-
- private void ensureElectionIsCreated() {
- if (election == null) {
- electionLock.lock();
- try {
- if (election == null) { // re-test
- election = new CuratorMultiMasterLeaderElection(uri, enabledCount);
- election.addElectionWatcher(this);
-
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- electionLock.unlock();
- }
- }
- }
-
- private void startManagedRoute(Route route) {
- try {
- lock.lock();
- if (suspendedRoutes.contains(route)) {
- startRoute(route);
- suspendedRoutes.remove(route);
- }
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- private void stopManagedRoute(Route route) {
- try {
- lock.lock();
- // check that we should still suspend once the lock is acquired
- if (!suspendedRoutes.contains(route) && !shouldProcessExchanges.get()) {
- stopRoute(route);
- suspendedRoutes.add(route);
- }
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void electionResultChanged() {
- if (election.isMaster()) {
- startAllStoppedRoutes();
- }
- }
-
- private void startAllStoppedRoutes() {
- try {
- lock.lock();
-
- if (!suspendedRoutes.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.info("{} route(s) have been stopped previously by policy, restarting.", suspendedRoutes.size());
- }
- for (Route suspended : suspendedRoutes) {
- CamelContext ctx = suspended.getCamelContext();
- while (!ctx.isStarted()) {
- log.info("Context {} is not started yet. Sleeping for a bit.", ctx.getName());
- Thread.sleep(5000);
- }
- log.info("Starting route [{}] defined in context [{}].", suspended.getId(), ctx.getName());
- startRoute(suspended);
- }
- suspendedRoutes.clear();
- }
-
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- protected void doShutdown() throws Exception {
- try {
- electionLock.lock();
- election.shutdownClients();
- election = null;
- } finally {
- electionLock.unlock();
- }
- }
-
- public CuratorMultiMasterLeaderElection getElection() {
- return election;
- }
-}
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java
deleted file mode 100644
index 62e199b..0000000
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-public interface ElectionWatcher {
-
- /**
- * This method is called when there is a potential change to the master.
- * Implementations should call "isMaster" on their ZookeeperElection
- * instance to re-validate their status.
- */
- void electionResultChanged();
-}
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java
deleted file mode 100644
index b491f1c..0000000
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.zookeeper.SequenceComparator;
-import org.apache.camel.component.zookeeper.ZooKeeperEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperMessage;
-import org.apache.camel.impl.engine.JavaUuidGenerator;
-import org.apache.camel.spi.UuidGenerator;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <code>ZooKeeperElection</code> uses the leader election capabilities of a
- * ZooKeeper cluster to control which nodes are enabled. It is typically used in
- * fail-over scenarios controlling identical instances of an application across
- * a cluster of Camel based servers. <p> The election is configured with a 'top
- * n' number of servers that should be marked as master, for a simple
- * master/slave scenario this would be 1. Each instance will execute the
- * election algorithm to obtain its position in the hierarchy of servers, if it
- * is within the 'top n' servers then the node is enabled and isMaster() will
- * return 'true'. If not it waits for a change in the leader hierarchy and then
- * reruns this scenario to see if it is now in the top n. <p> All instances of
- * the election must also be configured with the same path on the ZooKeeper
- * cluster where the election will be carried out. It is good practice for this
- * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the election. <p> See <a
- * href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
- * for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class ZooKeeperElection {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperElection.class);
- private final ProducerTemplate producerTemplate;
- private final CamelContext camelContext;
- private final String uri;
- private final String candidateName;
- private final Lock lock = new ReentrantLock();
- private final CountDownLatch electionComplete = new CountDownLatch(1);
- private AtomicBoolean masterNode = new AtomicBoolean();
- private volatile boolean isCandidateCreated;
- private int enabledCount = 1;
- private UuidGenerator uuidGenerator = new JavaUuidGenerator();
- private final List<ElectionWatcher> watchers = new ArrayList<>();
-
- public ZooKeeperElection(CamelContext camelContext, String uri, int enabledCount) {
- this(camelContext.createProducerTemplate(), camelContext, uri, enabledCount);
- }
-
- public ZooKeeperElection(ProducerTemplate producerTemplate, CamelContext camelContext, String uri, int enabledCount) {
- this.camelContext = camelContext;
- this.producerTemplate = producerTemplate;
- this.uri = uri;
- this.enabledCount = enabledCount;
- this.candidateName = createCandidateName();
- }
-
- public boolean isMaster() {
- if (!isCandidateCreated) {
- testAndCreateCandidateNode();
- awaitElectionResults();
-
- }
- return masterNode.get();
- }
-
- private String createCandidateName() {
- StringBuilder builder = new StringBuilder();
- try {
- /* UUID would be enough, also using hostname for human readability */
- builder.append(InetAddress.getLocalHost().getCanonicalHostName());
- } catch (UnknownHostException ex) {
- LOG.warn("Failed to get the local hostname.", ex);
- builder.append("unknown-host");
- }
- builder.append("-").append(uuidGenerator.generateUuid());
- return builder.toString();
- }
-
- private void testAndCreateCandidateNode() {
- try {
- lock.lock();
- if (!isCandidateCreated) {
- createCandidateNode(camelContext);
- isCandidateCreated = true;
- }
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- private void awaitElectionResults() {
- while (electionComplete.getCount() > 0) {
- try {
- LOG.debug("Awaiting election results...");
- electionComplete.await();
- } catch (InterruptedException e1) {
- // do nothing here
- }
- }
- }
-
- private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) {
- LOG.info("Initializing ZookeeperElection with uri '{}'", uri);
- ZooKeeperEndpoint zep = camelContext.getEndpoint(uri, ZooKeeperEndpoint.class);
- zep.getConfiguration().setCreate(true);
- String fullpath = createFullPathToCandidate(zep);
- Exchange e = zep.createExchange();
- e.setPattern(ExchangePattern.InOut);
- e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath);
- e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, CreateMode.EPHEMERAL_SEQUENTIAL);
- producerTemplate.send(zep, e);
-
- if (e.isFailed()) {
- LOG.warn("Error setting up election node {}", fullpath, e.getException());
- } else {
- LOG.info("Candidate node '{}' has been created", fullpath);
- try {
- camelContext.addRoutes(new ElectoralMonitorRoute(zep));
- } catch (Exception ex) {
- LOG.warn("Error configuring ZookeeperElection", ex);
- }
- }
- return zep;
-
- }
-
- private String createFullPathToCandidate(ZooKeeperEndpoint zep) {
- String fullpath = zep.getConfiguration().getPath();
- if (!fullpath.endsWith("/")) {
- fullpath += "/";
- }
- fullpath += candidateName;
- return fullpath;
- }
-
- private void handleException(Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
-
- private void notifyElectionWatchers() {
- for (ElectionWatcher watcher : watchers) {
- try {
- watcher.electionResultChanged();
- } catch (Exception e) {
- LOG.warn("Election watcher " + watcher + " of type " + watcher.getClass() + " threw an exception.", e);
- }
- }
- }
-
- public boolean addElectionWatcher(ElectionWatcher e) {
- return watchers.add(e);
- }
-
- public boolean removeElectionWatcher(ElectionWatcher o) {
- return watchers.remove(o);
- }
-
- private class ElectoralMonitorRoute extends RouteBuilder {
-
- private SequenceComparator comparator = new SequenceComparator();
- private ZooKeeperEndpoint zep;
-
- ElectoralMonitorRoute(ZooKeeperEndpoint zep) {
- this.zep = zep;
- zep.getConfiguration().setListChildren(true);
- zep.getConfiguration().setSendEmptyMessageOnDelete(true);
- zep.getConfiguration().setRepeat(true);
- }
-
- @Override
- public void configure() throws Exception {
-
- /**
- * TODO: this is cheap cheerful but suboptimal; it suffers from the
- * 'herd effect' that on any change to the candidates list every
- * policy instance will ask for the entire candidate list again.
- * This is fine for small numbers of nodes (for scenarios like
- * Master-Slave it is perfect) but could get noisy if large numbers
- * of nodes were involved. <p> Better would be to find the position
- * of this node in the list and watch the node in the position ahead
- * node ahead of this and only request the candidate list when its
- * status changes. This will require enhancing the consumer to allow
- * custom operation lists.
- */
- from(zep).id("election-route-" + candidateName).sort(body(), comparator).process(new Processor() {
- @Override
- public void process(Exchange e) throws Exception {
- @SuppressWarnings("unchecked")
- List<String> candidates = e.getIn().getMandatoryBody(List.class);
- // we cannot use the binary search here and the candidates a not sorted in the normal way
- /**
- * check if the item at this location starts with this nodes
- * candidate name
- */
- int location = findCandidateLocationInCandidatesList(candidates, candidateName);
- if (location != -1) {
- // set the nodes
- masterNode.set(location <= enabledCount);
- LOG.debug("This node is number '{}' on the candidate list, election is configured for the top '{}'. this node will be {}",
- new Object[]{location, enabledCount, masterNode.get() ? "enabled" : "disabled"}
- );
- }
- electionComplete.countDown();
-
- notifyElectionWatchers();
- }
-
- private int findCandidateLocationInCandidatesList(List<String> candidates, String candidateName) {
-
- for (int location = 1; location <= candidates.size(); location++) {
- if (candidates.get(location - 1).startsWith(candidateName)) {
- return location;
- }
- }
- return -1;
- }
- });
- }
- }
-}
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
deleted file mode 100644
index bb6a760..0000000
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.NonManagedService;
-import org.apache.camel.Route;
-import org.apache.camel.support.RoutePolicySupport;
-
-/**
- * <code>ZooKeeperRoutePolicy</code> uses the leader election capabilities of a
- * ZooKeeper cluster to control how routes are enabled. It is typically used in
- * fail-over scenarios controlling identical instances of a route across a
- * cluster of Camel based servers.
- * <p>
- * The policy is configured with a 'top n' number of routes that should be
- * allowed to start, for a master/slave scenario this would be 1. Each instance
- * of the policy will execute the election algorithm to obtain its position in
- * the hierarchy of servers, if it is within the 'top n' servers then the policy
- * is enabled and exchanges can be processed by the route. If not it waits for a
- * change in the leader hierarchy and then reruns this scenario to see if it is
- * now in the top n.
- * <p>
- * All instances of the policy must also be configured with the same path on the
- * ZooKeeper cluster where the election will be carried out. It is good practice
- * for this to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the policy.
- * <p>
- * See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
- * for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class ZooKeeperRoutePolicy extends RoutePolicySupport implements ElectionWatcher, NonManagedService {
-
- private final String uri;
- private final int enabledCount;
- private final Lock lock = new ReentrantLock();
- private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<>();
- private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
- private volatile boolean shouldStopConsumer = true;
-
- private final Lock electionLock = new ReentrantLock();
- private ZooKeeperElection election;
-
- public ZooKeeperRoutePolicy(String uri, int enabledCount) {
- this.uri = uri;
- this.enabledCount = enabledCount;
- }
-
- public ZooKeeperRoutePolicy(ZooKeeperElection election) {
- this.election = election;
- this.uri = null;
- this.enabledCount = -1;
- }
-
- @Override
- public void onExchangeBegin(Route route, Exchange exchange) {
- ensureElectionIsCreated(route);
-
- if (election.isMaster()) {
- if (shouldStopConsumer) {
- startConsumer(route);
- }
- } else {
- if (shouldStopConsumer) {
- stopConsumer(route);
- }
-
- IllegalStateException e = new IllegalStateException("Zookeeper based route policy prohibits processing exchanges, stopping route and failing the exchange");
- exchange.setException(e);
- }
- }
-
- private void ensureElectionIsCreated(Route route) {
- if (election == null) {
- electionLock.lock();
- try {
- if (election == null) { // re-test
- election = new ZooKeeperElection(route.getCamelContext(), uri, enabledCount);
- election.addElectionWatcher(this);
- }
- } finally {
- electionLock.unlock();
- }
- }
- }
-
- private void startConsumer(Route route) {
- try {
- lock.lock();
- if (suspendedRoutes.contains(route)) {
- startConsumer(route.getConsumer());
- suspendedRoutes.remove(route);
- }
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- private void stopConsumer(Route route) {
- try {
- lock.lock();
- // check that we should still suspend once the lock is acquired
- if (!suspendedRoutes.contains(route) && !shouldProcessExchanges.get()) {
- stopConsumer(route.getConsumer());
- suspendedRoutes.add(route);
- }
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void electionResultChanged() {
- if (election.isMaster()) {
- startAllStoppedConsumers();
- }
- }
-
- private void startAllStoppedConsumers() {
- try {
- lock.lock();
- if (!suspendedRoutes.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("{} have been stopped previously by policy, restarting.", suspendedRoutes.size());
- }
- for (Route suspended : suspendedRoutes) {
- startConsumer(suspended.getConsumer());
- }
- suspendedRoutes.clear();
- }
-
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
- }
- }
-
- public boolean isShouldStopConsumer() {
- return shouldStopConsumer;
- }
-
- public void setShouldStopConsumer(boolean shouldStopConsumer) {
- this.shouldStopConsumer = shouldStopConsumer;
- }
-}
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverCuratorLeaderRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverCuratorLeaderRoutePolicyTest.java
deleted file mode 100644
index 28ab4ae..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverCuratorLeaderRoutePolicyTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.hamcrest.CoreMatchers.is;
-
-public class FailoverCuratorLeaderRoutePolicyTest extends ZooKeeperTestSupport {
- public static final String ZNODE = "/curatorleader";
- public static final String BASE_ZNODE = "/someapp";
- private static final Logger LOG = LoggerFactory.getLogger(FailoverCuratorLeaderRoutePolicyTest.class);
-
-
- protected CamelContext createCamelContext() throws Exception {
- disableJMX();
- return super.createCamelContext();
- }
-
- @Test
- public void masterSlaveScenarioContolledByPolicy() throws Exception {
- ZookeeperPolicyEnforcedContext master = createEnforcedContext("master");
- ZookeeperPolicyEnforcedContext slave = createEnforcedContext("slave");
- Thread.sleep(5000);
- // Send messages to the master and the slave.
- // The route is enabled in the master and gets through, but that sent to
- // the slave context is rejected.
- master.sendMessageToEnforcedRoute("message for master", 1);
- slave.sendMessageToEnforcedRoute("message for slave", 0);
-
- // trigger failover by killing the master... then assert that the slave
- // can now receive messages.
- master.shutdown();
- slave.sendMessageToEnforcedRoute("second message for slave", 1);
- slave.shutdown();
- }
-
-
- @Test
- public void ensureRoutesDoNotStartBeforeElection() throws Exception {
- DefaultCamelContext context = new DefaultCamelContext();
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- CuratorLeaderRoutePolicy policy = new CuratorLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + BASE_ZNODE + ZNODE + 2);
- from("timer://foo?fixedRate=true&period=5").routePolicy(policy).id("single_route").autoStartup(true).to("mock:controlled");
- }
- });
- context.start();
- // this check verifies that a route marked as autostartable is not started automatically. It will be the policy responsibility to eventually start it.
- assertThat(context.getRouteController().getRouteStatus("single_route").isStarted(), is(false));
- assertThat(context.getRouteController().getRouteStatus("single_route").isStarting(), is(false));
-
- context.shutdown();
- }
-
- private static class ZookeeperPolicyEnforcedContext {
- private CamelContext controlledContext;
- private ProducerTemplate template;
- private MockEndpoint mock;
- private String routename;
-
- ZookeeperPolicyEnforcedContext(String name) throws Exception {
- controlledContext = new DefaultCamelContext();
- routename = name;
- template = controlledContext.createProducerTemplate();
- mock = controlledContext.getEndpoint("mock:controlled", MockEndpoint.class);
- controlledContext.addRoutes(new FailoverRoute(name));
- controlledContext.start();
- }
-
- public void sendMessageToEnforcedRoute(String message, int expected) throws InterruptedException {
- mock.expectedMessageCount(expected);
- try {
- template.sendBody("vm:" + routename, ExchangePattern.InOut, message);
- } catch (Exception e) {
- if (expected > 0) {
- LOG.error(e.getMessage(), e);
- fail("Expected messages...");
- }
- }
- mock.await(2, TimeUnit.SECONDS);
- mock.assertIsSatisfied(2000);
- }
-
- public void shutdown() throws Exception {
- LoggerFactory.getLogger(getClass()).debug("stopping");
- controlledContext.stop();
- LoggerFactory.getLogger(getClass()).debug("stopped");
- }
- }
-
- private ZookeeperPolicyEnforcedContext createEnforcedContext(String name) throws Exception, InterruptedException {
- ZookeeperPolicyEnforcedContext context = new ZookeeperPolicyEnforcedContext(name);
- delay(1000);
- return context;
- }
-
- public static class FailoverRoute extends RouteBuilder {
-
- private String routename;
-
- public FailoverRoute(String routename) {
- // need names as if we use the same direct ep name in two contexts
- // in the same vm shutting down one context shuts the endpoint for
- // both.
- this.routename = routename;
- }
-
- public void configure() throws Exception {
- CuratorLeaderRoutePolicy policy = new CuratorLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + BASE_ZNODE + ZNODE);
- from("vm:" + routename).routePolicy(policy).id(routename).to("mock:controlled");
- }
- }
-}
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java
deleted file mode 100644
index 14e9d8a..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FailoverRoutePolicyTest extends ZooKeeperTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(FailoverRoutePolicyTest.class);
-
- protected CamelContext createCamelContext() throws Exception {
- disableJMX();
- // set up the parent nodes used to control the election
- client.createPersistent("/someapp", "App node to contain policy election nodes...");
- client.createPersistent("/someapp/somepolicy", "Node used by route policy to control active routes...");
- return super.createCamelContext();
- }
-
- @Test
- public void masterSlaveScenarioContolledByPolicy() throws Exception {
- ZookeeperPolicyEnforcedContext tetrisisMasterOfBlocks = createEnforcedContext("master");
- ZookeeperPolicyEnforcedContext slave = createEnforcedContext("slave");
-
- // http://bit.ly/9gTlGe ;). Send messages to the master and the slave.
- // The route is enabled in the master and gets through, but that sent to
- // the slave context is rejected.
- tetrisisMasterOfBlocks.sendMessageToEnforcedRoute("LIIIIIIIIIINNNNNNNNNEEEEEEE PEEEEEEICCCE", 1);
- slave.sendMessageToEnforcedRoute("But lord there is no place for a square!??!", 0);
-
- // trigger failover by killing the master... then assert that the slave
- // can now receive messages.
- tetrisisMasterOfBlocks.shutdown();
- slave.sendMessageToEnforcedRoute("What a cruel and angry god...", 1);
- }
-
- private static class ZookeeperPolicyEnforcedContext {
- private CamelContext controlledContext;
- private ProducerTemplate template;
- private MockEndpoint mock;
- private String routename;
-
- ZookeeperPolicyEnforcedContext(String name) throws Exception {
- controlledContext = new DefaultCamelContext();
- routename = name;
- template = controlledContext.createProducerTemplate();
- mock = controlledContext.getEndpoint("mock:controlled", MockEndpoint.class);
- controlledContext.addRoutes(new FailoverRoute(name));
- controlledContext.start();
- }
-
- public void sendMessageToEnforcedRoute(String message, int expected) throws InterruptedException {
- mock.expectedMessageCount(expected);
- try {
- template.sendBody("vm:" + routename, ExchangePattern.InOut, message);
- } catch (Exception e) {
- if (expected > 0) {
- LOG.error(e.getMessage(), e);
- fail("Expected messages...");
- }
- }
- mock.await(2, TimeUnit.SECONDS);
- mock.assertIsSatisfied(1000);
- }
-
- public void shutdown() throws Exception {
- LoggerFactory.getLogger(getClass()).debug("stopping");
- controlledContext.stop();
- LoggerFactory.getLogger(getClass()).debug("stopped");
- }
- }
-
- private ZookeeperPolicyEnforcedContext createEnforcedContext(String name) throws Exception, InterruptedException {
- ZookeeperPolicyEnforcedContext context = new ZookeeperPolicyEnforcedContext(name);
- delay(1000);
- return context;
- }
-
- public static class FailoverRoute extends RouteBuilder {
-
- private String routename;
-
- public FailoverRoute(String routename) {
- // need names as if we use the same direct ep name in two contexts
- // in the same vm shutting down one context shuts the endpoint for
- // both.
- this.routename = routename;
- }
-
- public void configure() throws Exception {
- ZooKeeperRoutePolicy policy = new ZooKeeperRoutePolicy("zookeeper:localhost:" + getServerPort() + "/someapp/somepolicy", 1);
- from("vm:" + routename).routePolicy(policy).id(routename).to("mock:controlled");
- }
- };
-}
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
deleted file mode 100644
index 05afc3f..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.model.Model;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-
-public class MultiMasterCuratorLeaderRoutePolicyTest extends ZooKeeperTestSupport {
- public static final String ZNODE = "/multimaster";
- public static final String BASE_ZNODE = "/someapp";
- private static final Logger LOG = LoggerFactory.getLogger(MultiMasterCuratorLeaderRoutePolicyTest.class);
-
-
- protected CamelContext createCamelContext() throws Exception {
- disableJMX();
- return super.createCamelContext();
- }
-
-
- @Test
- public void ensureRoutesDoNotStartAutomatically() throws Exception {
- DefaultCamelContext context = new DefaultCamelContext();
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- CuratorMultiMasterLeaderRoutePolicy policy = new CuratorMultiMasterLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + BASE_ZNODE + ZNODE + 2);
- from("timer://foo?fixedRate=true&period=5").routePolicy(policy).id("single_route").autoStartup(true).to("mock:controlled");
- }
- });
- context.start();
- // this check verifies that a route marked as autostartable is not started automatically. It will be the policy responsibility to eventually start it.
- assertThat(context.getRouteController().getRouteStatus("single_route").isStarted(), is(false));
- assertThat(context.getRouteController().getRouteStatus("single_route").isStarting(), is(false));
- try {
- context.shutdown();
- } catch (Exception e) {
- //concurrency can raise some InterruptedException but we don't really care in this scenario.
- }
- }
-
- @Test
- public void oneMasterOneSlaveScenarioContolledByPolicy() throws Exception {
- final String path = "oneMasterOneSlaveScenarioContolledByPolicy";
- final String firstDestination = "first" + System.currentTimeMillis();
- final String secondDestination = "second" + System.currentTimeMillis();
- final CountDownLatch waitForSecondRouteCompletedLatch = new CountDownLatch(1);
- final int activeNodesDesired = 1;
-
- MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination, activeNodesDesired, path);
- DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext;
- // get reference to the Policy object to check if it's already a master
- CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
-
- assertWeHaveMasters(routePolicy);
-
- LOG.info("Starting first CamelContext");
- final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1];
-
- new Thread() {
- @Override
- public void run() {
- MultiMasterZookeeperPolicyEnforcedContext second = null;
- try {
- LOG.info("Starting second CamelContext in a separate thread");
- second = createEnforcedContext(secondDestination, activeNodesDesired, path);
- arr[0] = second;
- second.sendMessageToEnforcedRoute("message for second", 0);
- waitForSecondRouteCompletedLatch.countDown();
- } catch (Exception e) {
- LOG.error("Error in the thread controlling the second context", e);
- fail("Error in the thread controlling the second context: " + e.getMessage());
- }
-
-
- }
- }.start();
-
- first.sendMessageToEnforcedRoute("message for first", 1);
-
- waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES);
- LOG.info("Explicitly shutting down the first camel context.");
-
- LOG.info("Shutting down first con");
- first.shutdown();
-
- MultiMasterZookeeperPolicyEnforcedContext second = arr[0];
-
- DefaultCamelContext secondCamelContext = (DefaultCamelContext) second.controlledContext;
- assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0));
-
- //second.mock = secondCamelContext.getEndpoint("mock:controlled", MockEndpoint.class);
- second.sendMessageToEnforcedRoute("message for slave", 1);
- second.shutdown();
- }
-
-
- @Test
- public void oneMasterOneSlaveAndFlippedAgainScenarioContolledByPolicy() throws Exception {
- final String path = "oneMasterOneSlaveScenarioContolledByPolicy";
- final String firstDestination = "first" + System.currentTimeMillis();
- final String secondDestination = "second" + System.currentTimeMillis();
- final CountDownLatch waitForSecondRouteCompletedLatch = new CountDownLatch(1);
- final int activeNodeDesired = 1;
-
- MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination, activeNodeDesired, path);
- DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext;
- // get reference to the Policy object to check if it's already a master
- CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
-
- assertWeHaveMasters(routePolicy);
-
- LOG.info("Starting first CamelContext");
- final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1];
-
- new Thread() {
- @Override
- public void run() {
- MultiMasterZookeeperPolicyEnforcedContext slave = null;
- try {
- LOG.info("Starting second CamelContext in a separate thread");
- slave = createEnforcedContext(secondDestination, activeNodeDesired, path);
- arr[0] = slave;
- slave.sendMessageToEnforcedRoute("message for second", 0);
- waitForSecondRouteCompletedLatch.countDown();
- } catch (Exception e) {
- LOG.error("Error in the thread controlling the second context", e);
- fail("Error in the thread controlling the second context: " + e.getMessage());
- }
-
-
- }
- }.start();
-
- first.sendMessageToEnforcedRoute("message for first", 1);
-
- waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES);
- MultiMasterZookeeperPolicyEnforcedContext second = arr[0];
-
- LOG.info("Explicitly shutting down the first camel context.");
- first.shutdown();
-
- DefaultCamelContext secondCamelContext = (DefaultCamelContext) second.controlledContext;
- assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0));
-
- CountDownLatch restartFirstLatch = new CountDownLatch(1);
- LOG.info("Start back first context");
- new Thread() {
- @Override
- public void run() {
- try {
- first.startup();
- restartFirstLatch.countDown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }.start();
- restartFirstLatch.await();
- second.sendMessageToEnforcedRoute("message for second", 1);
- first.mock.reset();
- first.sendMessageToEnforcedRoute("message for first", 0);
- second.shutdown();
- controlledContext = (DefaultCamelContext) first.controlledContext;
- // get reference to the Policy object to check if it's already a master
- routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
- log.info("Asserting route is up. context: [{}]", controlledContext.getName());
- assertWeHaveMasters(routePolicy);
- first.controlledContext.setTracing(true);
- first.mock = controlledContext.getEndpoint("mock:controlled", MockEndpoint.class);
- first.sendMessageToEnforcedRoute("message for first", 1);
- first.shutdown();
- }
-
-
-
-
- @Test
- public void oneMasterTwoSlavesScenarioContolledByPolicy() throws Exception {
- final String path = "oneMasterTwoSlavesScenarioContolledByPolicy";
- final String master = "master" + System.currentTimeMillis();
- final String secondDestination = "second" + System.currentTimeMillis();
- final String thirdDestination = "third" + System.currentTimeMillis();
- final CountDownLatch waitForNonActiveRoutesLatch = new CountDownLatch(2);
- final int activeNodesDesired = 1;
-
- LOG.info("Starting first CamelContext");
- MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(master, activeNodesDesired, path);
- DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext;
- // get reference to the Policy object to check if it's already a master
- CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(master).getRoutePolicies().get(0);
-
- assertWeHaveMasters(routePolicy);
-
- final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[2];
-
- new Thread() {
- @Override
- public void run() {
- MultiMasterZookeeperPolicyEnforcedContext second = null;
- try {
- LOG.info("Starting second CamelContext");
- second = createEnforcedContext(secondDestination, activeNodesDesired, path);
- arr[0] = second;
- second.sendMessageToEnforcedRoute("message for second", 0);
- waitForNonActiveRoutesLatch.countDown();
- } catch (Exception e) {
- LOG.error("Error in the thread controlling the second context", e);
- fail("Error in the thread controlling the second context: " + e.getMessage());
- }
-
-
- }
- }.start();
-
- new Thread() {
- @Override
- public void run() {
- MultiMasterZookeeperPolicyEnforcedContext third = null;
- try {
- LOG.info("Starting third CamelContext");
- third = createEnforcedContext(thirdDestination, activeNodesDesired, path);
- arr[1] = third;
- third.sendMessageToEnforcedRoute("message for third", 0);
- waitForNonActiveRoutesLatch.countDown();
- } catch (Exception e) {
- LOG.error("Error in the thread controlling the third context", e);
- fail("Error in the thread controlling the third context: " + e.getMessage());
- }
-
-
- }
- }.start();
-
- // Send messages to the master and the slave.
- // The route is enabled in the master and gets through, but that sent to
- // the slave context is rejected.
- first.sendMessageToEnforcedRoute("message for master", 1);
-
- waitForNonActiveRoutesLatch.await();
- LOG.info("Explicitly shutting down the first camel context.");
- // trigger failover by killing the master..
- first.shutdown();
- // let's find out who's active now:
-
- CuratorMultiMasterLeaderRoutePolicy routePolicySecond = (CuratorMultiMasterLeaderRoutePolicy) arr[0].controlledContext.getExtension(Model.class)
- .getRouteDefinition(secondDestination).getRoutePolicies().get(0);
- CuratorMultiMasterLeaderRoutePolicy routePolicyThird = (CuratorMultiMasterLeaderRoutePolicy) arr[1].controlledContext.getExtension(Model.class)
- .getRouteDefinition(thirdDestination).getRoutePolicies().get(0);
-
- MultiMasterZookeeperPolicyEnforcedContext newMaster = null;
- MultiMasterZookeeperPolicyEnforcedContext slave = null;
-
- final int maxWait = 20;
- for (int i = 0; i < maxWait; i++) {
- if (routePolicySecond.getElection().isMaster()) {
- newMaster = arr[0];
- slave = arr[1];
- LOG.info("[second] is the new master");
- break;
- } else if (routePolicyThird.getElection().isMaster()) {
- newMaster = arr[1];
- slave = arr[0];
- LOG.info("[third] is the new master");
- break;
- } else {
- Thread.sleep(2000);
- LOG.info("waiting for a new master to be elected");
- }
- }
- assertThat(newMaster, is(notNullValue()));
-
- newMaster.sendMessageToEnforcedRoute("message for second", 1);
- slave.sendMessageToEnforcedRoute("message for third", 0);
- slave.shutdown();
- newMaster.shutdown();
- }
-
-
- @Test
- public void twoMasterOneSlavesScenarioContolledByPolicy() throws Exception {
- final String path = "twoMasterOneSlavesScenarioContolledByPolicy";
- final String firstDestination = "first" + System.currentTimeMillis();
- final String secondDestination = "second" + System.currentTimeMillis();
- final String thirdDestination = "third" + System.currentTimeMillis();
- final CountDownLatch waitForThirdRouteCompletedLatch = new CountDownLatch(1);
- final int activeNodeDesired = 2;
-
- MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination, activeNodeDesired, path);
- DefaultCamelContext firstControlledContext = (DefaultCamelContext) first.controlledContext;
- CuratorMultiMasterLeaderRoutePolicy firstRoutePolicy = (CuratorMultiMasterLeaderRoutePolicy) firstControlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
-
- MultiMasterZookeeperPolicyEnforcedContext second = createEnforcedContext(secondDestination, activeNodeDesired, path);
- DefaultCamelContext secondControlledContext = (DefaultCamelContext) second.controlledContext;
- CuratorMultiMasterLeaderRoutePolicy secondRoutePolicy = (CuratorMultiMasterLeaderRoutePolicy) secondControlledContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0);
-
- assertWeHaveMasters(firstRoutePolicy, secondRoutePolicy);
-
- final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1];
-
-
- new Thread() {
- @Override
- public void run() {
- MultiMasterZookeeperPolicyEnforcedContext third = null;
- try {
- LOG.info("Starting third CamelContext");
- third = createEnforcedContext(thirdDestination, activeNodeDesired, path);
- arr[0] = third;
- third.sendMessageToEnforcedRoute("message for third", 0);
- waitForThirdRouteCompletedLatch.countDown();
- } catch (Exception e) {
- LOG.error("Error in the thread controlling the third context", e);
- fail("Error in the thread controlling the third context: " + e.getMessage());
- }
-
-
- }
- }.start();
-
- first.sendMessageToEnforcedRoute("message for first", 1);
- second.sendMessageToEnforcedRoute("message for second", 1);
-
-
- waitForThirdRouteCompletedLatch.await();
-
- LOG.info("Explicitly shutting down the first camel context.");
- first.shutdown();
-
-
- arr[0].sendMessageToEnforcedRoute("message for third", 1);
- second.shutdown();
- arr[0].shutdown();
- }
-
- void assertWeHaveMasters(CuratorMultiMasterLeaderRoutePolicy... routePolicies) throws InterruptedException {
- final int maxWait = 20;
- boolean global = false;
- for (int i = 0; i < maxWait; i++) {
- boolean iteration = true;
- for (CuratorMultiMasterLeaderRoutePolicy policy : routePolicies) {
- log.info("Policy: {}, master: {}", policy, policy.getElection().isMaster());
- iteration = iteration & policy.getElection().isMaster();
- }
- if (iteration) {
- LOG.info("the number of required active routes is available");
- global = true;
- break;
- } else {
- Thread.sleep(2000);
- LOG.info("waiting routes to become leader and be activated.");
- }
- }
- if (!global) {
- fail("The expected number of route never became master");
- }
- }
-
-
- private class MultiMasterZookeeperPolicyEnforcedContext {
- CamelContext controlledContext;
- ProducerTemplate template;
- MockEndpoint mock;
- String routename;
- String path;
-
- MultiMasterZookeeperPolicyEnforcedContext(String name, int activeNodesDesired, String path) throws Exception {
- controlledContext = new DefaultCamelContext();
- routename = name;
- this.path = path;
- template = controlledContext.createProducerTemplate();
- mock = controlledContext.getEndpoint("mock:controlled", MockEndpoint.class);
- controlledContext.addRoutes(new FailoverRoute(name, activeNodesDesired, path));
- controlledContext.start();
- }
-
- public void sendMessageToEnforcedRoute(String message, int expected) throws InterruptedException {
- mock.expectedMessageCount(expected);
- try {
- LOG.info("Sending message to: {}", "vm:" + routename);
- template.sendBody("vm:" + routename, ExchangePattern.InOut, message);
- } catch (Exception e) {
- if (expected > 0) {
- LOG.error(e.getMessage(), e);
- fail("Expected messages...");
- }
- }
- mock.await(2, TimeUnit.SECONDS);
- mock.assertIsSatisfied(2000);
- }
-
- public void shutdown() throws Exception {
- LoggerFactory.getLogger(getClass()).debug("stopping");
- controlledContext.stop();
- LoggerFactory.getLogger(getClass()).debug("stopped");
- }
-
-
- public void startup() throws Exception {
- LoggerFactory.getLogger(getClass()).debug("starting");
- controlledContext.start();
- LoggerFactory.getLogger(getClass()).debug("started");
- }
- }
-
- private MultiMasterZookeeperPolicyEnforcedContext createEnforcedContext(String name, int activeNodesDesired, String path) throws Exception, InterruptedException {
- MultiMasterZookeeperPolicyEnforcedContext context = new MultiMasterZookeeperPolicyEnforcedContext(name, activeNodesDesired, path);
- delay(1000);
- return context;
- }
-
- public class FailoverRoute extends RouteBuilder {
-
- private String path;
- private String routename;
- private int activeNodesDesired;
-
- public FailoverRoute(String routename, int activeNodesDesired, String path) {
- // need names as if we use the same direct ep name in two contexts
- // in the same vm shutting down one context shuts the endpoint for
- // both.
- this.routename = routename;
- this.activeNodesDesired = activeNodesDesired;
- this.path = path;
- }
-
- public void configure() throws Exception {
- CuratorMultiMasterLeaderRoutePolicy policy = new CuratorMultiMasterLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + BASE_ZNODE + ZNODE + "/" + path, this.activeNodesDesired);
- from("vm:" + routename).routePolicy(policy).id(routename).to("mock:controlled");
- }
- }
-}
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperDoubleRouteAndDoublePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperDoubleRouteAndDoublePolicyTest.java
deleted file mode 100644
index a1ad16b..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperDoubleRouteAndDoublePolicyTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.junit.Test;
-
-public class ZookeeperDoubleRouteAndDoublePolicyTest extends ZooKeeperTestSupport {
-
- @Test
- public void routeDoublePoliciesAndTwoRoutes() throws Exception {
- // set up the parent used to control the election
- client.createPersistent("/someapp", "App node to contain policy election nodes...");
- client.createPersistent("/someapp/somepolicy", "Policy node used by route policy to control routes...");
- client.createPersistent("/someapp/someotherpolicy", "Policy node used by route policy to control routes...");
- context.addRoutes(new ZooKeeperPolicyEnforcedRoute());
-
- MockEndpoint mockedpolicy = getMockEndpoint("mock:controlled");
- mockedpolicy.setExpectedMessageCount(1);
- sendBody("direct:policy-controlled", "This is a test");
- mockedpolicy.await(5, TimeUnit.SECONDS);
- mockedpolicy.assertIsSatisfied();
-
- MockEndpoint mockedpolicy1 = getMockEndpoint("mock:controlled-1");
- mockedpolicy1.setExpectedMessageCount(1);
- sendBody("direct:policy-controlled-1", "This is a test");
- mockedpolicy1.await(5, TimeUnit.SECONDS);
- mockedpolicy1.assertIsSatisfied();
- }
-
- public static class ZooKeeperPolicyEnforcedRoute extends RouteBuilder {
- public void configure() throws Exception {
- ZooKeeperRoutePolicy policy = new ZooKeeperRoutePolicy("zookeeper:localhost:" + getServerPort() + "/someapp/somepolicy", 1);
- from("direct:policy-controlled").routePolicy(policy).to("mock:controlled");
- ZooKeeperRoutePolicy policy2 = new ZooKeeperRoutePolicy("zookeeper:localhost:" + getServerPort() + "/someapp/someotherpolicy", 1);
- from("direct:policy-controlled-1").routePolicy(policy2).to("mock:controlled-1");
- }
- };
-}
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java
deleted file mode 100644
index 7814c6f..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperElectionTest extends ZooKeeperTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(ZookeeperElectionTest.class);
-
- private static final String NODE_BASE_KEY = "/someapp";
- private static final String NODE_PARTICULAR_KEY = "/someapp/somepolicy";
-
- private CamelContext candidateOneContext;
- private CamelContext candidateTwoContext;
-
- @Before
- public void before() throws Exception {
- // set up the parent used to control the election
- client.createPersistent(NODE_BASE_KEY, "App node to contain policy election nodes...");
- client.createPersistent(NODE_PARTICULAR_KEY, "Policy node used by route policy to control routes...");
- }
-
- @After
- public void after() throws Exception {
- client.deleteAll(NODE_PARTICULAR_KEY);
- client.delete(NODE_BASE_KEY);
-
- if (candidateOneContext != null) {
- candidateOneContext.stop();
- }
- if (candidateTwoContext != null) {
- candidateTwoContext.stop();
- }
- }
-
- private String getElectionUri() {
- return "zookeeper:localhost:" + getServerPort() + "/someapp/somepolicy";
- }
-
- @Test
- public void masterCanBeElected() throws Exception {
- ZooKeeperElection candidate = new ZooKeeperElection(template, context, getElectionUri(), 1);
- assertTrue("The only election candidate was not elected as master.", candidate.isMaster());
- }
-
- @Test
- public void masterAndSlave() throws Exception {
- candidateOneContext = createNewContext();
- candidateTwoContext = createNewContext();
-
- ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 1);
- assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
- ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 1);
- assertFalse("The second candidate should not have been elected.", electionCandidate2.isMaster());
- }
-
- @Test
- public void testMasterGoesAway() throws Exception {
- candidateOneContext = createNewContext();
- candidateTwoContext = createNewContext();
-
- ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 1);
- assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
- ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 1);
- assertFalse("The second candidate should not have been elected.", electionCandidate2.isMaster());
-
- LOG.debug("About to shutdown the first candidate.");
-
- candidateOneContext.stop(); // the first candidate was killed.
- assertIsMaster(electionCandidate2);
- }
-
- @Test
- public void testDualMaster() throws Exception {
- candidateOneContext = createNewContext();
- candidateTwoContext = createNewContext();
-
- ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 2);
- assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
-
- ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 2);
- assertIsMaster(electionCandidate2);
- }
-
- @Test
- public void testWatchersAreNotified() throws Exception {
- candidateOneContext = createNewContext();
- candidateTwoContext = createNewContext();
-
- final AtomicBoolean notified = new AtomicBoolean(false);
- ElectionWatcher watcher = new ElectionWatcher() {
- @Override public void electionResultChanged() {
- notified.set(true);
- }
- };
-
- ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 2);
- assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
- electionCandidate1.addElectionWatcher(watcher);
- ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 2);
- electionCandidate2.isMaster();
- assertTrue("The first candidate should have had it's watcher notified", notified.get());
- }
-
- private DefaultCamelContext createNewContext() throws Exception {
- DefaultCamelContext controlledContext = new DefaultCamelContext();
- controlledContext.start();
- return controlledContext;
- }
-
- private ZooKeeperElection createElectionCandidate(final CamelContext context, int masterCount) {
- return new ZooKeeperElection(context.createProducerTemplate(), context, getElectionUri(), masterCount);
- }
-
- private void assertIsMaster(ZooKeeperElection electionCandidate) throws InterruptedException {
- // Need to wait for a while to be elected.
- long timeout = System.currentTimeMillis() + 20000;
-
- while (!electionCandidate.isMaster() && timeout > System.currentTimeMillis()) {
- Thread.sleep(200);
- }
-
- assertTrue("The candidate should have been elected.", electionCandidate.isMaster());
- }
-}
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperRoutePolicyTest.java
deleted file mode 100644
index f75f152..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperRoutePolicyTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.junit.Test;
-
-public class ZookeeperRoutePolicyTest extends ZooKeeperTestSupport {
-
- @Test
- public void routeCanBeControlledByPolicy() throws Exception {
- // set up the parent used to control the election
- client.createPersistent("/someapp", "App node to contain policy election nodes...");
- client.createPersistent("/someapp/somepolicy", "Policy node used by route policy to control routes...");
- context.addRoutes(new ZooKeeperPolicyEnforcedRoute());
-
- MockEndpoint mock = getMockEndpoint("mock:controlled");
- mock.setExpectedMessageCount(1);
- sendBody("direct:policy-controlled", "This is a test");
- mock.await(5, TimeUnit.SECONDS);
- mock.assertIsSatisfied();
- }
-
- public static class ZooKeeperPolicyEnforcedRoute extends RouteBuilder {
- public void configure() throws Exception {
- ZooKeeperRoutePolicy policy = new ZooKeeperRoutePolicy("zookeeper:localhost:" + getServerPort() + "/someapp/somepolicy", 1);
- from("direct:policy-controlled").routePolicy(policy).to("mock:controlled");
- }
- };
-}