This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 24fbc3c  CAMEL-13580: Remove zookeeper route policy as there are 
better alternatives.
24fbc3c is described below

commit 24fbc3c6c4db4868b571e712170a7abcb3be2e6d
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jun 3 20:46:03 2019 +0200

    CAMEL-13580: Remove zookeeper route policy as there are better alternatives.
---
 MIGRATION.md                                       |   2 +
 .../src/main/docs/zookeeper-component.adoc         |  67 ---
 .../zookeeper/policy/CuratorLeaderElection.java    | 161 -------
 .../zookeeper/policy/CuratorLeaderRoutePolicy.java | 170 --------
 .../policy/CuratorMultiMasterLeaderElection.java   | 171 --------
 .../CuratorMultiMasterLeaderRoutePolicy.java       | 191 ---------
 .../zookeeper/policy/ElectionWatcher.java          |  27 --
 .../zookeeper/policy/ZooKeeperElection.java        | 253 -----------
 .../zookeeper/policy/ZooKeeperRoutePolicy.java     | 170 --------
 .../FailoverCuratorLeaderRoutePolicyTest.java      | 141 -------
 .../zookeeper/policy/FailoverRoutePolicyTest.java  | 118 ------
 .../MultiMasterCuratorLeaderRoutePolicyTest.java   | 461 ---------------------
 .../ZookeeperDoubleRouteAndDoublePolicyTest.java   |  57 ---
 .../zookeeper/policy/ZookeeperElectionTest.java    | 148 -------
 .../zookeeper/policy/ZookeeperRoutePolicyTest.java |  48 ---
 15 files changed, 2 insertions(+), 2183 deletions(-)

diff --git a/MIGRATION.md b/MIGRATION.md
index 05e0a66..0431a17 100644
--- a/MIGRATION.md
+++ b/MIGRATION.md
@@ -88,6 +88,8 @@ We have removed all deprecated components from Camel 2.x, 
also including the old
 
 We removed `camel-jibx` component which wasn't working on JDK 8.
 
+The `camel-zookeeper` has its route policy functionality removed, instead use 
`ZooKeeperClusterService` or the `camel-zookeeper-master` component.
+
 ### Renamed components
 
 The `test` component has been renamed to `dataset-test` and moved out of 
`camel-core` into `camel-dataset` JAR.
diff --git a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc 
b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
index 1351b68..92720fb 100644
--- a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
+++ b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
@@ -12,9 +12,6 @@ following features to Camel:
 being set must be convertible to `byte[]`).
 3.  Create and retrieve the list the child nodes attached to a
 particular node.
-4.  A Distributed `RoutePolicy` that leverages a
-Leader election coordinated by ZooKeeper to determine if exchanges
-should get processed.
 
 Maven users will need to add the following dependency to their `pom.xml`
 for this component:
@@ -306,67 +303,3 @@ Object testPayload = ...
 template.sendBodyAndHeader("direct:create-and-write-to-persistent-znode", 
testPayload, "CamelZooKeeperCreateMode", "PERSISTENT");
 ----
 
-### ZooKeeper enabled Route policies
-
-ZooKeeper allows for very simple and effective leader election out of
-the box. This component exploits this election capability in a
-`RoutePolicy` to control when and how routes are
-enabled. This policy would typically be used in fail-over scenarios, to
-control identical instances of a route across a cluster of Camel based
-servers. A very common scenario is a simple 'Master-Slave' setup where
-there are multiple instances of a route distributed across a cluster but
-only one of them, that of the master, should be running at a time. If
-the master fails, a new master should be elected from the available
-slaves and the route in this new master should be started.
-
-The policy uses a common _znode_ path across all instances of the
-`RoutePolicy` that will be involved in the election. Each policy writes
-its id into this node and Zookeeper will order the writes in the order
-it received them. The policy then reads the listing of the node to see
-what position of its id; this position is used to determine if the route
-should be started or not. The policy is configured at startup with the
-number of route instances that should be started across the cluster and
-if its position in the list is less than this value then its route will
-be started. For a Master-slave scenario, the route is configured with 1
-route instance and only the first entry in the listing will start its
-route. All policies watch for updates to the listing and if the listing
-changes they recalculate if their route should be started. For more info
-on Zookeeper's leader election capability see
-http://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection[this
-page].
-
-The following example uses the node `/someapplication/somepolicy` for
-the election and is set up to start only the top '1' entries in the node
-listing i.e. elect a master:
-
-[source,java]
-----
-ZooKeeperRoutePolicy policy = new 
ZooKeeperRoutePolicy("zookeeper:localhost:39913/someapp/somepolicy", 1);
-from("direct:policy-controlled")
-    .routePolicy(policy)
-    .to("mock:controlled");
-----
-
-There are currently 3 policies defined in the component, with different SLAs:
-
-* `ZooKeeperRoutePolicy`
-* `CuratorLeaderRoutePolicy` (since *2.19*)
-* `MultiMasterCuratorLeaderRoutePolicy` (since *2.19*)
-
-*ZooKeeperRoutePolicy* supports multiple active nodes, but it's activation 
kicks in only after a Camel component and its correspondent Consumer have 
already been started,
- this introduces, depending on your routes definition, the risk that you 
component can already start consuming events and producing `Exchange`s, before 
the policy could estabilish
- that the node should not be activated.
-
-*CuratorLeaderRoutePolicy* supports only a single active node, but it's bound 
to a different `CamelContext` lifecycle method; this Policy kicks in before any 
route or consumer is started
- thus you can be sure that no even is processed before the Policy takes its 
decision.
-
-*MultiMasterCuratorLeaderRoutePolicy* support multiple active nodes, and it's 
bound to the same lifecycle method as `CuratorLeaderRoutePolicy`; this Policy 
kicks in before any route or consumer is started
- thus you can be sure that no even is processed before the Policy takes its 
decision.
-
-
-### See Also
-
-* Configuring Camel
-* Component
-* Endpoint
-* Getting Started
diff --git 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderElection.java
 
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderElection.java
deleted file mode 100644
index 5a4d35c..0000000
--- 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderElection.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.StatefulService;
-import org.apache.camel.impl.engine.JavaUuidGenerator;
-import org.apache.camel.spi.UuidGenerator;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import 
org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <code>CuratorLeaderElection</code> uses the leader election capabilities of 
a
- * ZooKeeper cluster to control which nodes are enabled. It is typically used 
in
- * fail-over scenarios controlling identical instances of an application across
- * a cluster of Camel based servers. <p> The election is configured with a 
single
- * server that should be marked as master.
- * <p> All instances of the election must also be configured with the same 
path on the ZooKeeper
- * cluster where the election will be carried out. It is good practice for this
- * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the election. <p> See <a
- * 
href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection";>
- * for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class CuratorLeaderElection {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(CuratorLeaderElection.class);
-    private final CamelContext camelContext;
-    private final String uri;
-
-    private final String candidateName;
-    private final Lock lock = new ReentrantLock();
-    private final CountDownLatch electionComplete = new CountDownLatch(1);
-    private final List<ElectionWatcher> watchers = new ArrayList<>();
-    private AtomicBoolean masterNode = new AtomicBoolean(false);
-    private volatile boolean isCandidateCreated;
-    private int enabledCount = 1;
-    private UuidGenerator uuidGenerator = new JavaUuidGenerator();
-    private LeaderSelector leaderSelector;
-    private CuratorFramework client;
-
-    public CuratorLeaderElection(CamelContext camelContext, String uri) {
-        this.camelContext = camelContext;
-        this.uri = uri;
-        this.candidateName = createCandidateName();
-
-        String connectionString = uri.substring(1 + 
uri.indexOf(':')).split("/")[0];
-        String protocol = uri.substring(0, uri.indexOf(':'));
-        String path = uri.replace(protocol + ":" + connectionString, "");
-        client = CuratorFrameworkFactory.newClient(connectionString, new 
ExponentialBackoffRetry(1000, 3));
-        client.start();
-
-        leaderSelector = new LeaderSelector(client, path, new 
CamelLeaderElectionListener());
-        leaderSelector.start();
-    }
-
-    // stolen from org/apache/camel/processor/CamelInternalProcessor
-    public static boolean isCamelStopping(CamelContext context) {
-        if (context instanceof StatefulService) {
-            StatefulService ss = (StatefulService) context;
-            return ss.isStopping() || ss.isStopped();
-        }
-        return false;
-    }
-
-    public void shutdownClients() {
-        try {
-            leaderSelector.close();
-        } finally {
-            client.close();
-        }
-    }
-
-    public boolean isMaster() {
-        return masterNode.get();
-    }
-
-    private String createCandidateName() {
-        StringBuilder builder = new StringBuilder();
-        try {
-            /* UUID would be enough, also using hostname for human readability 
*/
-            builder.append(InetAddress.getLocalHost().getCanonicalHostName());
-        } catch (UnknownHostException ex) {
-            LOG.warn("Failed to get the local hostname.", ex);
-            builder.append("unknown-host");
-        }
-        builder.append("-").append(uuidGenerator.generateUuid());
-        return builder.toString();
-    }
-
-    public String getCandidateName() {
-        return candidateName;
-    }
-
-    private void notifyElectionWatchers() {
-        for (ElectionWatcher watcher : watchers) {
-            try {
-                watcher.electionResultChanged();
-            } catch (Exception e) {
-                LOG.warn("Election watcher " + watcher + " of type " + 
watcher.getClass() + " threw an exception.", e);
-            }
-        }
-    }
-
-    public boolean addElectionWatcher(ElectionWatcher e) {
-        return watchers.add(e);
-    }
-
-    class CamelLeaderElectionListener extends LeaderSelectorListenerAdapter {
-
-        @Override
-        public void takeLeadership(CuratorFramework curatorFramework) throws 
Exception {
-            masterNode.set(true);
-            LOG.info("{} is now leader", getCandidateName());
-            notifyElectionWatchers();
-
-            // this is supposed to never return as long as it wants to keep 
its own leader status
-            while (!isCamelStopping(camelContext)) {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    Thread.interrupted();
-                    break;
-                }
-            }
-            masterNode.set(false);
-            LOG.info("{} has given up its own leadership", getCandidateName());
-            notifyElectionWatchers();
-        }
-    }
-}
-
-
diff --git 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderRoutePolicy.java
 
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderRoutePolicy.java
deleted file mode 100644
index cf3af30..0000000
--- 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorLeaderRoutePolicy.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.NonManagedService;
-import org.apache.camel.Route;
-import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.support.RoutePolicySupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-
-/**
- * <code>CuratorLeaderRoutePolicy</code> uses Apache Curator LeaderElection 
recipe to implement the behavior of having
- * at max 1 instance of a route, controlled by a specific policy, running. It 
is typically used in
- * fail-over scenarios controlling identical instances of a route across a
- * cluster of Camel based servers.
- * <p>
- * The policy affects the normal startup lifecycle of CamelContext and Routes, 
automatically set autoStart property of
- * routes controlled by this policy to false.
- * After Curator recipe identifies the current Policy instance as the Leader 
between a set of clients that are
- * competing for the role, it will start the route, and only at that moment 
the route will start its business.
- * This specific behavior is designed to avoid scenarios where such a policy 
would kick in only after a route had
- * already been started, with the risk, for consumers for example, that some 
source event might have already been
- * consumed.
- * <p>
- * All instances of the policy must also be configured with the same path on 
the
- * ZooKeeper cluster where the election will be carried out. It is good 
practice
- * for this to indicate the application e.g. 
<tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the policy.
- * <p>
- * See <a 
href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection";>
- *     for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class CuratorLeaderRoutePolicy extends RoutePolicySupport implements 
ElectionWatcher, NonManagedService {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(CuratorLeaderRoutePolicy.class);
-    private final String uri;
-    private final Lock lock = new ReentrantLock();
-    private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<>();
-    private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
-    private volatile boolean shouldStopRoute = true;
-
-    private final Lock electionLock = new ReentrantLock();
-    private CuratorLeaderElection election;
-
-    public CuratorLeaderRoutePolicy(String uri) {
-        this.uri = uri;
-    }
-
-    public CuratorLeaderRoutePolicy(CuratorLeaderElection election) {
-        this.election = election;
-        this.uri = null;
-    }
-
-    @Override
-    public void onInit(Route route) {
-        ensureElectionIsCreated(route);
-        LOG.info("Route managed by {}. Setting route {} AutoStartup flag to 
false.", this.getClass(), route.getId());
-        route.getRouteContext().setAutoStartup(false);
-        ensureElectionIsCreated(route);
-
-        if (election.isMaster()) {
-            if (shouldStopRoute) {
-                startManagedRoute(route);
-            }
-        } else {
-            if (shouldStopRoute) {
-                stopManagedRoute(route);
-            }
-        }
-    }
-
-    private void ensureElectionIsCreated(Route route) {
-        if (election == null) {
-            electionLock.lock();
-            try {
-                if (election == null) { // re-test
-                    election = new 
CuratorLeaderElection(route.getCamelContext(), uri);
-                    election.addElectionWatcher(this);
-                }
-            } finally {
-                electionLock.unlock();
-            }
-        }
-    }
-
-    private void startManagedRoute(Route route) {
-        try {
-            lock.lock();
-            if (suspendedRoutes.contains(route)) {
-                startRoute(route);
-                suspendedRoutes.remove(route);
-            }
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void stopManagedRoute(Route route) {
-        try {
-            lock.lock();
-            // check that we should still suspend once the lock is acquired
-            if (!suspendedRoutes.contains(route) && 
!shouldProcessExchanges.get()) {
-                stopRoute(route);
-                suspendedRoutes.add(route);
-            }
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void electionResultChanged() {
-        if (election.isMaster()) {
-            startAllStoppedRoutes();
-        }
-    }
-
-    private void startAllStoppedRoutes() {
-        try {
-            lock.lock();
-            if (!suspendedRoutes.isEmpty()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("{} route(s) have been stopped previously by 
policy, restarting.", suspendedRoutes.size());
-                }
-                for (Route suspended : suspendedRoutes) {
-                    log.debug("Starting route {}.", suspended.getId());
-                    startRoute(suspended);
-                }
-                suspendedRoutes.clear();
-            }
-
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    protected void doShutdown() throws Exception {
-        election.shutdownClients();
-    }
-}
diff --git 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
 
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
deleted file mode 100644
index d027015..0000000
--- 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.StatefulService;
-import org.apache.camel.impl.engine.JavaUuidGenerator;
-import org.apache.camel.spi.UuidGenerator;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
-import org.apache.curator.framework.recipes.locks.Lease;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <code>CuratorMultiMasterLeaderElection</code> uses the leader election 
capabilities of a
- * ZooKeeper cluster to control which nodes are enabled. It is typically used 
in
- * fail-over scenarios controlling identical instances of an application across
- * a cluster of Camel based servers. <p> The election is configured providing 
the number of instances that are required
- * to be active..
- * <p> All instances of the election must also be configured with the same 
path on the ZooKeeper
- * cluster where the election will be carried out. It is good practice for this
- * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the election. <p> See <a
- * 
href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection";>
- * for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class CuratorMultiMasterLeaderElection implements 
ConnectionStateListener {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(CuratorMultiMasterLeaderElection.class);
-
-    private final String candidateName;
-    private final List<ElectionWatcher> watchers = new ArrayList<>();
-    private final int desiredActiveNodes;
-    private AtomicBoolean activeNode = new AtomicBoolean(false);
-    private UuidGenerator uuidGenerator = new JavaUuidGenerator();
-    private InterProcessSemaphoreV2 leaderSelector;
-    private CuratorFramework client;
-    private Lease lease;
-
-    public CuratorMultiMasterLeaderElection(String uri, int 
desiredActiveNodes) {
-        this.candidateName = createCandidateName();
-        this.desiredActiveNodes = desiredActiveNodes;
-
-        String connectionString = uri.substring(1 + 
uri.indexOf(':')).split("/")[0];
-        String protocol = uri.substring(0, uri.indexOf(':'));
-        String path = uri.replace(protocol + ":" + connectionString, "");
-        client = CuratorFrameworkFactory.newClient(connectionString, new 
ExponentialBackoffRetry(1000, 3));
-        client.getConnectionStateListenable().addListener(this);
-        leaderSelector = new InterProcessSemaphoreV2(client, path, 
this.desiredActiveNodes);
-        client.start();
-
-
-    }
-
-    // stolen from org/apache/camel/processor/CamelInternalProcessor
-    public static boolean isCamelStopping(CamelContext context) {
-        if (context instanceof StatefulService) {
-            StatefulService ss = (StatefulService) context;
-            return ss.isStopping() || ss.isStopped();
-        }
-        return false;
-    }
-
-    public void shutdownClients() {
-        try {
-            leaderSelector.returnLease(lease);
-        } finally {
-            client.close();
-        }
-    }
-
-    /*
-     * Blocking method
-     */
-    public void requestResource() {
-        LOG.info("Requested to become active from {}", candidateName);
-        try {
-            lease = leaderSelector.acquire();
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to obtain access to become a 
leader node.");
-        }
-        LOG.info("{} is now active", candidateName);
-        activeNode.set(true);
-        notifyElectionWatchers();
-    }
-
-    public boolean isMaster() {
-        return activeNode.get();
-    }
-
-    private String createCandidateName() {
-        StringBuilder builder = new StringBuilder();
-        try {
-            /* UUID would be enough, also using hostname for human readability 
*/
-            builder.append(InetAddress.getLocalHost().getCanonicalHostName());
-        } catch (UnknownHostException ex) {
-            LOG.warn("Failed to get the local hostname.", ex);
-            builder.append("unknown-host");
-        }
-        builder.append("-").append(uuidGenerator.generateUuid());
-        return builder.toString();
-    }
-
-    public String getCandidateName() {
-        return candidateName;
-    }
-
-    private void notifyElectionWatchers() {
-        for (ElectionWatcher watcher : watchers) {
-            try {
-                watcher.electionResultChanged();
-            } catch (Exception e) {
-                LOG.warn("Election watcher " + watcher + " of type " + 
watcher.getClass() + " threw an exception.", e);
-            }
-        }
-    }
-
-    public boolean addElectionWatcher(ElectionWatcher e) {
-        return watchers.add(e);
-    }
-
-    @Override
-    public void stateChanged(CuratorFramework curatorFramework, 
ConnectionState connectionState) {
-        switch (connectionState) {
-        case SUSPENDED:
-        case LOST:
-            LOG.info("Received {} state from connection. Giving up lock.", 
connectionState);
-
-            try {
-                leaderSelector.returnLease(lease);
-            } finally {
-                this.activeNode.set(false);
-                notifyElectionWatchers();
-            }
-
-            break;
-        default:
-            LOG.info("Connection state changed: {}", connectionState);
-            requestResource();
-
-        }
-    }
-
-}
-
-
diff --git 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
 
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
deleted file mode 100644
index cfe952c..0000000
--- 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.NonManagedService;
-import org.apache.camel.Route;
-import org.apache.camel.support.RoutePolicySupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-
-/**
- * <code>CuratorMultiMasterLeaderRoutePolicy</code> uses Apache Curator 
InterProcessSemaphoreV2 receipe to implement the behavior of having
- * at multiple active instances of  a route, controlled by a specific policy, 
running. It is typically used in
- * fail-over scenarios controlling identical instances of a route across a 
cluster of Camel based servers.
- * <p>
- * The policy affects the normal startup lifecycle of CamelContext and Routes, 
automatically set autoStart property of
- * routes controlled by this policy to false.
- * After Curator receipe identifies the current Policy instance as the Leader 
between a set of clients that are
- * competing for the role, it will start the route, and only at that moment 
the route will start its business.
- * This specific behavior is designed to avoid scenarios where such a policy 
would kick in only after a route had
- * already been started, with the risk, for consumers for example, that some 
source event might have already been
- * consumed.
- * <p>
- * All instances of the policy must also be configured with the same path on 
the
- * ZooKeeper cluster where the election will be carried out. It is good 
practice
- * for this to indicate the application e.g. 
<tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the policy.
- * <p>
- * See <a 
href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection";>
- *     for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class CuratorMultiMasterLeaderRoutePolicy extends RoutePolicySupport 
implements ElectionWatcher, NonManagedService {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(CuratorMultiMasterLeaderRoutePolicy.class);
-    private final String uri;
-    private final Lock lock = new ReentrantLock();
-    private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<>();
-    private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
-    private volatile boolean shouldStopRoute = true;
-    private final int enabledCount;
-
-
-    private final Lock electionLock = new ReentrantLock();
-
-    private CuratorMultiMasterLeaderElection election;
-
-    public CuratorMultiMasterLeaderRoutePolicy(String uri, int enabledCount) {
-        this.uri = uri;
-        this.enabledCount = enabledCount;
-    }
-    public CuratorMultiMasterLeaderRoutePolicy(String uri) {
-        this(uri, 1);
-    }
-
-    @Override
-    public void onInit(Route route) {
-        ensureElectionIsCreated();
-        LOG.info("Route managed by {}. Setting route [{}] AutoStartup flag to 
false.", this.getClass(), route.getId());
-        route.getRouteContext().setAutoStartup(false);
-
-
-        if (election.isMaster()) {
-            if (shouldStopRoute) {
-                startManagedRoute(route);
-            }
-        } else {
-            if (shouldStopRoute) {
-                stopManagedRoute(route);
-            }
-        }
-
-    }
-
-    private void ensureElectionIsCreated() {
-        if (election == null) {
-            electionLock.lock();
-            try {
-                if (election == null) { // re-test
-                    election = new CuratorMultiMasterLeaderElection(uri, 
enabledCount);
-                    election.addElectionWatcher(this);
-
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            } finally {
-                electionLock.unlock();
-            }
-        }
-    }
-
-    private void startManagedRoute(Route route) {
-        try {
-            lock.lock();
-            if (suspendedRoutes.contains(route)) {
-                startRoute(route);
-                suspendedRoutes.remove(route);
-            }
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void stopManagedRoute(Route route) {
-        try {
-            lock.lock();
-            // check that we should still suspend once the lock is acquired
-            if (!suspendedRoutes.contains(route) && 
!shouldProcessExchanges.get()) {
-                stopRoute(route);
-                suspendedRoutes.add(route);
-            }
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void electionResultChanged() {
-        if (election.isMaster()) {
-            startAllStoppedRoutes();
-        }
-    }
-
-    private void startAllStoppedRoutes() {
-        try {
-            lock.lock();
-
-            if (!suspendedRoutes.isEmpty()) {
-                if (log.isDebugEnabled()) {
-                    log.info("{} route(s) have been stopped previously by 
policy, restarting.", suspendedRoutes.size());
-                }
-                for (Route suspended : suspendedRoutes) {
-                    CamelContext ctx = suspended.getCamelContext();
-                    while (!ctx.isStarted()) {
-                        log.info("Context {} is not started yet. Sleeping for 
a bit.", ctx.getName());
-                        Thread.sleep(5000);
-                    }
-                    log.info("Starting route [{}] defined in context [{}].", 
suspended.getId(), ctx.getName());
-                    startRoute(suspended);
-                }
-                suspendedRoutes.clear();
-            }
-
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    protected void doShutdown() throws Exception {
-        try {
-            electionLock.lock();
-            election.shutdownClients();
-            election = null;
-        } finally {
-            electionLock.unlock();
-        }
-    }
-
-    public CuratorMultiMasterLeaderElection getElection() {
-        return election;
-    }
-}
diff --git 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java
 
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java
deleted file mode 100644
index 62e199b..0000000
--- 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-public interface ElectionWatcher {
-
-    /**
-     * This method is called when there is a potential change to the master.
-     * Implementations should call "isMaster" on their ZookeeperElection
-     * instance to re-validate their status.
-     */
-    void electionResultChanged();
-}
diff --git 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java
 
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java
deleted file mode 100644
index b491f1c..0000000
--- 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.zookeeper.SequenceComparator;
-import org.apache.camel.component.zookeeper.ZooKeeperEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperMessage;
-import org.apache.camel.impl.engine.JavaUuidGenerator;
-import org.apache.camel.spi.UuidGenerator;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <code>ZooKeeperElection</code> uses the leader election capabilities of a
- * ZooKeeper cluster to control which nodes are enabled. It is typically used 
in
- * fail-over scenarios controlling identical instances of an application across
- * a cluster of Camel based servers. <p> The election is configured with a 'top
- * n' number of servers that should be marked as master, for a simple
- * master/slave scenario this would be 1. Each instance will execute the
- * election algorithm to obtain its position in the hierarchy of servers, if it
- * is within the 'top n' servers then the node is enabled and isMaster() will
- * return 'true'. If not it waits for a change in the leader hierarchy and then
- * reruns this scenario to see if it is now in the top n. <p> All instances of
- * the election must also be configured with the same path on the ZooKeeper
- * cluster where the election will be carried out. It is good practice for this
- * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the election. <p> See <a
- * 
href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection";>
- * for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class ZooKeeperElection {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperElection.class);
-    private final ProducerTemplate producerTemplate;
-    private final CamelContext camelContext;
-    private final String uri;
-    private final String candidateName;
-    private final Lock lock = new ReentrantLock();
-    private final CountDownLatch electionComplete = new CountDownLatch(1);
-    private AtomicBoolean masterNode = new AtomicBoolean();
-    private volatile boolean isCandidateCreated;
-    private int enabledCount = 1;
-    private UuidGenerator uuidGenerator = new JavaUuidGenerator();
-    private final List<ElectionWatcher> watchers = new ArrayList<>();
-
-    public ZooKeeperElection(CamelContext camelContext, String uri, int 
enabledCount) {
-        this(camelContext.createProducerTemplate(), camelContext, uri, 
enabledCount);
-    }
-
-    public ZooKeeperElection(ProducerTemplate producerTemplate, CamelContext 
camelContext, String uri, int enabledCount) {
-        this.camelContext = camelContext;
-        this.producerTemplate = producerTemplate;
-        this.uri = uri;
-        this.enabledCount = enabledCount;
-        this.candidateName = createCandidateName();
-    }
-
-    public boolean isMaster() {
-        if (!isCandidateCreated) {
-            testAndCreateCandidateNode();
-            awaitElectionResults();
-
-        }
-        return masterNode.get();
-    }
-
-    private String createCandidateName() {
-        StringBuilder builder = new StringBuilder();
-        try {
-            /* UUID would be enough, also using hostname for human readability 
*/
-            builder.append(InetAddress.getLocalHost().getCanonicalHostName());
-        } catch (UnknownHostException ex) {
-            LOG.warn("Failed to get the local hostname.", ex);
-            builder.append("unknown-host");
-        }
-        builder.append("-").append(uuidGenerator.generateUuid());
-        return builder.toString();
-    }
-
-    private void testAndCreateCandidateNode() {
-        try {
-            lock.lock();
-            if (!isCandidateCreated) {
-                createCandidateNode(camelContext);
-                isCandidateCreated = true;
-            }
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void awaitElectionResults() {
-        while (electionComplete.getCount() > 0) {
-            try {
-                LOG.debug("Awaiting election results...");
-                electionComplete.await();
-            } catch (InterruptedException e1) {
-                // do nothing here
-            }
-        }
-    }
-
-    private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) {
-        LOG.info("Initializing ZookeeperElection with uri '{}'", uri);
-        ZooKeeperEndpoint zep = camelContext.getEndpoint(uri, 
ZooKeeperEndpoint.class);
-        zep.getConfiguration().setCreate(true);
-        String fullpath = createFullPathToCandidate(zep);
-        Exchange e = zep.createExchange();
-        e.setPattern(ExchangePattern.InOut);
-        e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath);
-        e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, 
CreateMode.EPHEMERAL_SEQUENTIAL);
-        producerTemplate.send(zep, e);
-
-        if (e.isFailed()) {
-            LOG.warn("Error setting up election node {}", fullpath, 
e.getException());
-        } else {
-            LOG.info("Candidate node '{}' has been created", fullpath);
-            try {
-                camelContext.addRoutes(new ElectoralMonitorRoute(zep));
-            } catch (Exception ex) {
-                LOG.warn("Error configuring ZookeeperElection", ex);
-            }
-        }
-        return zep;
-
-    }
-
-    private String createFullPathToCandidate(ZooKeeperEndpoint zep) {
-        String fullpath = zep.getConfiguration().getPath();
-        if (!fullpath.endsWith("/")) {
-            fullpath += "/";
-        }
-        fullpath += candidateName;
-        return fullpath;
-    }
-
-    private void handleException(Exception e) {
-        throw new RuntimeException(e.getMessage(), e);
-    }
-
-    private void notifyElectionWatchers() {
-        for (ElectionWatcher watcher : watchers) {
-            try {
-                watcher.electionResultChanged();
-            } catch (Exception e) {
-                LOG.warn("Election watcher " + watcher + " of type " + 
watcher.getClass() + " threw an exception.", e);
-            }
-        }
-    }
-
-    public boolean addElectionWatcher(ElectionWatcher e) {
-        return watchers.add(e);
-    }
-
-    public boolean removeElectionWatcher(ElectionWatcher o) {
-        return watchers.remove(o);
-    }
-
-    private class ElectoralMonitorRoute extends RouteBuilder {
-
-        private SequenceComparator comparator = new SequenceComparator();
-        private ZooKeeperEndpoint zep;
-
-        ElectoralMonitorRoute(ZooKeeperEndpoint zep) {
-            this.zep = zep;
-            zep.getConfiguration().setListChildren(true);
-            zep.getConfiguration().setSendEmptyMessageOnDelete(true);
-            zep.getConfiguration().setRepeat(true);
-        }
-
-        @Override
-        public void configure() throws Exception {
-
-            /**
-             * TODO: this is cheap cheerful but suboptimal; it suffers from the
-             * 'herd effect' that on any change to the candidates list every
-             * policy instance will ask for the entire candidate list again.
-             * This is fine for small numbers of nodes (for scenarios like
-             * Master-Slave it is perfect) but could get noisy if large numbers
-             * of nodes were involved. <p> Better would be to find the position
-             * of this node in the list and watch the node in the position 
ahead
-             * node ahead of this and only request the candidate list when its
-             * status changes. This will require enhancing the consumer to 
allow
-             * custom operation lists.
-             */
-            from(zep).id("election-route-" + candidateName).sort(body(), 
comparator).process(new Processor() {
-                @Override
-                public void process(Exchange e) throws Exception {
-                    @SuppressWarnings("unchecked")
-                    List<String> candidates = 
e.getIn().getMandatoryBody(List.class);
-                    // we cannot use the binary search here and the candidates 
a not sorted in the normal way
-                    /**
-                     * check if the item at this location starts with this 
nodes
-                     * candidate name
-                     */
-                    int location = 
findCandidateLocationInCandidatesList(candidates, candidateName); 
-                    if (location != -1) {
-                        // set the nodes
-                        masterNode.set(location <= enabledCount);
-                        LOG.debug("This node is number '{}' on the candidate 
list, election is configured for the top '{}'. this node will be {}",
-                                new Object[]{location, enabledCount, 
masterNode.get() ? "enabled" : "disabled"}
-                        );
-                    }
-                    electionComplete.countDown();
-
-                    notifyElectionWatchers();
-                }
-
-                private int findCandidateLocationInCandidatesList(List<String> 
candidates, String candidateName) {
-                
-                    for (int location = 1; location <= candidates.size(); 
location++) {
-                        if (candidates.get(location - 
1).startsWith(candidateName)) {
-                            return location;
-                        }
-                    }
-                    return -1;
-                }
-            });
-        }
-    }
-}
diff --git 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
 
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
deleted file mode 100644
index bb6a760..0000000
--- 
a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.NonManagedService;
-import org.apache.camel.Route;
-import org.apache.camel.support.RoutePolicySupport;
-
-/**
- * <code>ZooKeeperRoutePolicy</code> uses the leader election capabilities of a
- * ZooKeeper cluster to control how routes are enabled. It is typically used in
- * fail-over scenarios controlling identical instances of a route across a
- * cluster of Camel based servers.
- * <p>
- * The policy is configured with a 'top n' number of routes that should be
- * allowed to start, for a master/slave scenario this would be 1. Each instance
- * of the policy will execute the election algorithm to obtain its position in
- * the hierarchy of servers, if it is within the 'top n' servers then the 
policy
- * is enabled and exchanges can be processed by the route. If not it waits for 
a
- * change in the leader hierarchy and then reruns this scenario to see if it is
- * now in the top n.
- * <p>
- * All instances of the policy must also be configured with the same path on 
the
- * ZooKeeper cluster where the election will be carried out. It is good 
practice
- * for this to indicate the application e.g. 
<tt>/someapplication/someroute/</tt> note
- * that these nodes should exist before using the policy.
- * <p>
- * See <a 
href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection";>
- *     for more on how Leader election</a> is archived with ZooKeeper.
- */
-public class ZooKeeperRoutePolicy extends RoutePolicySupport implements 
ElectionWatcher, NonManagedService {
-
-    private final String uri;
-    private final int enabledCount;
-    private final Lock lock = new ReentrantLock();
-    private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<>();
-    private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
-    private volatile boolean shouldStopConsumer = true;
-
-    private final Lock electionLock = new ReentrantLock();
-    private ZooKeeperElection election;
-
-    public ZooKeeperRoutePolicy(String uri, int enabledCount) {
-        this.uri = uri;
-        this.enabledCount = enabledCount;
-    }
-
-    public ZooKeeperRoutePolicy(ZooKeeperElection election) {
-        this.election = election;
-        this.uri = null;
-        this.enabledCount = -1;
-    }
-
-    @Override
-    public void onExchangeBegin(Route route, Exchange exchange) {
-        ensureElectionIsCreated(route);
-
-        if (election.isMaster()) {
-            if (shouldStopConsumer) {
-                startConsumer(route);
-            }
-        } else {
-            if (shouldStopConsumer) {
-                stopConsumer(route);
-            }
-
-            IllegalStateException e = new IllegalStateException("Zookeeper 
based route policy prohibits processing exchanges, stopping route and failing 
the exchange");
-            exchange.setException(e);
-        }
-    }
-
-    private void ensureElectionIsCreated(Route route) {
-        if (election == null) {
-            electionLock.lock();
-            try {
-                if (election == null) { // re-test
-                    election = new ZooKeeperElection(route.getCamelContext(), 
uri, enabledCount);
-                    election.addElectionWatcher(this);
-                }
-            } finally {
-                electionLock.unlock();
-            }
-        }
-    }
-
-    private void startConsumer(Route route) {
-        try {
-            lock.lock();
-            if (suspendedRoutes.contains(route)) {
-                startConsumer(route.getConsumer());
-                suspendedRoutes.remove(route);
-            }
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void stopConsumer(Route route) {
-        try {
-            lock.lock();
-            // check that we should still suspend once the lock is acquired
-            if (!suspendedRoutes.contains(route) && 
!shouldProcessExchanges.get()) {
-                stopConsumer(route.getConsumer());
-                suspendedRoutes.add(route);
-            }
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void electionResultChanged() {
-        if (election.isMaster()) {
-            startAllStoppedConsumers();
-        }
-    }
-
-    private void startAllStoppedConsumers() {
-        try {
-            lock.lock();
-            if (!suspendedRoutes.isEmpty()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("{} have been stopped previously by policy, 
restarting.", suspendedRoutes.size());
-                }
-                for (Route suspended : suspendedRoutes) {
-                    startConsumer(suspended.getConsumer());
-                }
-                suspendedRoutes.clear();
-            }
-
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public boolean isShouldStopConsumer() {
-        return shouldStopConsumer;
-    }
-
-    public void setShouldStopConsumer(boolean shouldStopConsumer) {
-        this.shouldStopConsumer = shouldStopConsumer;
-    }
-}
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverCuratorLeaderRoutePolicyTest.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverCuratorLeaderRoutePolicyTest.java
deleted file mode 100644
index 28ab4ae..0000000
--- 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverCuratorLeaderRoutePolicyTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.hamcrest.CoreMatchers.is;
-
-public class FailoverCuratorLeaderRoutePolicyTest extends ZooKeeperTestSupport 
{
-    public static final String ZNODE = "/curatorleader";
-    public static final String BASE_ZNODE = "/someapp";
-    private static final Logger LOG = 
LoggerFactory.getLogger(FailoverCuratorLeaderRoutePolicyTest.class);
-
-
-    protected CamelContext createCamelContext() throws Exception {
-        disableJMX();
-        return super.createCamelContext();
-    }
-
-    @Test
-    public void masterSlaveScenarioContolledByPolicy() throws Exception {
-        ZookeeperPolicyEnforcedContext master = 
createEnforcedContext("master");
-        ZookeeperPolicyEnforcedContext slave = createEnforcedContext("slave");
-        Thread.sleep(5000);
-        // Send messages to the master and the slave.
-        // The route is enabled in the master and gets through, but that sent 
to
-        // the slave context is rejected.
-        master.sendMessageToEnforcedRoute("message for master", 1);
-        slave.sendMessageToEnforcedRoute("message for slave", 0);
-
-        // trigger failover by killing the master... then assert that the slave
-        // can now receive messages.
-        master.shutdown();
-        slave.sendMessageToEnforcedRoute("second message for slave", 1);
-        slave.shutdown();
-    }
-
-
-    @Test
-    public void ensureRoutesDoNotStartBeforeElection() throws Exception {
-        DefaultCamelContext context = new DefaultCamelContext();
-
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                CuratorLeaderRoutePolicy policy = new 
CuratorLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + BASE_ZNODE 
+ ZNODE + 2);
-                
from("timer://foo?fixedRate=true&period=5").routePolicy(policy).id("single_route").autoStartup(true).to("mock:controlled");
-            }
-        });
-        context.start();
-        // this check verifies that a route marked as autostartable is not 
started automatically. It will be the policy responsibility to eventually start 
it.
-        
assertThat(context.getRouteController().getRouteStatus("single_route").isStarted(),
 is(false));
-        
assertThat(context.getRouteController().getRouteStatus("single_route").isStarting(),
 is(false));
-
-        context.shutdown();
-    }
-
-    private static class ZookeeperPolicyEnforcedContext {
-        private CamelContext controlledContext;
-        private ProducerTemplate template;
-        private MockEndpoint mock;
-        private String routename;
-
-        ZookeeperPolicyEnforcedContext(String name) throws Exception {
-            controlledContext = new DefaultCamelContext();
-            routename = name;
-            template = controlledContext.createProducerTemplate();
-            mock = controlledContext.getEndpoint("mock:controlled", 
MockEndpoint.class);
-            controlledContext.addRoutes(new FailoverRoute(name));
-            controlledContext.start();
-        }
-
-        public void sendMessageToEnforcedRoute(String message, int expected) 
throws InterruptedException {
-            mock.expectedMessageCount(expected);
-            try {
-                template.sendBody("vm:" + routename, ExchangePattern.InOut, 
message);
-            } catch (Exception e) {
-                if (expected > 0) {
-                    LOG.error(e.getMessage(), e);
-                    fail("Expected messages...");
-                }
-            }
-            mock.await(2, TimeUnit.SECONDS);
-            mock.assertIsSatisfied(2000);
-        }
-
-        public void shutdown() throws Exception {
-            LoggerFactory.getLogger(getClass()).debug("stopping");
-            controlledContext.stop();
-            LoggerFactory.getLogger(getClass()).debug("stopped");
-        }
-    }
-
-    private ZookeeperPolicyEnforcedContext createEnforcedContext(String name) 
throws Exception, InterruptedException {
-        ZookeeperPolicyEnforcedContext context = new 
ZookeeperPolicyEnforcedContext(name);
-        delay(1000);
-        return context;
-    }
-
-    public static class FailoverRoute extends RouteBuilder {
-
-        private String routename;
-
-        public FailoverRoute(String routename) {
-            // need names as if we use the same direct ep name in two contexts
-            // in the same vm shutting down one context shuts the endpoint for
-            // both.
-            this.routename = routename;
-        }
-
-        public void configure() throws Exception {
-            CuratorLeaderRoutePolicy policy = new 
CuratorLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + BASE_ZNODE 
+ ZNODE);
-            from("vm:" + 
routename).routePolicy(policy).id(routename).to("mock:controlled");
-        }
-    }
-}
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java
deleted file mode 100644
index 14e9d8a..0000000
--- 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FailoverRoutePolicyTest extends ZooKeeperTestSupport {
-    private static final Logger LOG = 
LoggerFactory.getLogger(FailoverRoutePolicyTest.class);
-
-    protected CamelContext createCamelContext() throws Exception {
-        disableJMX();
-        // set up the parent nodes used to control the election
-        client.createPersistent("/someapp", "App node to contain policy 
election nodes...");
-        client.createPersistent("/someapp/somepolicy", "Node used by route 
policy to control active routes...");
-        return super.createCamelContext();
-    }
-
-    @Test
-    public void masterSlaveScenarioContolledByPolicy() throws Exception {
-        ZookeeperPolicyEnforcedContext tetrisisMasterOfBlocks = 
createEnforcedContext("master");
-        ZookeeperPolicyEnforcedContext slave = createEnforcedContext("slave");
-
-        // http://bit.ly/9gTlGe ;). Send messages to the master and the slave.
-        // The route is enabled in the master and gets through, but that sent 
to
-        // the slave context is rejected.
-        
tetrisisMasterOfBlocks.sendMessageToEnforcedRoute("LIIIIIIIIIINNNNNNNNNEEEEEEE 
PEEEEEEICCCE", 1);
-        slave.sendMessageToEnforcedRoute("But lord there is no place for a 
square!??!", 0);
-
-        // trigger failover by killing the master... then assert that the slave
-        // can now receive messages.
-        tetrisisMasterOfBlocks.shutdown();
-        slave.sendMessageToEnforcedRoute("What a cruel and angry god...", 1);
-    }
-
-    private static class ZookeeperPolicyEnforcedContext {
-        private CamelContext controlledContext;
-        private ProducerTemplate template;
-        private MockEndpoint mock;
-        private String routename;
-
-        ZookeeperPolicyEnforcedContext(String name) throws Exception {
-            controlledContext = new DefaultCamelContext();
-            routename = name;
-            template = controlledContext.createProducerTemplate();
-            mock = controlledContext.getEndpoint("mock:controlled", 
MockEndpoint.class);
-            controlledContext.addRoutes(new FailoverRoute(name));
-            controlledContext.start();
-        }
-
-        public void sendMessageToEnforcedRoute(String message, int expected) 
throws InterruptedException {
-            mock.expectedMessageCount(expected);
-            try {
-                template.sendBody("vm:" + routename, ExchangePattern.InOut, 
message);
-            } catch (Exception e) {
-                if (expected > 0) {
-                    LOG.error(e.getMessage(), e);
-                    fail("Expected messages...");
-                }
-            }
-            mock.await(2, TimeUnit.SECONDS);
-            mock.assertIsSatisfied(1000);
-        }
-
-        public void shutdown() throws Exception {
-            LoggerFactory.getLogger(getClass()).debug("stopping");
-            controlledContext.stop();
-            LoggerFactory.getLogger(getClass()).debug("stopped");
-        }
-    }
-
-    private ZookeeperPolicyEnforcedContext createEnforcedContext(String name) 
throws Exception, InterruptedException {
-        ZookeeperPolicyEnforcedContext context = new 
ZookeeperPolicyEnforcedContext(name);
-        delay(1000);
-        return context;
-    }
-
-    public static class FailoverRoute extends RouteBuilder {
-
-        private String routename;
-
-        public FailoverRoute(String routename) {
-            // need names as if we use the same direct ep name in two contexts
-            // in the same vm shutting down one context shuts the endpoint for
-            // both.
-            this.routename = routename;
-        }
-
-        public void configure() throws Exception {
-            ZooKeeperRoutePolicy policy = new 
ZooKeeperRoutePolicy("zookeeper:localhost:" + getServerPort() + 
"/someapp/somepolicy", 1);
-            from("vm:" + 
routename).routePolicy(policy).id(routename).to("mock:controlled");
-        }
-    };
-}
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
deleted file mode 100644
index 05afc3f..0000000
--- 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.model.Model;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-
-public class MultiMasterCuratorLeaderRoutePolicyTest extends 
ZooKeeperTestSupport {
-    public static final String ZNODE = "/multimaster";
-    public static final String BASE_ZNODE = "/someapp";
-    private static final Logger LOG = 
LoggerFactory.getLogger(MultiMasterCuratorLeaderRoutePolicyTest.class);
-
-
-    protected CamelContext createCamelContext() throws Exception {
-        disableJMX();
-        return super.createCamelContext();
-    }
-
-
-    @Test
-    public void ensureRoutesDoNotStartAutomatically() throws Exception {
-        DefaultCamelContext context = new DefaultCamelContext();
-
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                CuratorMultiMasterLeaderRoutePolicy policy = new 
CuratorMultiMasterLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + 
BASE_ZNODE + ZNODE + 2);
-                
from("timer://foo?fixedRate=true&period=5").routePolicy(policy).id("single_route").autoStartup(true).to("mock:controlled");
-            }
-        });
-        context.start();
-        // this check verifies that a route marked as autostartable is not 
started automatically. It will be the policy responsibility to eventually start 
it.
-        
assertThat(context.getRouteController().getRouteStatus("single_route").isStarted(),
 is(false));
-        
assertThat(context.getRouteController().getRouteStatus("single_route").isStarting(),
 is(false));
-        try {
-            context.shutdown();
-        } catch (Exception e) {
-            //concurrency can raise some InterruptedException but we don't 
really care in this scenario.
-        }
-    }
-
-    @Test
-    public void oneMasterOneSlaveScenarioContolledByPolicy() throws Exception {
-        final String path = "oneMasterOneSlaveScenarioContolledByPolicy";
-        final String firstDestination = "first" + System.currentTimeMillis();
-        final String secondDestination = "second" + System.currentTimeMillis();
-        final CountDownLatch waitForSecondRouteCompletedLatch = new 
CountDownLatch(1);
-        final int activeNodesDesired = 1;
-
-        MultiMasterZookeeperPolicyEnforcedContext first = 
createEnforcedContext(firstDestination, activeNodesDesired, path);
-        DefaultCamelContext controlledContext = (DefaultCamelContext) 
first.controlledContext;
-        // get reference to the Policy object to check if it's already a master
-        CuratorMultiMasterLeaderRoutePolicy routePolicy = 
(CuratorMultiMasterLeaderRoutePolicy) 
controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
-
-        assertWeHaveMasters(routePolicy);
-
-        LOG.info("Starting first CamelContext");
-        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new 
MultiMasterZookeeperPolicyEnforcedContext[1];
-
-        new Thread() {
-            @Override
-            public void run() {
-                MultiMasterZookeeperPolicyEnforcedContext second = null;
-                try {
-                    LOG.info("Starting second CamelContext in a separate 
thread");
-                    second = createEnforcedContext(secondDestination, 
activeNodesDesired, path);
-                    arr[0] = second;
-                    second.sendMessageToEnforcedRoute("message for second", 0);
-                    waitForSecondRouteCompletedLatch.countDown();
-                } catch (Exception e) {
-                    LOG.error("Error in the thread controlling the second 
context", e);
-                    fail("Error in the thread controlling the second context: 
" + e.getMessage());
-                }
-
-
-            }
-        }.start();
-
-        first.sendMessageToEnforcedRoute("message for first", 1);
-
-        waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES);
-        LOG.info("Explicitly shutting down the first camel context.");
-
-        LOG.info("Shutting down first con");
-        first.shutdown();
-
-        MultiMasterZookeeperPolicyEnforcedContext second = arr[0];
-
-        DefaultCamelContext secondCamelContext = (DefaultCamelContext) 
second.controlledContext;
-        
assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0));
-
-        //second.mock = secondCamelContext.getEndpoint("mock:controlled", 
MockEndpoint.class);
-        second.sendMessageToEnforcedRoute("message for slave", 1);
-        second.shutdown();
-    }
-
-
-    @Test
-    public void oneMasterOneSlaveAndFlippedAgainScenarioContolledByPolicy() 
throws Exception {
-        final String path = "oneMasterOneSlaveScenarioContolledByPolicy";
-        final String firstDestination = "first" + System.currentTimeMillis();
-        final String secondDestination = "second" + System.currentTimeMillis();
-        final CountDownLatch waitForSecondRouteCompletedLatch = new 
CountDownLatch(1);
-        final int activeNodeDesired = 1;
-
-        MultiMasterZookeeperPolicyEnforcedContext first = 
createEnforcedContext(firstDestination, activeNodeDesired, path);
-        DefaultCamelContext controlledContext = (DefaultCamelContext) 
first.controlledContext;
-        // get reference to the Policy object to check if it's already a master
-        CuratorMultiMasterLeaderRoutePolicy routePolicy = 
(CuratorMultiMasterLeaderRoutePolicy) 
controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
-
-        assertWeHaveMasters(routePolicy);
-
-        LOG.info("Starting first CamelContext");
-        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new 
MultiMasterZookeeperPolicyEnforcedContext[1];
-
-        new Thread() {
-            @Override
-            public void run() {
-                MultiMasterZookeeperPolicyEnforcedContext slave = null;
-                try {
-                    LOG.info("Starting second CamelContext in a separate 
thread");
-                    slave = createEnforcedContext(secondDestination, 
activeNodeDesired, path);
-                    arr[0] = slave;
-                    slave.sendMessageToEnforcedRoute("message for second", 0);
-                    waitForSecondRouteCompletedLatch.countDown();
-                } catch (Exception e) {
-                    LOG.error("Error in the thread controlling the second 
context", e);
-                    fail("Error in the thread controlling the second context: 
" + e.getMessage());
-                }
-
-
-            }
-        }.start();
-
-        first.sendMessageToEnforcedRoute("message for first", 1);
-
-        waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES);
-        MultiMasterZookeeperPolicyEnforcedContext second = arr[0];
-
-        LOG.info("Explicitly shutting down the first camel context.");
-        first.shutdown();
-
-        DefaultCamelContext secondCamelContext = (DefaultCamelContext) 
second.controlledContext;
-        
assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0));
-
-        CountDownLatch restartFirstLatch = new CountDownLatch(1);
-        LOG.info("Start back first context");
-        new Thread() {
-            @Override
-            public void run() {
-                try {
-                    first.startup();
-                    restartFirstLatch.countDown();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }.start();
-        restartFirstLatch.await();
-        second.sendMessageToEnforcedRoute("message for second", 1);
-        first.mock.reset();
-        first.sendMessageToEnforcedRoute("message for first", 0);
-        second.shutdown();
-        controlledContext = (DefaultCamelContext) first.controlledContext;
-        // get reference to the Policy object to check if it's already a master
-        routePolicy = (CuratorMultiMasterLeaderRoutePolicy) 
controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
-        log.info("Asserting route is up. context: [{}]", 
controlledContext.getName());
-        assertWeHaveMasters(routePolicy);
-        first.controlledContext.setTracing(true);
-        first.mock = controlledContext.getEndpoint("mock:controlled", 
MockEndpoint.class);
-        first.sendMessageToEnforcedRoute("message for first", 1);
-        first.shutdown();
-    }
-
-
-
-
-    @Test
-    public void oneMasterTwoSlavesScenarioContolledByPolicy() throws Exception 
{
-        final String path = "oneMasterTwoSlavesScenarioContolledByPolicy";
-        final String master = "master" + System.currentTimeMillis();
-        final String secondDestination = "second" + System.currentTimeMillis();
-        final String thirdDestination = "third" + System.currentTimeMillis();
-        final CountDownLatch waitForNonActiveRoutesLatch = new 
CountDownLatch(2);
-        final int activeNodesDesired = 1;
-
-        LOG.info("Starting first CamelContext");
-        MultiMasterZookeeperPolicyEnforcedContext first = 
createEnforcedContext(master, activeNodesDesired, path);
-        DefaultCamelContext controlledContext = (DefaultCamelContext) 
first.controlledContext;
-        // get reference to the Policy object to check if it's already a master
-        CuratorMultiMasterLeaderRoutePolicy routePolicy = 
(CuratorMultiMasterLeaderRoutePolicy) 
controlledContext.getRouteDefinition(master).getRoutePolicies().get(0);
-
-        assertWeHaveMasters(routePolicy);
-
-        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new 
MultiMasterZookeeperPolicyEnforcedContext[2];
-
-        new Thread() {
-            @Override
-            public void run() {
-                MultiMasterZookeeperPolicyEnforcedContext second = null;
-                try {
-                    LOG.info("Starting second CamelContext");
-                    second = createEnforcedContext(secondDestination, 
activeNodesDesired, path);
-                    arr[0] = second;
-                    second.sendMessageToEnforcedRoute("message for second", 0);
-                    waitForNonActiveRoutesLatch.countDown();
-                } catch (Exception e) {
-                    LOG.error("Error in the thread controlling the second 
context", e);
-                    fail("Error in the thread controlling the second context: 
" + e.getMessage());
-                }
-
-
-            }
-        }.start();
-
-        new Thread() {
-            @Override
-            public void run() {
-                MultiMasterZookeeperPolicyEnforcedContext third = null;
-                try {
-                    LOG.info("Starting third CamelContext");
-                    third = createEnforcedContext(thirdDestination, 
activeNodesDesired, path);
-                    arr[1] = third;
-                    third.sendMessageToEnforcedRoute("message for third", 0);
-                    waitForNonActiveRoutesLatch.countDown();
-                } catch (Exception e) {
-                    LOG.error("Error in the thread controlling the third 
context", e);
-                    fail("Error in the thread controlling the third context: " 
+ e.getMessage());
-                }
-
-
-            }
-        }.start();
-
-        // Send messages to the master and the slave.
-        // The route is enabled in the master and gets through, but that sent 
to
-        // the slave context is rejected.
-        first.sendMessageToEnforcedRoute("message for master", 1);
-
-        waitForNonActiveRoutesLatch.await();
-        LOG.info("Explicitly shutting down the first camel context.");
-        // trigger failover by killing the master..
-        first.shutdown();
-        // let's find out who's active now:
-
-        CuratorMultiMasterLeaderRoutePolicy routePolicySecond = 
(CuratorMultiMasterLeaderRoutePolicy) 
arr[0].controlledContext.getExtension(Model.class)
-                                                                
.getRouteDefinition(secondDestination).getRoutePolicies().get(0);
-        CuratorMultiMasterLeaderRoutePolicy routePolicyThird = 
(CuratorMultiMasterLeaderRoutePolicy) 
arr[1].controlledContext.getExtension(Model.class)
-                                                                
.getRouteDefinition(thirdDestination).getRoutePolicies().get(0);
-
-        MultiMasterZookeeperPolicyEnforcedContext newMaster = null;
-        MultiMasterZookeeperPolicyEnforcedContext slave = null;
-
-        final int maxWait = 20;
-        for (int i = 0; i < maxWait; i++) {
-            if (routePolicySecond.getElection().isMaster()) {
-                newMaster = arr[0];
-                slave = arr[1];
-                LOG.info("[second] is the new master");
-                break;
-            } else if (routePolicyThird.getElection().isMaster()) {
-                newMaster = arr[1];
-                slave = arr[0];
-                LOG.info("[third] is the new master");
-                break;
-            } else {
-                Thread.sleep(2000);
-                LOG.info("waiting for a new master to be elected");
-            }
-        }
-        assertThat(newMaster, is(notNullValue()));
-
-        newMaster.sendMessageToEnforcedRoute("message for second", 1);
-        slave.sendMessageToEnforcedRoute("message for third", 0);
-        slave.shutdown();
-        newMaster.shutdown();
-    }
-
-
-    @Test
-    public void twoMasterOneSlavesScenarioContolledByPolicy() throws Exception 
{
-        final String path = "twoMasterOneSlavesScenarioContolledByPolicy";
-        final String firstDestination = "first" + System.currentTimeMillis();
-        final String secondDestination = "second" + System.currentTimeMillis();
-        final String thirdDestination = "third" + System.currentTimeMillis();
-        final CountDownLatch waitForThirdRouteCompletedLatch = new 
CountDownLatch(1);
-        final int activeNodeDesired = 2;
-
-        MultiMasterZookeeperPolicyEnforcedContext first = 
createEnforcedContext(firstDestination, activeNodeDesired, path);
-        DefaultCamelContext firstControlledContext = (DefaultCamelContext) 
first.controlledContext;
-        CuratorMultiMasterLeaderRoutePolicy firstRoutePolicy = 
(CuratorMultiMasterLeaderRoutePolicy) 
firstControlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
-
-        MultiMasterZookeeperPolicyEnforcedContext second = 
createEnforcedContext(secondDestination, activeNodeDesired, path);
-        DefaultCamelContext secondControlledContext = (DefaultCamelContext) 
second.controlledContext;
-        CuratorMultiMasterLeaderRoutePolicy secondRoutePolicy = 
(CuratorMultiMasterLeaderRoutePolicy) 
secondControlledContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0);
-
-        assertWeHaveMasters(firstRoutePolicy, secondRoutePolicy);
-
-        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new 
MultiMasterZookeeperPolicyEnforcedContext[1];
-
-
-        new Thread() {
-            @Override
-            public void run() {
-                MultiMasterZookeeperPolicyEnforcedContext third = null;
-                try {
-                    LOG.info("Starting third CamelContext");
-                    third = createEnforcedContext(thirdDestination, 
activeNodeDesired, path);
-                    arr[0] = third;
-                    third.sendMessageToEnforcedRoute("message for third", 0);
-                    waitForThirdRouteCompletedLatch.countDown();
-                } catch (Exception e) {
-                    LOG.error("Error in the thread controlling the third 
context", e);
-                    fail("Error in the thread controlling the third context: " 
+ e.getMessage());
-                }
-
-
-            }
-        }.start();
-
-        first.sendMessageToEnforcedRoute("message for first", 1);
-        second.sendMessageToEnforcedRoute("message for second", 1);
-
-
-        waitForThirdRouteCompletedLatch.await();
-
-        LOG.info("Explicitly shutting down the first camel context.");
-        first.shutdown();
-
-
-        arr[0].sendMessageToEnforcedRoute("message for third", 1);
-        second.shutdown();
-        arr[0].shutdown();
-    }
-
-    void assertWeHaveMasters(CuratorMultiMasterLeaderRoutePolicy... 
routePolicies) throws InterruptedException {
-        final int maxWait = 20;
-        boolean global = false;
-        for (int i = 0; i < maxWait; i++) {
-            boolean iteration = true;
-            for (CuratorMultiMasterLeaderRoutePolicy policy : routePolicies) {
-                log.info("Policy: {}, master: {}", policy, 
policy.getElection().isMaster());
-                iteration = iteration & policy.getElection().isMaster();
-            }
-            if (iteration) {
-                LOG.info("the number of required active routes is available");
-                global = true;
-                break;
-            } else {
-                Thread.sleep(2000);
-                LOG.info("waiting routes to become leader and be activated.");
-            }
-        }
-        if (!global) {
-            fail("The expected number of route never became master");
-        }
-    }
-
-
-    private class MultiMasterZookeeperPolicyEnforcedContext {
-        CamelContext controlledContext;
-        ProducerTemplate template;
-        MockEndpoint mock;
-        String routename;
-        String path;
-
-        MultiMasterZookeeperPolicyEnforcedContext(String name, int 
activeNodesDesired, String path) throws Exception {
-            controlledContext = new DefaultCamelContext();
-            routename = name;
-            this.path = path;
-            template = controlledContext.createProducerTemplate();
-            mock = controlledContext.getEndpoint("mock:controlled", 
MockEndpoint.class);
-            controlledContext.addRoutes(new FailoverRoute(name, 
activeNodesDesired, path));
-            controlledContext.start();
-        }
-
-        public void sendMessageToEnforcedRoute(String message, int expected) 
throws InterruptedException {
-            mock.expectedMessageCount(expected);
-            try {
-                LOG.info("Sending message to: {}", "vm:" + routename);
-                template.sendBody("vm:" + routename, ExchangePattern.InOut, 
message);
-            } catch (Exception e) {
-                if (expected > 0) {
-                    LOG.error(e.getMessage(), e);
-                    fail("Expected messages...");
-                }
-            }
-            mock.await(2, TimeUnit.SECONDS);
-            mock.assertIsSatisfied(2000);
-        }
-
-        public void shutdown() throws Exception {
-            LoggerFactory.getLogger(getClass()).debug("stopping");
-            controlledContext.stop();
-            LoggerFactory.getLogger(getClass()).debug("stopped");
-        }
-
-
-        public void startup() throws Exception {
-            LoggerFactory.getLogger(getClass()).debug("starting");
-            controlledContext.start();
-            LoggerFactory.getLogger(getClass()).debug("started");
-        }
-    }
-
-    private MultiMasterZookeeperPolicyEnforcedContext 
createEnforcedContext(String name, int activeNodesDesired, String path) throws 
Exception, InterruptedException {
-        MultiMasterZookeeperPolicyEnforcedContext context = new 
MultiMasterZookeeperPolicyEnforcedContext(name, activeNodesDesired, path);
-        delay(1000);
-        return context;
-    }
-
-    public class FailoverRoute extends RouteBuilder {
-
-        private String path;
-        private String routename;
-        private int activeNodesDesired;
-
-        public FailoverRoute(String routename, int activeNodesDesired, String 
path) {
-            // need names as if we use the same direct ep name in two contexts
-            // in the same vm shutting down one context shuts the endpoint for
-            // both.
-            this.routename = routename;
-            this.activeNodesDesired = activeNodesDesired;
-            this.path = path;
-        }
-
-        public void configure() throws Exception {
-            CuratorMultiMasterLeaderRoutePolicy policy = new 
CuratorMultiMasterLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + 
BASE_ZNODE + ZNODE + "/" + path, this.activeNodesDesired);
-            from("vm:" + 
routename).routePolicy(policy).id(routename).to("mock:controlled");
-        }
-    }
-}
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperDoubleRouteAndDoublePolicyTest.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperDoubleRouteAndDoublePolicyTest.java
deleted file mode 100644
index a1ad16b..0000000
--- 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperDoubleRouteAndDoublePolicyTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.junit.Test;
-
-public class ZookeeperDoubleRouteAndDoublePolicyTest extends 
ZooKeeperTestSupport {
-
-    @Test
-    public void routeDoublePoliciesAndTwoRoutes() throws Exception {
-        // set up the parent used to control the election
-        client.createPersistent("/someapp", "App node to contain policy 
election nodes...");
-        client.createPersistent("/someapp/somepolicy", "Policy node used by 
route policy to control routes...");
-        client.createPersistent("/someapp/someotherpolicy", "Policy node used 
by route policy to control routes...");
-        context.addRoutes(new ZooKeeperPolicyEnforcedRoute());
-
-        MockEndpoint mockedpolicy = getMockEndpoint("mock:controlled");
-        mockedpolicy.setExpectedMessageCount(1);
-        sendBody("direct:policy-controlled", "This is a test");
-        mockedpolicy.await(5, TimeUnit.SECONDS);
-        mockedpolicy.assertIsSatisfied();
-        
-        MockEndpoint mockedpolicy1 = getMockEndpoint("mock:controlled-1");
-        mockedpolicy1.setExpectedMessageCount(1);
-        sendBody("direct:policy-controlled-1", "This is a test");
-        mockedpolicy1.await(5, TimeUnit.SECONDS);
-        mockedpolicy1.assertIsSatisfied();
-    }
-
-    public static class ZooKeeperPolicyEnforcedRoute extends RouteBuilder {
-        public void configure() throws Exception {
-            ZooKeeperRoutePolicy policy = new 
ZooKeeperRoutePolicy("zookeeper:localhost:" + getServerPort() + 
"/someapp/somepolicy", 1);
-            
from("direct:policy-controlled").routePolicy(policy).to("mock:controlled");
-            ZooKeeperRoutePolicy policy2 = new 
ZooKeeperRoutePolicy("zookeeper:localhost:" + getServerPort() + 
"/someapp/someotherpolicy", 1);
-            
from("direct:policy-controlled-1").routePolicy(policy2).to("mock:controlled-1");
-        }
-    };
-}
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java
deleted file mode 100644
index 7814c6f..0000000
--- 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperElectionTest extends ZooKeeperTestSupport {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperElectionTest.class);
-
-    private static final String NODE_BASE_KEY = "/someapp";
-    private static final String NODE_PARTICULAR_KEY = "/someapp/somepolicy";
-
-    private CamelContext candidateOneContext;
-    private CamelContext candidateTwoContext;
-
-    @Before
-    public void before() throws Exception {
-        // set up the parent used to control the election
-        client.createPersistent(NODE_BASE_KEY, "App node to contain policy 
election nodes...");
-        client.createPersistent(NODE_PARTICULAR_KEY, "Policy node used by 
route policy to control routes...");
-    }
-
-    @After
-    public void after() throws Exception {
-        client.deleteAll(NODE_PARTICULAR_KEY);
-        client.delete(NODE_BASE_KEY);
-
-        if (candidateOneContext != null) {
-            candidateOneContext.stop();
-        }
-        if (candidateTwoContext != null) {
-            candidateTwoContext.stop();
-        }
-    }
-
-    private String getElectionUri() {
-        return "zookeeper:localhost:" + getServerPort() + 
"/someapp/somepolicy";
-    }
-
-    @Test
-    public void masterCanBeElected() throws Exception {
-        ZooKeeperElection candidate = new ZooKeeperElection(template, context, 
getElectionUri(), 1);
-        assertTrue("The only election candidate was not elected as master.", 
candidate.isMaster());
-    }
-
-    @Test
-    public void masterAndSlave() throws Exception {
-        candidateOneContext = createNewContext();
-        candidateTwoContext = createNewContext();
-
-        ZooKeeperElection electionCandidate1 = 
createElectionCandidate(candidateOneContext, 1);
-        assertTrue("The first candidate was not elected.", 
electionCandidate1.isMaster());
-        ZooKeeperElection electionCandidate2 = 
createElectionCandidate(candidateTwoContext, 1);
-        assertFalse("The second candidate should not have been elected.", 
electionCandidate2.isMaster());
-    }
-
-    @Test
-    public void testMasterGoesAway() throws Exception {
-        candidateOneContext = createNewContext();
-        candidateTwoContext = createNewContext();
-
-        ZooKeeperElection electionCandidate1 = 
createElectionCandidate(candidateOneContext, 1);
-        assertTrue("The first candidate was not elected.", 
electionCandidate1.isMaster());
-        ZooKeeperElection electionCandidate2 = 
createElectionCandidate(candidateTwoContext, 1);
-        assertFalse("The second candidate should not have been elected.", 
electionCandidate2.isMaster());
-
-        LOG.debug("About to shutdown the first candidate.");
-
-        candidateOneContext.stop(); // the first candidate was killed.
-        assertIsMaster(electionCandidate2);
-    }
-
-    @Test
-    public void testDualMaster() throws Exception {
-        candidateOneContext = createNewContext();
-        candidateTwoContext = createNewContext();
-
-        ZooKeeperElection electionCandidate1 = 
createElectionCandidate(candidateOneContext, 2);
-        assertTrue("The first candidate was not elected.", 
electionCandidate1.isMaster());
-        
-        ZooKeeperElection electionCandidate2 = 
createElectionCandidate(candidateTwoContext, 2);
-        assertIsMaster(electionCandidate2);
-    }
-
-    @Test
-    public void testWatchersAreNotified() throws Exception {
-        candidateOneContext = createNewContext();
-        candidateTwoContext = createNewContext();
-
-        final AtomicBoolean notified = new AtomicBoolean(false);
-        ElectionWatcher watcher = new ElectionWatcher() {
-            @Override public void electionResultChanged() {
-                notified.set(true);
-            }
-        };
-
-        ZooKeeperElection electionCandidate1 = 
createElectionCandidate(candidateOneContext, 2);
-        assertTrue("The first candidate was not elected.", 
electionCandidate1.isMaster());
-        electionCandidate1.addElectionWatcher(watcher);
-        ZooKeeperElection electionCandidate2 = 
createElectionCandidate(candidateTwoContext, 2);
-        electionCandidate2.isMaster();
-        assertTrue("The first candidate should have had it's watcher 
notified", notified.get());
-    }
-
-    private DefaultCamelContext createNewContext() throws Exception {
-        DefaultCamelContext controlledContext = new DefaultCamelContext();
-        controlledContext.start();
-        return controlledContext;
-    }
-
-    private ZooKeeperElection createElectionCandidate(final CamelContext 
context, int masterCount) {
-        return new ZooKeeperElection(context.createProducerTemplate(), 
context, getElectionUri(), masterCount);
-    }
-
-    private void assertIsMaster(ZooKeeperElection electionCandidate) throws 
InterruptedException {
-        // Need to wait for a while to be elected.
-        long timeout = System.currentTimeMillis() + 20000;
-        
-        while (!electionCandidate.isMaster() && timeout > 
System.currentTimeMillis()) {
-            Thread.sleep(200);
-        }
-        
-        assertTrue("The candidate should have been elected.", 
electionCandidate.isMaster());
-    }
-}
diff --git 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperRoutePolicyTest.java
 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperRoutePolicyTest.java
deleted file mode 100644
index f75f152..0000000
--- 
a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperRoutePolicyTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper.policy;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
-import org.junit.Test;
-
-public class ZookeeperRoutePolicyTest extends ZooKeeperTestSupport {
-
-    @Test
-    public void routeCanBeControlledByPolicy() throws Exception {
-        // set up the parent used to control the election
-        client.createPersistent("/someapp", "App node to contain policy 
election nodes...");
-        client.createPersistent("/someapp/somepolicy", "Policy node used by 
route policy to control routes...");
-        context.addRoutes(new ZooKeeperPolicyEnforcedRoute());
-
-        MockEndpoint mock = getMockEndpoint("mock:controlled");
-        mock.setExpectedMessageCount(1);
-        sendBody("direct:policy-controlled", "This is a test");
-        mock.await(5, TimeUnit.SECONDS);
-        mock.assertIsSatisfied();
-    }
-
-    public static class ZooKeeperPolicyEnforcedRoute extends RouteBuilder {
-        public void configure() throws Exception {
-            ZooKeeperRoutePolicy policy = new 
ZooKeeperRoutePolicy("zookeeper:localhost:" + getServerPort() + 
"/someapp/somepolicy", 1);
-            
from("direct:policy-controlled").routePolicy(policy).to("mock:controlled");
-        }
-    };
-}

Reply via email to