This is an automated email from the ASF dual-hosted git repository.

dahn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git

commit 2654890e868a5a94412126be8aa920888e62cff4
Merge: 7abda3b9639 085bd3bda5f
Author: Daan Hoogland <d...@onecht.net>
AuthorDate: Sat Feb 1 21:20:08 2025 +0100

    Merge branch '4.20'

 .python-version                                    |   2 +-
 agent/conf/agent.properties                        |   7 +
 agent/src/main/java/com/cloud/agent/Agent.java     | 823 +++++++++++----------
 .../src/main/java/com/cloud/agent/AgentShell.java  |  57 +-
 .../src/main/java/com/cloud/agent/IAgentShell.java |   2 +
 .../cloud/agent/properties/AgentProperties.java    |   7 +
 .../test/java/com/cloud/agent/AgentShellTest.java  |   7 +
 agent/src/test/java/com/cloud/agent/AgentTest.java | 257 +++++++
 .../org/apache/cloudstack/acl/RoleService.java     |   5 +
 .../api/command/admin/domain/ListDomainsCmd.java   |   7 +-
 .../api/command/user/account/ListAccountsCmd.java  |   5 +-
 .../user/firewall/CreateFirewallRuleCmd.java       |   7 +-
 .../OutOfBandManagementService.java                |   2 +-
 .../command/admin/domain/ListDomainsCmdTest.java   |  13 +-
 .../command/user/account/ListAccountsCmdTest.java  |  15 +-
 .../user/firewall/CreateFirewallRuleCmdTest.java   |  91 +++
 .../java/com/cloud/resource/ServerResource.java    |   8 +
 .../java/com/cloud/vm/VirtualMachineManager.java   |  22 +-
 .../service/NetworkOrchestrationService.java       |   3 +
 .../java/com/cloud/capacity/CapacityManager.java   |  13 +-
 .../java/com/cloud/resource/ResourceManager.java   |   4 +-
 .../java/com/cloud/storage/StorageManager.java     |   9 +
 .../com/cloud/agent/manager/AgentManagerImpl.java  | 302 +++++---
 .../agent/manager/ClusteredAgentManagerImpl.java   | 143 ++--
 .../com/cloud/vm/VirtualMachineManagerImpl.java    | 220 +++---
 .../cloud/vm/VirtualMachinePowerStateSyncImpl.java | 220 +++---
 .../engine/orchestration/NetworkOrchestrator.java  |   2 +-
 .../java/com/cloud/capacity/dao/CapacityDao.java   |   2 +
 .../com/cloud/capacity/dao/CapacityDaoImpl.java    |  12 +
 .../main/java/com/cloud/dc/ClusterDetailsDao.java  |   3 +
 .../java/com/cloud/dc/ClusterDetailsDaoImpl.java   |  20 +
 .../src/main/java/com/cloud/dc/dao/ClusterDao.java |  16 +-
 .../main/java/com/cloud/dc/dao/ClusterDaoImpl.java |  71 +-
 .../cloud/dc/dao/DataCenterIpAddressDaoImpl.java   |   3 +-
 .../com/cloud/dc/dao/DataCenterVnetDaoImpl.java    |   2 +-
 .../src/main/java/com/cloud/host/dao/HostDao.java  |  46 +-
 .../main/java/com/cloud/host/dao/HostDaoImpl.java  | 497 ++++++++-----
 .../com/cloud/network/dao/IPAddressDaoImpl.java    |   2 +-
 .../java/com/cloud/network/dao/NetworkDaoImpl.java |   3 +-
 .../cloud/secstorage/CommandExecLogDaoImpl.java    |   3 +-
 .../com/cloud/service/dao/ServiceOfferingDao.java  |   4 +-
 .../cloud/service/dao/ServiceOfferingDaoImpl.java  |  11 +-
 .../com/cloud/storage/dao/StoragePoolHostDao.java  |   2 +-
 .../cloud/storage/dao/StoragePoolHostDaoImpl.java  |  24 +-
 .../java/com/cloud/storage/dao/VMTemplateDao.java  |   5 +-
 .../com/cloud/storage/dao/VMTemplateDaoImpl.java   |  60 +-
 .../java/com/cloud/storage/dao/VolumeDaoImpl.java  |   8 -
 .../upgrade/SystemVmTemplateRegistration.java      |   2 +-
 .../cloud/upgrade/dao/DatabaseAccessObject.java    |  11 +
 .../java/com/cloud/upgrade/dao/DbUpgradeUtils.java |   6 +
 .../com/cloud/upgrade/dao/Upgrade42000to42010.java |  39 +
 .../java/com/cloud/vm/dao/ConsoleProxyDao.java     |   2 +-
 .../java/com/cloud/vm/dao/ConsoleProxyDaoImpl.java |  32 +-
 .../java/com/cloud/vm/dao/NicIpAliasDaoImpl.java   |   3 +-
 .../main/java/com/cloud/vm/dao/VMInstanceDao.java  |  21 +-
 .../java/com/cloud/vm/dao/VMInstanceDaoImpl.java   | 229 +++++-
 .../resourcedetail/ResourceDetailsDao.java         |   2 +
 .../resourcedetail/ResourceDetailsDaoBase.java     |  15 +
 .../datastore/db/PrimaryDataStoreDaoImpl.java      |   6 +-
 .../db/views/cloud.network_offering_view.sql       |   8 +-
 .../cloud/capacity/dao/CapacityDaoImplTest.java    |  99 +++
 .../java/com/cloud/dc/dao/ClusterDaoImplTest.java  |  78 ++
 .../java/com/cloud/host/dao/HostDaoImplTest.java   | 184 +++++
 .../cloud/usage/dao/UsageStorageDaoImplTest.java   |   7 +-
 .../resourcedetail/ResourceDetailsDaoBaseTest.java | 181 +++++
 .../datastore/db/PrimaryDataStoreDaoImplTest.java  |  39 +-
 .../agent/lb/IndirectAgentLBAlgorithm.java         |   4 +
 .../dao/ManagementServerHostPeerDaoImpl.java       |   3 +-
 .../framework/config/impl/ConfigDepotImpl.java     |  14 +-
 .../main/java/com/cloud/utils/db/GenericDao.java   |   5 +
 .../java/com/cloud/utils/db/GenericDaoBase.java    |  36 +
 .../framework/jobs/dao/VmWorkJobDao.java           |   1 +
 .../framework/jobs/dao/VmWorkJobDaoImpl.java       |  16 +
 .../framework/jobs/dao/VmWorkJobDaoImplTest.java   |  94 ++-
 .../acl/DynamicRoleBasedAPIAccessChecker.java      |  70 +-
 .../affinity/ExplicitDedicationProcessor.java      |  18 +-
 .../dedicated/DedicatedResourceManagerImpl.java    |  61 +-
 .../cloud/deploy/ImplicitDedicationPlanner.java    |  50 +-
 .../implicitplanner/ImplicitPlannerTest.java       |  38 +-
 .../com/cloud/resource/AgentRoutingResource.java   |   4 +-
 .../xenserver/discoverer/XcpServerDiscoverer.java  |   9 +-
 .../cloudstack/metrics/PrometheusExporterImpl.java |   4 +-
 .../apache/cloudstack/metrics/MetricsService.java  |   8 +-
 .../cloudstack/metrics/MetricsServiceImpl.java     | 158 ++--
 .../CloudStackPrimaryDataStoreLifeCycleImpl.java   | 152 ++--
 ...loudStackPrimaryDataStoreLifeCycleImplTest.java |  77 +-
 .../ScaleIOPrimaryDataStoreLifeCycle.java          |  59 +-
 .../ScaleIOPrimaryDataStoreLifeCycleTest.java      |  32 +-
 .../storage/datastore/util/StorPoolHelper.java     |  14 +-
 .../java/com/cloud/alert/AlertManagerImpl.java     |  96 ++-
 .../java/com/cloud/api/query/QueryManagerImpl.java |   6 +-
 .../com/cloud/api/query/dao/UserVmJoinDao.java     |   4 +-
 .../com/cloud/api/query/dao/UserVmJoinDaoImpl.java |   4 +-
 .../com/cloud/capacity/CapacityManagerImpl.java    | 185 +++--
 .../configuration/ConfigurationManagerImpl.java    |  12 +-
 .../consoleproxy/ConsoleProxyManagerImpl.java      |  12 +-
 .../deploy/DeploymentPlanningManagerImpl.java      |  58 +-
 .../kvm/discoverer/LibvirtServerDiscoverer.java    |  66 +-
 .../java/com/cloud/network/NetworkServiceImpl.java |  24 +-
 .../com/cloud/network/as/AutoScaleManagerImpl.java |   2 +-
 .../network/security/SecurityGroupListener.java    |   2 +-
 .../java/com/cloud/network/vpc/VpcManagerImpl.java |   4 +-
 .../com/cloud/resource/ResourceManagerImpl.java    |  85 +--
 .../resource/RollingMaintenanceManagerImpl.java    |   2 +-
 .../resourcelimit/ResourceLimitManagerImpl.java    |  11 +-
 .../com/cloud/server/ManagementServerImpl.java     |  24 +-
 .../main/java/com/cloud/server/StatsCollector.java | 100 +--
 .../java/com/cloud/storage/StorageManagerImpl.java |  84 ++-
 .../cloud/storage/download/DownloadListener.java   |  42 +-
 .../main/java/com/cloud/vm/UserVmManagerImpl.java  |   2 +-
 .../org/apache/cloudstack/acl/RoleManagerImpl.java |   2 +-
 .../agent/lb/IndirectAgentLBServiceImpl.java       |  94 ++-
 .../IndirectAgentLBRoundRobinAlgorithm.java        |   5 +
 .../OutOfBandManagementServiceImpl.java            |  20 +-
 .../java/com/cloud/alert/AlertManagerImplTest.java |  71 +-
 .../cloud/capacity/CapacityManagerImplTest.java    | 182 +++++
 .../configuration/ConfigurationManagerTest.java    |  20 +-
 .../deploy/DeploymentPlanningManagerImplTest.java  |   4 +-
 .../com/cloud/network/Ipv6ServiceImplTest.java     |  62 +-
 .../com/cloud/network/NetworkServiceImplTest.java  |  26 +-
 .../cloud/network/as/AutoScaleManagerImplTest.java | 149 ++--
 .../cloud/resource/MockResourceManagerImpl.java    |   4 +-
 .../ResourceLimitManagerImplTest.java              |  14 +-
 .../java/com/cloud/user/DomainManagerImplTest.java |  47 +-
 .../java/com/cloud/user/MockUsageEventDao.java     |   6 +
 .../java/com/cloud/vm/FirstFitPlannerTest.java     |   2 -
 .../agent/lb/IndirectAgentLBServiceImplTest.java   |  55 +-
 .../networkoffering/CreateNetworkOfferingTest.java |  49 +-
 .../SecondaryStorageManagerImpl.java               |  16 +-
 setup/db/create-schema-simulator.sql               |   3 +-
 test/integration/smoke/test_dynamicroles.py        |  15 +-
 ui/src/config/section/infra/hosts.js               |   2 +-
 ui/src/views/AutogenView.vue                       |   2 +-
 ui/src/views/dashboard/CapacityDashboard.vue       |  22 +-
 ui/src/views/network/FirewallRules.vue             |   3 +
 .../utils/backoff/impl/ConstantTimeBackoff.java    |   1 -
 .../java/com/cloud/utils/nio/HandlerFactory.java   |  11 +-
 utils/src/main/java/com/cloud/utils/nio/Link.java  |  16 +-
 .../main/java/com/cloud/utils/nio/NioClient.java   |  68 +-
 .../java/com/cloud/utils/nio/NioConnection.java    | 186 ++---
 .../main/java/com/cloud/utils/nio/NioServer.java   |  42 +-
 .../apache/cloudstack/utils/cache/LazyCache.java   |  33 +-
 .../apache/cloudstack/utils/cache/SingleCache.java |  33 +-
 .../java/com/cloud/utils/testcase/NioTest.java     |  45 +-
 .../cloudstack/utils/cache/LazyCacheTest.java      | 115 +++
 145 files changed, 4773 insertions(+), 2415 deletions(-)

diff --cc agent/src/main/java/com/cloud/agent/Agent.java
index 97803477115,2e7b61fbd51..0a76bfbb4f8
--- a/agent/src/main/java/com/cloud/agent/Agent.java
+++ b/agent/src/main/java/com/cloud/agent/Agent.java
@@@ -58,10 -56,11 +58,10 @@@ import org.apache.cloudstack.managed.co
  import org.apache.cloudstack.utils.security.KeyStoreUtils;
  import org.apache.commons.collections.CollectionUtils;
  import org.apache.commons.io.FileUtils;
- import org.apache.commons.lang.ObjectUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.logging.log4j.Logger;
+ import org.apache.commons.lang3.ObjectUtils;
 -import org.apache.commons.lang3.StringUtils;
  import org.apache.logging.log4j.LogManager;
+ import org.apache.logging.log4j.Logger;
+ import org.apache.logging.log4j.ThreadContext;
  
  import com.cloud.agent.api.AgentControlAnswer;
  import com.cloud.agent.api.AgentControlCommand;
@@@ -70,6 -69,7 +70,9 @@@ import com.cloud.agent.api.Command
  import com.cloud.agent.api.CronCommand;
  import com.cloud.agent.api.MaintainAnswer;
  import com.cloud.agent.api.MaintainCommand;
++import com.cloud.agent.api.MigrateAgentConnectionAnswer;
++import com.cloud.agent.api.MigrateAgentConnectionCommand;
+ import com.cloud.agent.api.PingAnswer;
  import com.cloud.agent.api.PingCommand;
  import com.cloud.agent.api.ReadyCommand;
  import com.cloud.agent.api.ShutdownCommand;
@@@ -79,9 -79,11 +82,12 @@@ import com.cloud.agent.transport.Reques
  import com.cloud.agent.transport.Response;
  import com.cloud.exception.AgentControlChannelException;
  import com.cloud.host.Host;
+ import com.cloud.resource.AgentStatusUpdater;
+ import com.cloud.resource.ResourceStatusUpdater;
  import com.cloud.resource.ServerResource;
+ import com.cloud.utils.NumbersUtil;
  import com.cloud.utils.PropertiesUtil;
- import com.cloud.utils.backoff.BackoffAlgorithm;
++import com.cloud.utils.StringUtils;
  import com.cloud.utils.concurrency.NamedThreadFactory;
  import com.cloud.utils.exception.CloudRuntimeException;
  import com.cloud.utils.exception.NioConnectionException;
@@@ -314,8 -338,9 +342,8 @@@ public class Agent implements HandlerFa
                  logger.info("Attempted to connect to the server, but received 
an unexpected exception, trying again...", e);
              }
          }
-         _shell.updateConnectedHost();
+         shell.updateConnectedHost();
          scavengeOldAgentObjects();
 -
      }
  
      public void stop(final String reason, final String detail) {
@@@ -479,21 -539,16 +542,21 @@@
      }
  
      public void sendStartup(final Link link) {
 +        sendStartup(link, false);
 +    }
 +
 +    public void sendStartup(final Link link, boolean transfer) {
-         final StartupCommand[] startup = _resource.initialize();
+         final StartupCommand[] startup = serverResource.initialize();
          if (startup != null) {
-             final String msHostList = _shell.getPersistentProperty(null, 
"host");
+             final String msHostList = shell.getPersistentProperty(null, 
"host");
              final Command[] commands = new Command[startup.length];
              for (int i = 0; i < startup.length; i++) {
                  setupStartupCommand(startup[i]);
                  startup[i].setMSHostList(msHostList);
 +                startup[i].setConnectionTransferred(transfer);
                  commands[i] = startup[i];
              }
-             final Request request = new Request(_id != null ? _id : -1, -1, 
commands, false, false);
+             final Request request = new Request(id != null ? id : -1, -1, 
commands, false, false);
              request.setSequence(getNextSequence());
  
              logger.debug("Sending Startup: {}", request.toString());
@@@ -547,84 -608,63 +616,71 @@@
          return new ServerHandler(type, link, data);
      }
  
 -    protected void closeAndTerminateLink(final Link link) {
 -        if (link == null) {
 -            return;
 -        }
 -        link.close();
 -        link.terminated();
 +    protected void reconnect(final Link link) {
 +        reconnect(link, null, null, false);
      }
  
 -    protected void stopAndCleanupConnection(boolean waitForStop) {
 -        if (connection == null) {
 -            return;
 -        }
 -        connection.stop();
 -        try {
 -            connection.cleanUp();
 -        } catch (final IOException e) {
 -            logger.warn("Fail to clean up old connection. {}", e);
 -        }
 -        if (!waitForStop) {
 +    protected void reconnect(final Link link, String preferredHost, 
List<String> avoidHostList, boolean forTransfer) {
-         if (!(forTransfer || _reconnectAllowed)) {
++        if (!(forTransfer || reconnectAllowed)) {
              return;
          }
 -        do {
 -            shell.getBackoffAlgorithm().waitBeforeRetry();
 -        } while (connection.isStartup());
 -    }
  
-         synchronized (this) {
-             if (_startup != null) {
-                 _startup.cancel();
-                 _startup = null;
-             }
-         }
- 
-         if (link != null) {
-             link.close();
-             link.terminated();
 -    protected void reconnect(final Link link) {
+         if (!reconnectAllowed) {
+             logger.debug("Reconnect requested but it is not allowed {}", () 
-> getLinkLog(link));
+             return;
          }
- 
+         cancelStartupTask();
+         closeAndTerminateLink(link);
+         closeAndTerminateLink(this.link);
          setLink(null);
          cancelTasks();
+         serverResource.disconnected();
+         logger.info("Lost connection to host: {}. Attempting reconnection 
while we still have {} commands in progress.", shell.getConnectedHost(), 
commandsInProgress.get());
+         stopAndCleanupConnection(true);
+         do {
+             final String host = shell.getNextHost();
+             connection = new NioClient(getAgentName(), host, shell.getPort(), 
shell.getWorkers(), shell.getSslHandshakeTimeout(), this);
+             logger.info("Reconnecting to host: {}", host);
+             try {
+                 connection.start();
+             } catch (final NioConnectionException e) {
+                 logger.info("Attempted to re-connect to the server, but 
received an unexpected exception, trying again...", e);
+                 stopAndCleanupConnection(false);
+             }
+             shell.getBackoffAlgorithm().waitBeforeRetry();
+         } while (!connection.isStartup());
+         shell.updateConnectedHost();
+         logger.info("Connected to the host: {}", shell.getConnectedHost());
+     }
  
-         _resource.disconnected();
- 
-         logger.info("Lost connection to host: {}. Attempting reconnection 
while we still have {} commands in progress.", _shell.getConnectedHost(), 
_inProgress.get());
- 
-         _connection.stop();
++    protected void closeAndTerminateLink(final Link link) {
++        if (link == null) {
++            return;
++        }
++        link.close();
++        link.terminated();
++    }
 +
++    protected void stopAndCleanupConnection(boolean waitForStop) {
++        if (connection == null) {
++            return;
++        }
++        connection.stop();
 +        try {
-             _connection.cleanUp();
++            connection.cleanUp();
 +        } catch (final IOException e) {
 +            logger.warn("Fail to clean up old connection. {}", e);
 +        }
- 
-         while (_connection.isStartup()) {
-             _shell.getBackoffAlgorithm().waitBeforeRetry();
-         }
- 
-         String host = preferredHost;
-         if (StringUtils.isEmpty(host)) {
-             host = _shell.getNextHost();
++        if (!waitForStop) {
++            return;
 +        }
- 
 +        do {
-             if (CollectionUtils.isEmpty(avoidHostList) || 
!avoidHostList.contains(host)) {
-                 _connection = new NioClient("Agent", host, _shell.getPort(), 
_shell.getWorkers(), this);
-                 logger.info("Reconnecting to host:{}", host);
-                 try {
-                     _connection.start();
-                 } catch (final NioConnectionException e) {
-                     logger.info("Attempted to re-connect to the server, but 
received an unexpected exception, trying again...", e);
-                     _connection.stop();
-                     try {
-                         _connection.cleanUp();
-                     } catch (final IOException ex) {
-                         logger.warn("Fail to clean up old connection. {}", 
ex);
-                     }
-                 }
-             }
-             _shell.getBackoffAlgorithm().waitBeforeRetry();
-             host = _shell.getNextHost();
-         } while (!_connection.isStartup());
-         _shell.updateConnectedHost();
-         logger.info("Connected to the host: {}", _shell.getConnectedHost());
++            shell.getBackoffAlgorithm().waitBeforeRetry();
++        } while (connection.isStartup());
 +    }
 +
      public void processStartupAnswer(final Answer answer, final Response 
response, final Link link) {
-         boolean cancelled = false;
-         synchronized (this) {
-             if (_startup != null) {
-                 _startup.cancel();
-                 _startup = null;
-             } else {
-                 cancelled = true;
-             }
-         }
+         boolean answerValid = cancelStartupTask();
          final StartupAnswer startup = (StartupAnswer)answer;
          if (!startup.getResult()) {
              logger.error("Not allowed to connect to the server: {}", 
answer.getDetails());
@@@ -879,53 -925,6 +946,53 @@@
          return new SetupMSListAnswer(true);
      }
  
 +    private Answer migrateAgentToOtherMS(final MigrateAgentConnectionCommand 
cmd) {
 +        try {
 +            if (CollectionUtils.isNotEmpty(cmd.getMsList())) {
 +                processManagementServerList(cmd.getMsList(), 
cmd.getLbAlgorithm(), cmd.getLbCheckInterval());
 +            }
 +            migrateAgentConnection(cmd.getAvoidMsList());
 +        } catch (Exception e) {
 +            String errMsg = "Migrate agent connection failed, due to " + 
e.getMessage();
 +            logger.debug(errMsg, e);
 +            return new MigrateAgentConnectionAnswer(errMsg);
 +        }
 +        return new MigrateAgentConnectionAnswer(true);
 +    }
 +
 +    private void migrateAgentConnection(List<String> avoidMsList) {
-         final String[] msHosts = _shell.getHosts();
++        final String[] msHosts = shell.getHosts();
 +        if (msHosts == null || msHosts.length < 1) {
 +            throw new CloudRuntimeException("Management Server hosts empty, 
not properly configured in agent");
 +        }
 +
 +        List<String> msHostsList = new ArrayList<>(Arrays.asList(msHosts));
 +        msHostsList.removeAll(avoidMsList);
 +        if (msHostsList.isEmpty() || StringUtils.isEmpty(msHostsList.get(0))) 
{
 +            throw new CloudRuntimeException("No other Management Server hosts 
to migrate");
 +        }
 +
 +        String preferredHost  = null;
 +        for (String msHost : msHostsList) {
 +            try (final Socket socket = new Socket()) {
-                 socket.connect(new InetSocketAddress(msHost, 
_shell.getPort()), 5000);
++                socket.connect(new InetSocketAddress(msHost, 
shell.getPort()), 5000);
 +                preferredHost = msHost;
 +                break;
 +            } catch (final IOException e) {
 +                throw new CloudRuntimeException("Management server host: " + 
msHost + " is not reachable, to migrate connection");
 +            }
 +        }
 +
 +        if (preferredHost == null) {
 +            throw new CloudRuntimeException("Management server host(s) are 
not reachable, to migrate connection");
 +        }
 +
 +        logger.debug("Management server host " + preferredHost + " is found 
to be reachable, trying to reconnect");
-         _shell.resetHostCounter();
-         _shell.setConnectionTransfer(true);
-         reconnect(_link, preferredHost, avoidMsList, true);
++        shell.resetHostCounter();
++        shell.setConnectionTransfer(true);
++        reconnect(link, preferredHost, avoidMsList, true);
 +    }
 +
      public void processResponse(final Response response, final Link link) {
          final Answer answer = response.getAnswer();
          logger.debug("Received response: {}", response.toString());
@@@ -1133,15 -1125,15 +1193,15 @@@
          }
      }
  
-     public class WatchTask extends ManagedContextTimerTask {
+     public class WatchTask implements Runnable {
          protected Request _request;
          protected Agent _agent;
--        protected Link _link;
++        protected Link link;
  
          public WatchTask(final Link link, final Request request, final Agent 
agent) {
              super();
              _request = request;
--            _link = link;
++            this.link = link;
              _agent = agent;
          }
  
@@@ -1150,9 -1142,9 +1210,9 @@@
              logger.trace("Scheduling {}", (_request instanceof Response ? 
"Ping" : "Watch Task"));
              try {
                  if (_request instanceof Response) {
-                     _ugentTaskPool.submit(new ServerHandler(Task.Type.OTHER, 
_link, _request));
 -                    outRequestHandler.submit(new 
ServerHandler(Task.Type.OTHER, _link, _request));
++                    outRequestHandler.submit(new 
ServerHandler(Task.Type.OTHER, link, _request));
                  } else {
--                    _link.schedule(new ServerHandler(Task.Type.OTHER, _link, 
_request));
++                    link.schedule(new ServerHandler(Task.Type.OTHER, link, 
_request));
                  }
              } catch (final ClosedChannelException e) {
                  logger.warn("Unable to schedule task because channel is 
closed");
@@@ -1160,35 -1152,32 +1220,32 @@@
          }
      }
  
-     public class StartupTask extends ManagedContextTimerTask {
-         protected Link _link;
-         protected volatile boolean cancelled = false;
+     public class StartupTask implements Runnable {
 -        protected Link _link;
++        protected Link link;
+         private final AtomicBoolean cancelled = new AtomicBoolean(false);
  
          public StartupTask(final Link link) {
              logger.debug("Startup task created");
--            _link = link;
++            this.link = link;
          }
  
-         @Override
-         public synchronized boolean cancel() {
+         public boolean cancel() {
              // TimerTask.cancel may fail depends on the calling context
-             if (!cancelled) {
-                 cancelled = true;
-                 _startupWait = _startupWaitDefault;
+             if (cancelled.compareAndSet(false, true)) {
+                 startupWait = DEFAULT_STARTUP_WAIT;
                  logger.debug("Startup task cancelled");
-                 return super.cancel();
              }
              return true;
          }
  
          @Override
-         protected synchronized void runInContext() {
-             if (!cancelled) {
-                 logger.info("The startup command is now cancelled");
-                 cancelled = true;
-                 _startup = null;
-                 _startupWait = _startupWaitDefault * 2;
-                 reconnect(_link);
+         public void run() {
+             if (cancelled.compareAndSet(false, true)) {
+                 logger.info("The running startup command is now invalid. 
Attempting reconnect");
+                 startupTask.set(null);
+                 startupWait = DEFAULT_STARTUP_WAIT * 2;
 -                logger.debug("Executing reconnect from task - {}", () -> 
getLinkLog(_link));
 -                reconnect(_link);
++                logger.debug("Executing reconnect from task - {}", () -> 
getLinkLog(link));
++                reconnect(link);
              }
          }
      }
@@@ -1219,10 -1208,9 +1276,10 @@@
          @Override
          public void doTask(final Task task) throws TaskExecutionException {
              if (task.getType() == Task.Type.CONNECT) {
-                 _shell.getBackoffAlgorithm().reset();
+                 shell.getBackoffAlgorithm().reset();
                  setLink(task.getLink());
-                 sendStartup(task.getLink(), _shell.isConnectionTransfer());
-                 _shell.setConnectionTransfer(false);
 -                sendStartup(task.getLink());
++                sendStartup(task.getLink(), shell.isConnectionTransfer());
++                shell.setConnectionTransfer(false);
              } else if (task.getType() == Task.Type.DATA) {
                  Request request;
                  try {
@@@ -1241,15 -1229,8 +1298,15 @@@
                      logger.error("Error parsing task", e);
                  }
              } else if (task.getType() == Task.Type.DISCONNECT) {
 +                try {
 +                    // an issue has been found if reconnect immediately after 
disconnecting. please refer to https://github.com/apache/cloudstack/issues/8517
 +                    // wait 5 seconds before reconnecting
 +                    Thread.sleep(5000);
 +                } catch (InterruptedException e) {
 +                }
-                 _shell.setConnectionTransfer(false);
++                shell.setConnectionTransfer(false);
+                 logger.debug("Executing disconnect task - {}", () -> 
getLinkLog(task.getLink()));
                  reconnect(task.getLink());
-                 return;
              } else if (task.getType() == Task.Type.OTHER) {
                  processOtherTask(task);
              }
diff --cc agent/src/main/java/com/cloud/agent/IAgentShell.java
index 0b9d9e81e95,7f04048795d..c0ecd90ae69
--- a/agent/src/main/java/com/cloud/agent/IAgentShell.java
+++ b/agent/src/main/java/com/cloud/agent/IAgentShell.java
@@@ -71,7 -71,5 +71,9 @@@ public interface IAgentShell 
  
      void launchNewAgent(ServerResource resource) throws 
ConfigurationException;
  
 +    boolean isConnectionTransfer();
 +
 +    void setConnectionTransfer(boolean connectionTransfer);
++
+     Integer getSslHandshakeTimeout();
  }
diff --cc 
engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
index f154eaddc1e,1ab3b7ff892..765602e42d0
--- 
a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
+++ 
b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
@@@ -16,9 -16,9 +16,10 @@@
  // under the License.
  package com.cloud.agent.manager;
  
 +import java.io.IOException;
  import java.lang.reflect.Constructor;
  import java.lang.reflect.InvocationTargetException;
+ import java.net.SocketAddress;
  import java.nio.channels.ClosedChannelException;
  import java.util.ArrayList;
  import java.util.Arrays;
@@@ -88,6 -82,7 +86,9 @@@ import com.cloud.agent.api.UnsupportedA
  import com.cloud.agent.transport.Request;
  import com.cloud.agent.transport.Response;
  import com.cloud.alert.AlertManager;
++import com.cloud.cluster.ManagementServerHostVO;
++import com.cloud.cluster.dao.ManagementServerHostDao;
+ import com.cloud.configuration.Config;
  import com.cloud.configuration.ManagementServiceConfiguration;
  import com.cloud.dc.ClusterVO;
  import com.cloud.dc.DataCenterVO;
@@@ -142,14 -138,13 +144,13 @@@ public class AgentManagerImpl extends M
       * _agents is a ConcurrentHashMap, but it is used from within a 
synchronized block. This will be reported by findbugs as 
JLM_JSR166_UTILCONCURRENT_MONITORENTER. Maybe a
       * ConcurrentHashMap is not the right thing to use here, but i'm not sure 
so i leave it alone.
       */
--    protected ConcurrentHashMap<Long, AgentAttache> _agents = new 
ConcurrentHashMap<Long, AgentAttache>(10007);
--    protected List<Pair<Integer, Listener>> _hostMonitors = new 
ArrayList<Pair<Integer, Listener>>(17);
--    protected List<Pair<Integer, Listener>> _cmdMonitors = new 
ArrayList<Pair<Integer, Listener>>(17);
--    protected List<Pair<Integer, StartupCommandProcessor>> _creationMonitors 
= new ArrayList<Pair<Integer, StartupCommandProcessor>>(17);
--    protected List<Long> _loadingAgents = new ArrayList<Long>();
++    protected ConcurrentHashMap<Long, AgentAttache> _agents = new 
ConcurrentHashMap<>(10007);
++    protected List<Pair<Integer, Listener>> _hostMonitors = new 
ArrayList<>(17);
++    protected List<Pair<Integer, Listener>> _cmdMonitors = new 
ArrayList<>(17);
++    protected List<Pair<Integer, StartupCommandProcessor>> _creationMonitors 
= new ArrayList<>(17);
++    protected List<Long> _loadingAgents = new ArrayList<>();
      protected Map<String, Integer> _commandTimeouts = new HashMap<>();
      private int _monitorId = 0;
-     private final Lock _agentStatusLock = new ReentrantLock();
  
      @Inject
      protected CAManager caService;
@@@ -198,28 -188,37 +199,39 @@@
  
      private int _directAgentThreadCap;
  
 +    private List<String> lastAgents = null;
 +
      protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = 
Status.getStateMachine();
--    private final ConcurrentHashMap<Long, Long> _pingMap = new 
ConcurrentHashMap<Long, Long>(10007);
++    private final ConcurrentHashMap<Long, Long> _pingMap = new 
ConcurrentHashMap<>(10007);
+     private int maxConcurrentNewAgentConnections;
+     private final ConcurrentHashMap<String, Long> newAgentConnections = new 
ConcurrentHashMap<>();
+     protected ScheduledExecutorService newAgentConnectionsMonitor;
  
      @Inject
      ResourceManager _resourceMgr;
      @Inject
      ManagementServiceConfiguration mgmtServiceConf;
  
--    protected final ConfigKey<Integer> Workers = new 
ConfigKey<Integer>("Advanced", Integer.class, "workers", "5",
++    protected final ConfigKey<Integer> Workers = new ConfigKey<>("Advanced", 
Integer.class, "workers", "5",
              "Number of worker threads handling remote agent connections.", 
false);
--    protected final ConfigKey<Integer> Port = new 
ConfigKey<Integer>("Advanced", Integer.class, "port", "8250", "Port to listen 
on for remote agent connections.", false);
-     protected final ConfigKey<Integer> AlertWait = new 
ConfigKey<Integer>("Advanced", Integer.class, "alert.wait", "1800",
++    protected final ConfigKey<Integer> Port = new ConfigKey<>("Advanced", 
Integer.class, "port", "8250", "Port to listen on for remote agent 
connections.", false);
+     protected final ConfigKey<Integer> RemoteAgentSslHandshakeTimeout = new 
ConfigKey<>("Advanced",
+             Integer.class, "agent.ssl.handshake.timeout", "30",
+             "Seconds after which SSL handshake times out during remote agent 
connections.", false);
+     protected final ConfigKey<Integer> RemoteAgentMaxConcurrentNewConnections 
= new ConfigKey<>("Advanced",
+             Integer.class, "agent.max.concurrent.new.connections", "0",
+             "Number of maximum concurrent new connections server allows for 
remote agents. " +
+                     "If set to zero (default value) then no limit will be 
enforced on concurrent new connections",
+             false);
 -    protected final ConfigKey<Integer> AlertWait = new 
ConfigKey<Integer>("Advanced", Integer.class, "alert.wait", "1800",
++    protected final ConfigKey<Integer> AlertWait = new 
ConfigKey<>("Advanced", Integer.class, "alert.wait", "1800",
              "Seconds to wait before alerting on a disconnected agent", true);
--    protected final ConfigKey<Integer> DirectAgentLoadSize = new 
ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.load.size", "16",
++    protected final ConfigKey<Integer> DirectAgentLoadSize = new 
ConfigKey<>("Advanced", Integer.class, "direct.agent.load.size", "16",
              "The number of direct agents to load each time", false);
--    protected final ConfigKey<Integer> DirectAgentPoolSize = new 
ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.pool.size", "500",
++    protected final ConfigKey<Integer> DirectAgentPoolSize = new 
ConfigKey<>("Advanced", Integer.class, "direct.agent.pool.size", "500",
              "Default size for DirectAgentPool", false);
--    protected final ConfigKey<Float> DirectAgentThreadCap = new 
ConfigKey<Float>("Advanced", Float.class, "direct.agent.thread.cap", "1",
++    protected final ConfigKey<Float> DirectAgentThreadCap = new 
ConfigKey<>("Advanced", Float.class, "direct.agent.thread.cap", "1",
              "Percentage (as a value between 0 and 1) of 
direct.agent.pool.size to be used as upper thread cap for a single direct agent 
to process requests", false);
--    protected final ConfigKey<Boolean> CheckTxnBeforeSending = new 
ConfigKey<Boolean>("Developer", Boolean.class, 
"check.txn.before.sending.agent.commands", "false",
++    protected final ConfigKey<Boolean> CheckTxnBeforeSending = new 
ConfigKey<>("Developer", Boolean.class, 
"check.txn.before.sending.agent.commands", "false",
              "This parameter allows developers to enable a check to see if a 
transaction wraps commands that are sent to the resource.  This is not to be 
enabled on production systems.", true);
  
      @Override
@@@ -239,11 -236,10 +249,13 @@@
  
          registerForHostEvents(new SetHostParamsListener(), true, true, false);
  
 +        managementServerMaintenanceManager.registerListener(this);
 +
-         _executor = new ThreadPoolExecutor(threads, threads, 60l, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("AgentTaskPool"));
+         final int agentTaskThreads = DirectAgentLoadSize.value();
 -        _executor = new ThreadPoolExecutor(agentTaskThreads, 
agentTaskThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 
new NamedThreadFactory("AgentTaskPool"));
+ 
 -        _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("AgentConnectTaskPool"));
++        _executor = new ThreadPoolExecutor(agentTaskThreads, 
agentTaskThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new 
NamedThreadFactory("AgentTaskPool"));
 +
-         _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("AgentConnectTaskPool"));
++        _connectExecutor = new ThreadPoolExecutor(100, 500, 60L, 
TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new 
NamedThreadFactory("AgentConnectTaskPool"));
          // allow core threads to time out even when there are no items in the 
queue
          _connectExecutor.allowCoreThreadTimeOut(true);
  
@@@ -269,22 -271,44 +287,44 @@@
          return new AgentHandler(type, link, data);
      }
  
+     @Override
+     public int getMaxConcurrentNewConnectionsCount() {
+         return maxConcurrentNewAgentConnections;
+     }
+ 
+     @Override
+     public int getNewConnectionsCount() {
+         return newAgentConnections.size();
+     }
+ 
+     @Override
+     public void registerNewConnection(SocketAddress address) {
 -        logger.trace(String.format("Adding new agent connection from %s", 
address.toString()));
++        logger.trace("Adding new agent connection from {}", 
address.toString());
+         newAgentConnections.putIfAbsent(address.toString(), 
System.currentTimeMillis());
+     }
+ 
+     @Override
+     public void unregisterNewConnection(SocketAddress address) {
 -        logger.trace(String.format("Removing new agent connection for %s", 
address.toString()));
++        logger.trace("Removing new agent connection for {}", 
address.toString());
+         newAgentConnections.remove(address.toString());
+     }
+ 
      @Override
      public int registerForHostEvents(final Listener listener, final boolean 
connections, final boolean commands, final boolean priority) {
          synchronized (_hostMonitors) {
              _monitorId++;
              if (connections) {
                  if (priority) {
--                    _hostMonitors.add(0, new Pair<Integer, 
Listener>(_monitorId, listener));
++                    _hostMonitors.add(0, new Pair<>(_monitorId, listener));
                  } else {
--                    _hostMonitors.add(new Pair<Integer, Listener>(_monitorId, 
listener));
++                    _hostMonitors.add(new Pair<>(_monitorId, listener));
                  }
              }
              if (commands) {
                  if (priority) {
--                    _cmdMonitors.add(0, new Pair<Integer, 
Listener>(_monitorId, listener));
++                    _cmdMonitors.add(0, new Pair<>(_monitorId, listener));
                  } else {
--                    _cmdMonitors.add(new Pair<Integer, Listener>(_monitorId, 
listener));
++                    _cmdMonitors.add(new Pair<>(_monitorId, listener));
                  }
              }
              logger.debug("Registering listener {} with id {}", 
listener.getClass().getSimpleName(), _monitorId);
@@@ -297,9 -321,9 +337,9 @@@
          synchronized (_hostMonitors) {
              _monitorId++;
              if (priority) {
--                _creationMonitors.add(0, new Pair<Integer, 
StartupCommandProcessor>(_monitorId, creator));
++                _creationMonitors.add(0, new Pair<>(_monitorId, creator));
              } else {
--                _creationMonitors.add(new Pair<Integer, 
StartupCommandProcessor>(_monitorId, creator));
++                _creationMonitors.add(new Pair<>(_monitorId, creator));
              }
              return _monitorId;
          }
@@@ -311,47 -335,8 +351,47 @@@
          _hostMonitors.remove(id);
      }
  
 +    @Override
 +    public void onManagementServerMaintenance() {
 +        logger.debug("Management server maintenance enabled");
 +        _monitorExecutor.shutdownNow();
 +        if (_connection != null) {
 +            _connection.stop();
 +
 +            try {
 +                _connection.cleanUp();
 +            } catch (final IOException e) {
 +                logger.warn("Fail to clean up old connection", e);
 +            }
 +        }
 +        _connectExecutor.shutdownNow();
 +    }
 +
 +    @Override
 +    public void onManagementServerCancelMaintenance() {
 +        logger.debug("Management server maintenance disabled");
 +        if (_connectExecutor.isShutdown()) {
-             _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("AgentConnectTaskPool"));
++            _connectExecutor = new ThreadPoolExecutor(100, 500, 60L, 
TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new 
NamedThreadFactory("AgentConnectTaskPool"));
 +            _connectExecutor.allowCoreThreadTimeOut(true);
 +        }
 +
 +        startDirectlyConnectedHosts(true);
 +        if (_connection != null) {
 +            try {
 +                _connection.start();
 +            } catch (final NioConnectionException e) {
 +                logger.error("Error when connecting to the NioServer!", e);
 +            }
 +        }
 +
 +        if (_monitorExecutor.isShutdown()) {
 +            _monitorExecutor = new ScheduledThreadPoolExecutor(1, new 
NamedThreadFactory("AgentMonitor"));
 +            _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), 
mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), 
TimeUnit.SECONDS);
 +        }
 +    }
 +
      private AgentControlAnswer handleControlCommand(final AgentAttache 
attache, final AgentControlCommand cmd) {
--        AgentControlAnswer answer = null;
++        AgentControlAnswer answer;
  
          for (final Pair<Integer, Listener> listener : _cmdMonitors) {
              answer = listener.second().processControlCommand(attache.getId(), 
cmd);
@@@ -379,7 -364,7 +419,7 @@@
      }
  
      public AgentAttache findAttache(final long hostId) {
--        AgentAttache attache = null;
++        AgentAttache attache;
          synchronized (_agents) {
              attache = _agents.get(hostId);
          }
@@@ -431,12 -406,12 +471,10 @@@
          cmds.addCommand(cmd);
          send(hostId, cmds, cmd.getWait());
          final Answer[] answers = cmds.getAnswers();
--        if (answers != null && !(answers[0] instanceof UnsupportedAnswer)) {
--            return answers[0];
--        }
--
--        if (answers != null && answers[0] instanceof UnsupportedAnswer) {
--            logger.warn("Unsupported Command: {}", answers[0].getDetails());
++        if (answers != null) {
++            if (answers[0] instanceof UnsupportedAnswer) {
++                logger.warn("Unsupported Command: {}", 
answers[0].getDetails());
++            }
              return answers[0];
          }
  
@@@ -467,8 -442,8 +505,8 @@@
      }
  
      /**
--     * @param commands
--     * @return
++     * @param commands object container of commands
++     * @return array of commands
       */
      private Command[] checkForCommandsAndTag(final Commands commands) {
          final Command[] cmds = commands.toCommands();
@@@ -484,8 -459,8 +522,8 @@@
      }
  
      /**
--     * @param commands
--     * @param cmds
++     * @param commands object container of commands
++     * @param cmds array of commands
       */
      private void setEmptyAnswers(final Commands commands, final Command[] 
cmds) {
          if (cmds.length == 0) {
@@@ -524,7 -499,7 +562,7 @@@
          String commandWaits = GranularWaitTimeForCommands.value().trim();
          if (StringUtils.isNotEmpty(commandWaits)) {
              _commandTimeouts = getCommandTimeoutsMap(commandWaits);
--            logger.info(String.format("Timeouts for management server 
internal commands successfully initialized from global setting 
commands.timeout: %s", _commandTimeouts));
++            logger.info("Timeouts for management server internal commands 
successfully initialized from global setting commands.timeout: {}", 
_commandTimeouts);
          }
      }
  
@@@ -540,10 -515,10 +578,10 @@@
                      int commandTimeout = Integer.parseInt(parts[1].trim());
                      commandTimeouts.put(commandName, commandTimeout);
                  } catch (NumberFormatException e) {
--                    logger.error(String.format("Initialising the timeouts 
using commands.timeout: %s for management server internal commands failed with 
error %s", commandPair, e.getMessage()));
++                    logger.error("Initialising the timeouts using 
commands.timeout: {} for management server internal commands failed with error 
{}", commandPair, e.getMessage());
                  }
              } else {
--                logger.error(String.format("Error initialising the timeouts 
for management server internal commands. Invalid format in commands.timeout: 
%s", commandPair));
++                logger.error("Error initialising the timeouts for management 
server internal commands. Invalid format in commands.timeout: {}", commandPair);
              }
          }
          return commandTimeouts;
@@@ -557,7 -532,7 +595,7 @@@
          }
  
          int wait = getTimeout(commands, timeout);
--        logger.debug(String.format("Wait time setting on %s is %d seconds", 
commands, wait));
++        logger.debug("Wait time setting on {} is {} seconds", commands, wait);
          for (Command cmd : commands) {
              String simpleCommandName = cmd.getClass().getSimpleName();
              Integer commandTimeout = _commandTimeouts.get(simpleCommandName);
@@@ -644,7 -619,7 +682,7 @@@
          }
          final long hostId = attache.getId();
          logger.debug("Remove Agent : {}", attache);
--        AgentAttache removed = null;
++        AgentAttache removed;
          boolean conflict = false;
          synchronized (_agents) {
              removed = _agents.remove(hostId);
@@@ -697,16 -672,16 +735,15 @@@
                      }
                  } catch (final HypervisorVersionChangedException hvce) {
                      handleDisconnectWithoutInvestigation(attache, 
Event.ShutdownRequested, true, true);
--                    throw new CloudRuntimeException("Unable to connect " + 
(attache == null ? "<unknown agent>" : attache.getId()), hvce);
++                    throw new CloudRuntimeException("Unable to connect " + 
attache.getId(), hvce);
                  } catch (final Exception e) {
                      logger.error("Monitor {} says there is an error in the 
connect process for {} due to {}", monitor.second().getClass().getSimpleName(), 
hostId, e.getMessage(), e);
                      handleDisconnectWithoutInvestigation(attache, 
Event.AgentDisconnected, true, true);
--                    throw new CloudRuntimeException("Unable to connect " + 
(attache == null ? "<unknown agent>" : attache.getId()), e);
++                    throw new CloudRuntimeException("Unable to connect " + 
attache.getId(), e);
                  }
              }
          }
  
--        final Long dcId = host.getDataCenterId();
          final ReadyCommand ready = new ReadyCommand(host, 
NumbersUtil.enableHumanReadableSizes);
          ready.setWait(ReadyCommandWait.value());
          final Answer answer = easySend(hostId, ready);
@@@ -775,25 -748,25 +816,25 @@@
              final Constructor<?> constructor = clazz.getConstructor();
              resource = (ServerResource)constructor.newInstance();
          } catch (final ClassNotFoundException e) {
--            logger.warn("Unable to find class " + host.getResource(), e);
++            logger.warn("Unable to find class {}", host.getResource(), e);
          } catch (final InstantiationException e) {
--            logger.warn("Unable to instantiate class " + host.getResource(), 
e);
++            logger.warn("Unable to instantiate class {}", host.getResource(), 
e);
          } catch (final IllegalAccessException e) {
--            logger.warn("Illegal access " + host.getResource(), e);
++            logger.warn("Illegal access {}", host.getResource(), e);
          } catch (final SecurityException e) {
--            logger.warn("Security error on " + host.getResource(), e);
++            logger.warn("Security error on {}", host.getResource(), e);
          } catch (final NoSuchMethodException e) {
--            logger.warn("NoSuchMethodException error on " + 
host.getResource(), e);
++            logger.warn("NoSuchMethodException error on {}", 
host.getResource(), e);
          } catch (final IllegalArgumentException e) {
--            logger.warn("IllegalArgumentException error on " + 
host.getResource(), e);
++            logger.warn("IllegalArgumentException error on {}", 
host.getResource(), e);
          } catch (final InvocationTargetException e) {
--            logger.warn("InvocationTargetException error on " + 
host.getResource(), e);
++            logger.warn("InvocationTargetException error on {}", 
host.getResource(), e);
          }
  
          if (resource != null) {
              _hostDao.loadDetails(host);
  
--            final HashMap<String, Object> params = new HashMap<String, 
Object>(host.getDetails().size() + 5);
++            final HashMap<String, Object> params = new 
HashMap<>(host.getDetails().size() + 5);
              params.putAll(host.getDetails());
  
              params.put("guid", host.getGuid());
@@@ -803,7 -776,7 +844,7 @@@
              }
              if (host.getClusterId() != null) {
                  params.put("cluster", Long.toString(host.getClusterId()));
--                String guid = null;
++                String guid;
                  final ClusterVO cluster = 
_clusterDao.findById(host.getClusterId());
                  if (cluster.getGuid() == null) {
                      guid = host.getDetail("pool");
@@@ -838,12 -811,8 +879,12 @@@
      }
  
      protected boolean loadDirectlyConnectedHost(final HostVO host, final 
boolean forRebalance) {
 +        return loadDirectlyConnectedHost(host, forRebalance, false);
 +    }
 +
 +    protected boolean loadDirectlyConnectedHost(final HostVO host, final 
boolean forRebalance, final boolean isTransferredConnection) {
          boolean initialized = false;
--        ServerResource resource = null;
++        ServerResource resource;
          try {
              // load the respective discoverer
              final Discoverer discoverer = 
_resourceMgr.getMatchingDiscover(host.getHypervisorType());
@@@ -870,21 -839,21 +911,21 @@@
  
          if (forRebalance) {
              tapLoadingAgents(host.getId(), TapAgentsAction.Add);
 -            final Host h = _resourceMgr.createHostAndAgent(host.getId(), 
resource, host.getDetails(), false, null, true);
 +            final Host h = _resourceMgr.createHostAndAgent(host.getId(), 
resource, host.getDetails(), false, null, true, isTransferredConnection);
              tapLoadingAgents(host.getId(), TapAgentsAction.Del);
  
--            return h == null ? false : true;
++            return h != null;
          } else {
              _executor.execute(new SimulateStartTask(host.getId(), 
host.getUuid(), host.getName(), resource, host.getDetails()));
              return true;
          }
      }
  
--    protected AgentAttache createAttacheForDirectConnect(final Host host, 
final ServerResource resource) throws ConnectionException {
++    protected AgentAttache createAttacheForDirectConnect(final Host host, 
final ServerResource resource) {
          logger.debug("create DirectAgentAttache for {}", host);
          final DirectAgentAttache attache = new DirectAgentAttache(this, 
host.getId(), host.getUuid(), host.getName(), resource, 
host.isInMaintenanceStates());
  
--        AgentAttache old = null;
++        AgentAttache old;
          synchronized (_agents) {
              old = _agents.put(host.getId(), attache);
          }
@@@ -949,7 -919,7 +991,7 @@@
              try {
                  logger.info("Host {} is disconnecting with event {}",
                          attache, event);
--                Status nextStatus = null;
++                Status nextStatus;
                  final HostVO host = _hostDao.findById(hostId);
                  if (host == null) {
                      logger.warn("Can't find host with {} ({})", hostId, 
attache);
@@@ -1082,7 -1052,7 +1124,7 @@@
          @Override
          protected void runInContext() {
              try {
--                if (_investigate == true) {
++                if (_investigate) {
                      handleDisconnectWithInvestigation(_attache, _event);
                  } else {
                      handleDisconnectWithoutInvestigation(_attache, _event, 
true, false);
@@@ -1134,8 -1104,8 +1176,8 @@@
      public Answer[] send(final Long hostId, final Commands cmds) throws 
AgentUnavailableException, OperationTimedoutException {
          int wait = 0;
          if (cmds.size() > 1) {
--            logger.debug(String.format("Checking the wait time in seconds to 
be used for the following commands : %s. If there are multiple commands sent at 
once," +
--                    "then max wait time of those will be used", cmds));
++            logger.debug("Checking the wait time in seconds to be used for 
the following commands : {}. If there are multiple commands sent at once," +
++                    "then max wait time of those will be used", cmds);
          }
  
          for (final Command cmd : cmds) {
@@@ -1198,7 -1168,7 +1240,7 @@@
  
      public boolean executeUserRequest(final long hostId, final Event event) 
throws AgentUnavailableException {
          if (event == Event.AgentDisconnected) {
--            AgentAttache attache = null;
++            AgentAttache attache;
              attache = findAttache(hostId);
              logger.debug("Received agent disconnect event for host {} ({})", 
hostId, attache);
              if (attache != null) {
@@@ -1224,12 -1194,12 +1266,12 @@@
          return agentAttache != null;
      }
  
--    protected AgentAttache createAttacheForConnect(final HostVO host, final 
Link link) throws ConnectionException {
++    protected AgentAttache createAttacheForConnect(final HostVO host, final 
Link link) {
          logger.debug("create ConnectedAgentAttache for {}", host);
          final AgentAttache attache = new ConnectedAgentAttache(this, 
host.getId(), host.getUuid(), host.getName(), link, 
host.isInMaintenanceStates());
          link.attach(attache);
  
--        AgentAttache old = null;
++        AgentAttache old;
          synchronized (_agents) {
              old = _agents.put(host.getId(), attache);
          }
@@@ -1254,7 -1224,7 +1296,7 @@@
              }
          }
          ready.setArch(host.getArch().getType());
--        AgentAttache attache = null;
++        AgentAttache attache;
          GlobalLock joinLock = getHostJoinLock(host.getId());
          if (joinLock.lock(60)) {
              try {
@@@ -1280,7 -1250,7 +1322,7 @@@
          return attache;
      }
  
--    private AgentAttache handleConnectedAgent(final Link link, final 
StartupCommand[] startup, final Request request) {
++    private AgentAttache handleConnectedAgent(final Link link, final 
StartupCommand[] startup) {
          AgentAttache attache = null;
          ReadyCommand ready = null;
          try {
@@@ -1308,7 -1278,7 +1350,7 @@@
                  easySend(attache.getId(), ready);
              }
          } catch (final Exception e) {
--            logger.debug("Failed to send ready command:" + e.toString());
++            logger.debug("Failed to send ready command:", e);
          }
          return attache;
      }
@@@ -1334,6 -1304,6 +1376,8 @@@
              this.id = id;
              this.resource = resource;
              this.details = details;
++            this.uuid = uuid;
++            this.name = name;
          }
  
          @Override
@@@ -1382,7 -1352,7 +1426,7 @@@
                  startups[i] = (StartupCommand)_cmds[i];
              }
  
--            final AgentAttache attache = handleConnectedAgent(_link, 
startups, _request);
++            final AgentAttache attache = handleConnectedAgent(_link, 
startups);
              if (attache == null) {
                  logger.warn("Unable to create attache for agent: {}", 
_request);
              }
@@@ -1402,7 -1373,7 +1447,7 @@@
                  break;
              }
          }
--        Response response = null;
++        Response response;
          response = new Response(request, answers[0], _nodeId, -1);
          try {
              link.send(response.toBytes());
@@@ -1483,7 -1454,7 +1528,6 @@@
              }
  
              final long hostId = attache.getId();
--            final String hostName = attache.getName();
  
              if (logger.isDebugEnabled()) {
                  if (cmd instanceof PingRoutingCommand) {
@@@ -1502,7 -1473,7 +1546,7 @@@
              final Answer[] answers = new Answer[cmds.length];
              for (int i = 0; i < cmds.length; i++) {
                  cmd = cmds[i];
--                Answer answer = null;
++                Answer answer;
                  try {
                      if (cmd instanceof StartupRoutingCommand) {
                          final StartupRoutingCommand startup = 
(StartupRoutingCommand) cmd;
@@@ -1536,7 -1507,7 +1580,7 @@@
                              final long cmdHostId = 
((PingCommand)cmd).getHostId();
                              boolean requestStartupCommand = false;
  
--                            final HostVO host = 
_hostDao.findById(Long.valueOf(cmdHostId));
++                            final HostVO host = _hostDao.findById(cmdHostId);
                              boolean gatewayAccessible = true;
                              // if the router is sending a ping, verify the
                              // gateway was pingable
@@@ -1586,7 -1557,7 +1630,7 @@@
              if (logD) {
                  logger.debug("SeqA {}-: Sending {}", attache.getId(), 
response.getSequence(), response);
              } else {
--                logger.trace("SeqA {}-: Sending {}" + attache.getId(), 
response.getSequence(), response);
++                logger.trace("SeqA {}-: Sending {} {}", 
response.getSequence(), response, attache.getId());
              }
              try {
                  link.send(response.toBytes());
@@@ -1606,15 -1577,15 +1650,14 @@@
  
          @Override
          protected void doTask(final Task task) throws TaskExecutionException {
--            final TransactionLegacy txn = 
TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
--            try {
++            try (TransactionLegacy txn = 
TransactionLegacy.open(TransactionLegacy.CLOUD_DB)) {
                  final Type type = task.getType();
--                if (type == Task.Type.DATA) {
++                if (type == Type.DATA) {
                      final byte[] data = task.getData();
                      try {
                          final Request event = Request.parse(data);
                          if (event instanceof Response) {
--                            processResponse(task.getLink(), (Response)event);
++                            processResponse(task.getLink(), (Response) event);
                          } else {
                              processRequest(task.getLink(), event);
                          }
@@@ -1626,10 -1597,10 +1669,10 @@@
                          logger.error(message);
                          throw new TaskExecutionException(message, e);
                      }
--                } else if (type == Task.Type.CONNECT) {
--                } else if (type == Task.Type.DISCONNECT) {
++                } else if (type == Type.CONNECT) {
++                } else if (type == Type.DISCONNECT) {
                      final Link link = task.getLink();
--                    final AgentAttache attache = 
(AgentAttache)link.attachment();
++                    final AgentAttache attache = (AgentAttache) 
link.attachment();
                      if (attache != null) {
                          disconnectWithInvestigation(attache, 
Event.AgentDisconnected);
                      } else {
@@@ -1638,8 -1609,8 +1681,6 @@@
                          link.terminated();
                      }
                  }
--            } finally {
--                txn.close();
              }
          }
      }
@@@ -1871,7 -1837,7 +1907,7 @@@
          }
  
          protected List<Long> findAgentsBehindOnPing() {
--            final List<Long> agentsBehind = new ArrayList<Long>();
++            final List<Long> agentsBehind = new ArrayList<>();
              final long cutoffTime = InaccurateClock.getTimeInSeconds() - 
mgmtServiceConf.getTimeout();
              for (final Map.Entry<Long, Long> entry : _pingMap.entrySet()) {
                  if (entry.getValue() < cutoffTime) {
@@@ -1879,7 -1845,7 +1915,7 @@@
                  }
              }
  
--            if (agentsBehind.size() > 0) {
++            if (!agentsBehind.isEmpty()) {
                  logger.info("Found the following agents behind on ping: {}", 
agentsBehind);
              }
  
@@@ -1887,6 -1853,35 +1923,35 @@@
          }
      }
  
+     protected class AgentNewConnectionsMonitorTask extends 
ManagedContextRunnable {
+         @Override
+         protected void runInContext() {
+             logger.trace("Agent New Connections Monitor is started.");
+             final int cleanupTime = Wait.value();
+             Set<Map.Entry<String, Long>> entrySet = 
newAgentConnections.entrySet();
+             long cutOff = System.currentTimeMillis() - (cleanupTime * 60 * 
1000L);
+             if (logger.isDebugEnabled()) {
+                 List<String> expiredConnections = 
newAgentConnections.entrySet()
+                         .stream()
+                         .filter(e -> e.getValue() <= cutOff)
+                         .map(Map.Entry::getKey)
+                         .collect(Collectors.toList());
 -                logger.debug(String.format("Currently %d active new 
connections, of which %d have expired - %s",
++                logger.debug("Currently {} active new connections, of which 
{} have expired - {}",
+                         entrySet.size(),
+                         expiredConnections.size(),
 -                        StringUtils.join(expiredConnections)));
++                        StringUtils.join(expiredConnections));
+             }
+             for (Map.Entry<String, Long> entry : entrySet) {
+                 if (entry.getValue() <= cutOff) {
+                     if (logger.isTraceEnabled()) {
 -                        logger.trace(String.format("Cleaning up new agent 
connection for %s", entry.getKey()));
++                        logger.trace("Cleaning up new agent connection for 
{}", entry.getKey());
+                     }
+                     newAgentConnections.remove(entry.getKey());
+                 }
+             }
+         }
+     }
+ 
      protected class BehindOnPingListener implements Listener {
          @Override
          public boolean isRecurring() {
@@@ -1992,15 -1988,12 +2058,15 @@@
  
          @Override
          public void processConnect(final Host host, final StartupCommand cmd, 
final boolean forRebalance) {
 -            if (cmd instanceof StartupRoutingCommand) {
 -                if (((StartupRoutingCommand)cmd).getHypervisorType() == 
HypervisorType.KVM || ((StartupRoutingCommand)cmd).getHypervisorType() == 
HypervisorType.LXC) {
 -                    Map<String, String> params = new HashMap<String, 
String>();
 -                    
params.put(Config.RouterAggregationCommandEachTimeout.toString(), 
_configDao.getValue(Config.RouterAggregationCommandEachTimeout.toString()));
 -                    params.put(Config.MigrateWait.toString(), 
_configDao.getValue(Config.MigrateWait.toString()));
 -                    
params.put(NetworkOrchestrationService.TUNGSTEN_ENABLED.key(), 
String.valueOf(NetworkOrchestrationService.TUNGSTEN_ENABLED.valueIn(host.getDataCenterId())));
 +            if (!(cmd instanceof StartupRoutingCommand) || 
cmd.isConnectionTransferred()) {
 +                return;
 +            }
 +
 +            if (((StartupRoutingCommand)cmd).getHypervisorType() == 
HypervisorType.KVM || ((StartupRoutingCommand)cmd).getHypervisorType() == 
HypervisorType.LXC) {
-                 Map<String, String> params = new HashMap<String, String>();
++                Map<String, String> params = new HashMap<>();
 +                
params.put(Config.RouterAggregationCommandEachTimeout.toString(), 
_configDao.getValue(Config.RouterAggregationCommandEachTimeout.toString()));
 +                params.put(Config.MigrateWait.toString(), 
_configDao.getValue(Config.MigrateWait.toString()));
 +                
params.put(NetworkOrchestrationService.TUNGSTEN_ENABLED.key(), 
String.valueOf(NetworkOrchestrationService.TUNGSTEN_ENABLED.valueIn(host.getDataCenterId())));
  
                      try {
                          SetHostParamsCommand cmds = new 
SetHostParamsCommand(params);
@@@ -2042,13 -2037,13 +2108,13 @@@
          if (allHosts == null) {
              return null;
          }
--        Map<Long, List<Long>> hostsByZone = new HashMap<Long, List<Long>>();
++        Map<Long, List<Long>> hostsByZone = new HashMap<>();
          for (HostVO host : allHosts) {
              if (host.getHypervisorType() == HypervisorType.KVM || 
host.getHypervisorType() == HypervisorType.LXC) {
                  Long zoneId = host.getDataCenterId();
                  List<Long> hostIds = hostsByZone.get(zoneId);
                  if (hostIds == null) {
--                    hostIds = new ArrayList<Long>();
++                    hostIds = new ArrayList<>();
                  }
                  hostIds.add(host.getId());
                  hostsByZone.put(zoneId, hostIds);
diff --cc 
engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
index 732ce9d61f5,dd3666e5561..c667df5412e
--- 
a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
+++ 
b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
@@@ -47,16 -47,15 +47,17 @@@ import org.apache.cloudstack.framework.
  import org.apache.cloudstack.framework.config.ConfigKey;
  import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
  import org.apache.cloudstack.ha.dao.HAConfigDao;
 +import org.apache.cloudstack.maintenance.ManagementServerMaintenanceManager;
 +import 
org.apache.cloudstack.maintenance.command.BaseShutdownManagementServerHostCommand;
 +import 
org.apache.cloudstack.maintenance.command.CancelMaintenanceManagementServerHostCommand;
 +import 
org.apache.cloudstack.maintenance.command.CancelShutdownManagementServerHostCommand;
 +import 
org.apache.cloudstack.maintenance.command.PrepareForMaintenanceManagementServerHostCommand;
 +import 
org.apache.cloudstack.maintenance.command.PrepareForShutdownManagementServerHostCommand;
 +import 
org.apache.cloudstack.maintenance.command.TriggerShutdownManagementServerHostCommand;
  import org.apache.cloudstack.managed.context.ManagedContextRunnable;
  import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
+ import org.apache.cloudstack.management.ManagementServerHost;
  import org.apache.cloudstack.outofbandmanagement.dao.OutOfBandManagementDao;
 -import org.apache.cloudstack.shutdown.ShutdownManager;
 -import 
org.apache.cloudstack.shutdown.command.BaseShutdownManagementServerHostCommand;
 -import 
org.apache.cloudstack.shutdown.command.CancelShutdownManagementServerHostCommand;
 -import 
org.apache.cloudstack.shutdown.command.PrepareForShutdownManagementServerHostCommand;
 -import 
org.apache.cloudstack.shutdown.command.TriggerShutdownManagementServerHostCommand;
  import org.apache.cloudstack.utils.identity.ManagementServerNode;
  import org.apache.cloudstack.utils.security.SSLUtils;
  
@@@ -107,14 -100,14 +105,16 @@@ import com.cloud.utils.nio.Link
  import com.cloud.utils.nio.Task;
  import com.google.gson.Gson;
  
++import org.apache.commons.collections.CollectionUtils;
++
  public class ClusteredAgentManagerImpl extends AgentManagerImpl implements 
ClusterManagerListener, ClusteredAgentRebalanceService {
 -    private static final ScheduledExecutorService s_transferExecutor = 
Executors.newScheduledThreadPool(2, new 
NamedThreadFactory("Cluster-AgentRebalancingExecutor"));
 +    private static ScheduledExecutorService s_transferExecutor = 
Executors.newScheduledThreadPool(2, new 
NamedThreadFactory("Cluster-AgentRebalancingExecutor"));
      private final long rebalanceTimeOut = 300000; // 5 mins - after this time 
remove the agent from the transfer list
  
      public final static long STARTUP_DELAY = 5000;
      public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 
60 sec for xenserver to fail login
      public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; 
// 5 seconds
--    protected Set<Long> _agentToTransferIds = new HashSet<Long>();
++    protected Set<Long> _agentToTransferIds = new HashSet<>();
      Gson _gson;
      protected HashMap<String, SocketChannel> _peers;
      protected HashMap<String, SSLEngine> _sslEngines;
@@@ -151,17 -139,17 +151,17 @@@
          super();
      }
  
--    protected final ConfigKey<Boolean> EnableLB = new 
ConfigKey<Boolean>(Boolean.class, "agent.lb.enabled", "Advanced", "false", 
"Enable agent load balancing between management server nodes", true);
--    protected final ConfigKey<Double> ConnectedAgentThreshold = new 
ConfigKey<Double>(Double.class, "agent.load.threshold", "Advanced", "0.7",
++    protected final ConfigKey<Boolean> EnableLB = new 
ConfigKey<>(Boolean.class, "agent.lb.enabled", "Advanced", "false", "Enable 
agent load balancing between management server nodes", true);
++    protected final ConfigKey<Double> ConnectedAgentThreshold = new 
ConfigKey<>(Double.class, "agent.load.threshold", "Advanced", "0.7",
              "What percentage of the agents can be held by one management 
server before load balancing happens", true, EnableLB.key());
--    protected final ConfigKey<Integer> LoadSize = new 
ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16", 
"How many agents to connect to in each round", true);
--    protected final ConfigKey<Integer> ScanInterval = new 
ConfigKey<Integer>(Integer.class, "direct.agent.scan.interval", "Advanced", 
"90", "Interval between scans to load agents", false,
++    protected final ConfigKey<Integer> LoadSize = new 
ConfigKey<>(Integer.class, "direct.agent.load.size", "Advanced", "16", "How 
many agents to connect to in each round", true);
++    protected final ConfigKey<Integer> ScanInterval = new 
ConfigKey<>(Integer.class, "direct.agent.scan.interval", "Advanced", "90", 
"Interval between scans to load agents", false,
              ConfigKey.Scope.Global, 1000);
  
      @Override
      public boolean configure(final String name, final Map<String, Object> 
xmlParams) throws ConfigurationException {
--        _peers = new HashMap<String, SocketChannel>(7);
--        _sslEngines = new HashMap<String, SSLEngine>(7);
++        _peers = new HashMap<>(7);
++        _sslEngines = new HashMap<>(7);
          _nodeId = ManagementServerNode.getManagementServerId();
  
          logger.info("Configuring ClusterAgentManagerImpl. management server 
node id(msid): {}", _nodeId);
@@@ -220,7 -201,7 +220,7 @@@
  
          if (hosts != null) {
              hosts.addAll(appliances);
--            if (hosts.size() > 0) {
++            if (!hosts.isEmpty()) {
                  logger.debug("Found {} unmanaged direct hosts, processing 
connect for them...", hosts.size());
                  for (final HostVO host : hosts) {
                      try {
@@@ -267,10 -246,10 +265,10 @@@
          logger.debug("create forwarding ClusteredAgentAttache for {}", host);
          long id = host.getId();
          final AgentAttache attache = new ClusteredAgentAttache(this, id, 
host.getUuid(), host.getName());
--        AgentAttache old = null;
++        AgentAttache old;
          synchronized (_agents) {
-             old = _agents.get(id);
-             _agents.put(id, attache);
+             old = _agents.get(host.getId());
+             _agents.put(host.getId(), attache);
          }
          if (old != null) {
              logger.debug("Remove stale agent attache from current management 
server");
@@@ -284,7 -263,7 +282,7 @@@
          logger.debug("create ClusteredAgentAttache for {}",  host);
          final AgentAttache attache = new ClusteredAgentAttache(this, 
host.getId(), host.getUuid(), host.getName(), link, 
host.isInMaintenanceStates());
          link.attach(attache);
--        AgentAttache old = null;
++        AgentAttache old;
          synchronized (_agents) {
              old = _agents.get(host.getId());
              _agents.put(host.getId(), attache);
@@@ -299,7 -278,7 +297,7 @@@
      protected AgentAttache createAttacheForDirectConnect(final Host host, 
final ServerResource resource) {
          logger.debug("Create ClusteredDirectAgentAttache for {}.", host);
          final DirectAgentAttache attache = new 
ClusteredDirectAgentAttache(this, host.getId(), host.getUuid(), host.getName(), 
_nodeId, resource, host.isInMaintenanceStates());
--        AgentAttache old = null;
++        AgentAttache old;
          synchronized (_agents) {
              old = _agents.get(host.getId());
              _agents.put(host.getId(), attache);
@@@ -418,12 -397,12 +416,12 @@@
      public boolean routeToPeer(final String peer, final byte[] bytes) {
          int i = 0;
          SocketChannel ch = null;
--        SSLEngine sslEngine = null;
++        SSLEngine sslEngine;
          while (i++ < 5) {
              ch = connectToPeer(peer, ch);
              if (ch == null) {
                  try {
--                    logD(bytes, "Unable to route to peer: " + 
Request.parse(bytes).toString());
++                    logD(bytes, "Unable to route to peer: " + 
Request.parse(bytes));
                  } catch (ClassNotFoundException | UnsupportedVersionException 
e) {
                      // Request.parse thrown exception when we try to log it, 
log as much as we can
                      logD(bytes, "Unable to route to peer, and Request.parse 
further caught exception" + e.getMessage());
@@@ -441,7 -420,7 +439,7 @@@
                  return true;
              } catch (final IOException e) {
                  try {
--                    logI(bytes, "Unable to route to peer: " + 
Request.parse(bytes).toString() + " due to " + e.getMessage());
++                    logI(bytes, "Unable to route to peer: " + 
Request.parse(bytes) + " due to " + e.getMessage());
                  } catch (ClassNotFoundException | UnsupportedVersionException 
ex) {
                      // Request.parse thrown exception when we try to log it, 
log as much as we can
                      logI(bytes, "Unable to route to peer due to" + 
e.getMessage() + ". Also caught exception when parsing request: " + 
ex.getMessage());
@@@ -484,7 -463,7 +482,7 @@@
      public SocketChannel connectToPeer(final String peerName, final 
SocketChannel prevCh) {
          synchronized (_peers) {
              final SocketChannel ch = _peers.get(peerName);
--            SSLEngine sslEngine = null;
++            SSLEngine sslEngine;
              if (prevCh != null) {
                  try {
                      prevCh.close();
@@@ -617,9 -596,9 +615,8 @@@
  
          @Override
          protected void doTask(final Task task) throws TaskExecutionException {
--            final TransactionLegacy txn = 
TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
--            try {
--                if (task.getType() != Task.Type.DATA) {
++            try (TransactionLegacy txn = 
TransactionLegacy.open(TransactionLegacy.CLOUD_DB)) {
++                if (task.getType() != Type.DATA) {
                      super.doTask(task);
                      return;
                  }
@@@ -646,7 -625,7 +643,7 @@@
                          }
                          final Request req = Request.parse(data);
                          final Command[] cmds = req.getCommands();
--                        final CancelCommand cancel = (CancelCommand)cmds[0];
++                        final CancelCommand cancel = (CancelCommand) cmds[0];
                          logD(data, "Cancel request received");
                          agent.cancel(cancel.getSequence());
                          final Long current = agent._currentSequence;
@@@ -670,10 -649,10 +667,9 @@@
                              // to deserialize this and send it through the 
agent attache.
                              final Request req = Request.parse(data);
                              agent.send(req, null);
--                            return;
                          } else {
                              if (agent instanceof Routable) {
--                                final Routable cluster = (Routable)agent;
++                                final Routable cluster = (Routable) agent;
                                  cluster.routeToAgent(data);
                              } else {
                                  agent.send(Request.parse(data));
@@@ -690,13 -669,13 +686,12 @@@
                      if (mgmtId != -1 && mgmtId != _nodeId) {
                          routeToPeer(Long.toString(mgmtId), data);
                          if (Request.requiresSequentialExecution(data)) {
--                            final AgentAttache attache = 
(AgentAttache)link.attachment();
++                            final AgentAttache attache = (AgentAttache) 
link.attachment();
                              if (attache != null) {
                                  attache.sendNext(Request.getSequence(data));
                              }
--                            logD(data, "No attache to process " + 
Request.parse(data).toString());
++                            logD(data, "No attache to process " + 
Request.parse(data));
                          }
--                        return;
                      } else {
                          if (Request.isRequest(data)) {
                              super.doTask(task);
@@@ -712,7 -691,7 +707,6 @@@
                                  logger.info("SeqA {}-{}: Response is not 
processed: {}", attache.getId(), response.getSequence(), response.toString());
                              }
                          }
--                        return;
                      }
                  }
              } catch (final ClassNotFoundException e) {
@@@ -723,8 -702,8 +717,6 @@@
                  final String message = 
String.format("UnsupportedVersionException occurred when executing tasks! Error 
'%s'", e.getMessage());
                  logger.error(message);
                  throw new TaskExecutionException(message, e);
--            } finally {
--                txn.close();
              }
          }
      }
@@@ -761,17 -740,12 +753,17 @@@
  
      @Override
      public boolean executeRebalanceRequest(final long agentId, final long 
currentOwnerId, final long futureOwnerId, final Event event) throws 
AgentUnavailableException, OperationTimedoutException {
 +        return executeRebalanceRequest(agentId, currentOwnerId, 
futureOwnerId, event, false);
 +    }
 +
 +    @Override
 +    public boolean executeRebalanceRequest(final long agentId, final long 
currentOwnerId, final long futureOwnerId, final Event event, boolean 
isConnectionTransfer) throws AgentUnavailableException, 
OperationTimedoutException {
          boolean result = false;
          if (event == Event.RequestAgentRebalance) {
--            return setToWaitForRebalance(agentId, currentOwnerId, 
futureOwnerId);
++            return setToWaitForRebalance(agentId);
          } else if (event == Event.StartAgentRebalance) {
              try {
 -                result = rebalanceHost(agentId, currentOwnerId, 
futureOwnerId);
 +                result = rebalanceHost(agentId, currentOwnerId, 
futureOwnerId, isConnectionTransfer);
              } catch (final Exception e) {
                  logger.warn("Unable to rebalance host id={} ({})",  agentId, 
findAttache(agentId), e);
              }
@@@ -823,7 -797,7 +815,7 @@@
          sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
          final List<HostVO> allManagedAgents = sc.list();
  
--        int avLoad = 0;
++        int avLoad;
  
          if (!allManagedAgents.isEmpty() && !allMS.isEmpty()) {
              avLoad = allManagedAgents.size() / allMS.size();
@@@ -841,7 -815,7 +833,7 @@@
          for (final ManagementServerHostVO node : allMS) {
              if (node.getMsid() != _nodeId) {
  
--                List<HostVO> hostsToRebalance = new ArrayList<HostVO>();
++                List<HostVO> hostsToRebalance = new ArrayList<>();
                  for (final AgentLoadBalancerPlanner lbPlanner : _lbPlanners) {
                      hostsToRebalance = lbPlanner.getHostsToRebalance(node, 
avLoad);
                      if (hostsToRebalance != null && 
!hostsToRebalance.isEmpty()) {
@@@ -867,7 -841,7 +859,7 @@@
                          HostTransferMapVO transfer = null;
                          try {
                              transfer = 
_hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId);
--                            final Answer[] answer = 
sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, 
Event.RequestAgentRebalance);
++                            final Answer[] answer = 
sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId);
                              if (answer == null) {
                                  logger.warn("Failed to get host {} from 
management server {}", host, node);
                                  result = false;
@@@ -894,12 -868,8 +886,12 @@@
          }
      }
  
--    private Answer[] sendRebalanceCommand(final long peer, final long 
agentId, final long currentOwnerId, final long futureOwnerId, final Event 
event) {
-         return sendRebalanceCommand(peer, agentId, currentOwnerId, 
futureOwnerId, event, false);
 -        final TransferAgentCommand transfer = new 
TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event);
++    private Answer[] sendRebalanceCommand(final long peer, final long 
agentId, final long currentOwnerId, final long futureOwnerId) {
++        return sendRebalanceCommand(peer, agentId, currentOwnerId, 
futureOwnerId, Event.RequestAgentRebalance, false);
 +    }
 +
 +    private Answer[] sendRebalanceCommand(final long peer, final long 
agentId, final long currentOwnerId, final long futureOwnerId, final Event 
event, final boolean isConnectionTransfer) {
 +        final TransferAgentCommand transfer = new 
TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event, 
isConnectionTransfer);
          final Commands commands = new Commands(Command.OnError.Stop);
          commands.addCommand(transfer);
  
@@@ -910,8 -880,8 +902,7 @@@
              final String peerName = Long.toString(peer);
              final String cmdStr = _gson.toJson(cmds);
              final String ansStr = _clusterMgr.execute(peerName, agentId, 
cmdStr, true);
--            final Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
--            return answers;
++            return _gson.fromJson(ansStr, Answer[].class);
          } catch (final Exception e) {
              logger.warn("Caught exception while talking to {}",  
currentOwnerId, e);
              return null;
@@@ -960,7 -930,7 +951,7 @@@
                  try {
                      logger.trace("Clustered agent transfer scan check, 
management server id: {}",  _nodeId);
                      synchronized (_agentToTransferIds) {
--                        if (_agentToTransferIds.size() > 0) {
++                        if (!_agentToTransferIds.isEmpty()) {
                              logger.debug("Found {} agents to transfer", 
_agentToTransferIds.size());
                              // for (Long hostId : _agentToTransferIds) {
                              for (final Iterator<Long> iterator = 
_agentToTransferIds.iterator(); iterator.hasNext();) {
@@@ -984,7 -954,7 +975,7 @@@
                                  }
  
                                  if (transferMap.getInitialOwner() != _nodeId 
|| attache == null || attache.forForward()) {
--                                    logger.debug(String.format("Management 
server %d doesn't own host id=%d (%s) any more, skipping rebalance for the 
host", _nodeId, hostId, attache));
++                                    logger.debug("Management server {} 
doesn't own host id={} ({}) any more, skipping rebalance for the host", 
_nodeId, hostId, attache);
                                      iterator.remove();
                                      
_hostTransferDao.completeAgentTransfer(hostId);
                                      continue;
@@@ -1004,9 -974,9 +995,7 @@@
                                          _executor.execute(new 
RebalanceTask(hostId, transferMap.getInitialOwner(), 
transferMap.getFutureOwner()));
                                      } catch (final RejectedExecutionException 
ex) {
                                          logger.warn("Failed to submit 
rebalance task for host id={} ({}); postponing the execution", hostId, attache);
--                                        continue;
                                      }
--
                                  } else {
                                      logger.debug("Agent {} ({}) can't be 
transferred yet as its request queue size is {} and listener queue size is {}",
                                              hostId, attache, 
attache.getQueueSize(), attache.getNonRecurringListenersSize());
@@@ -1016,7 -986,7 +1005,6 @@@
                              logger.trace("Found no agents to be transferred 
by the management server {}",  _nodeId);
                          }
                      }
--
                  } catch (final Throwable e) {
                      logger.error("Problem with the clustered agent transfer 
scan check!", e);
                  }
@@@ -1024,7 -994,7 +1012,7 @@@
          };
      }
  
--    private boolean setToWaitForRebalance(final long hostId, final long 
currentOwnerId, final long futureOwnerId) {
++    private boolean setToWaitForRebalance(final long hostId) {
          logger.debug("Adding agent {} ({}) to the list of agents to 
transfer", hostId, findAttache(hostId));
          synchronized (_agentToTransferIds) {
              return _agentToTransferIds.add(hostId);
@@@ -1096,12 -1063,12 +1084,12 @@@
  
      protected void finishRebalance(final long hostId, final long 
futureOwnerId, final Event event) {
  
--        final boolean success = event == Event.RebalanceCompleted ? true : 
false;
++        final boolean success = event == Event.RebalanceCompleted;
  
          final AgentAttache attache = findAttache(hostId);
          logger.debug("Finishing rebalancing for the agent {} ({}) with event 
{}", hostId, attache, event);
  
--        if (attache == null || !(attache instanceof ClusteredAgentAttache)) {
++        if (!(attache instanceof ClusteredAgentAttache)) {
              logger.debug("Unable to find forward attache for the host id={} 
assuming that the agent disconnected already", hostId);
              _hostTransferDao.completeAgentTransfer(hostId);
              return;
@@@ -1197,9 -1164,9 +1185,9 @@@
      }
  
      protected class RebalanceTask extends ManagedContextRunnable {
--        Long hostId = null;
--        Long currentOwnerId = null;
--        Long futureOwnerId = null;
++        Long hostId;
++        Long currentOwnerId;
++        Long futureOwnerId;
  
          public RebalanceTask(final long hostId, final long currentOwnerId, 
final long futureOwnerId) {
              this.hostId = hostId;
@@@ -1268,7 -1235,7 +1256,7 @@@
                  final ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
  
                  logger.debug("Intercepting command for agent change: agent {} 
event: {}", cmd.getAgentId(), cmd.getEvent());
--                boolean result = false;
++                boolean result;
                  try {
                      result = executeAgentUserRequest(cmd.getAgentId(), 
cmd.getEvent());
                      logger.debug("Result is {}", result);
@@@ -1284,10 -1251,10 +1272,10 @@@
              } else if (cmds.length == 1 && cmds[0] instanceof 
TransferAgentCommand) {
                  final TransferAgentCommand cmd = 
(TransferAgentCommand)cmds[0];
  
 -                logger.debug("Intercepting command for agent rebalancing: 
agent {} event: {}", cmd.getAgentId(), cmd.getEvent());
 -                boolean result = false;
 +                logger.debug("Intercepting command for agent rebalancing: 
agent: {}, event: {}, connection transfer: {}", cmd.getAgentId(), 
cmd.getEvent(), cmd.isConnectionTransfer());
-                 boolean result = false;
++                boolean result;
                  try {
 -                    result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), 
cmd.getCurrentOwner(), cmd.getFutureOwner());
 +                    result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), 
cmd.getCurrentOwner(), cmd.getFutureOwner(), cmd.isConnectionTransfer());
                      logger.debug("Result is {}", result);
  
                  } catch (final AgentUnavailableException e) {
@@@ -1305,7 -1272,7 +1293,7 @@@
  
                  logger.debug("Intercepting command to propagate event {} for 
host {} ({})", () -> cmd.getEvent().name(), cmd::getHostId, () -> 
_hostDao.findById(cmd.getHostId()));
  
--                boolean result = false;
++                boolean result;
                  try {
                      result = _resourceMgr.executeUserRequest(cmd.getHostId(), 
cmd.getEvent());
                      logger.debug("Result is {}", result);
@@@ -1400,133 -1349,6 +1388,127 @@@
          }
      }
  
 +    @Override
 +    public boolean transferDirectAgentsFromMS(String fromMsUuid, long 
fromMsId, long timeoutDurationInMs) {
 +        if (timeoutDurationInMs <= 0) {
-             logger.debug(String.format("Not transferring direct agents from 
management server node %d (id: %s) to other nodes, invalid timeout duration", 
fromMsId, fromMsUuid));
++            logger.debug("Not transferring direct agents from management 
server node {} (id: {}) to other nodes, invalid timeout duration", fromMsId, 
fromMsUuid);
 +            return false;
 +        }
 +
 +        long transferStartTime = System.currentTimeMillis();
 +        if (CollectionUtils.isEmpty(getDirectAgentHosts(fromMsId))) {
-             logger.info(String.format("No direct agent hosts available on 
management server node %d (id: %s), to transfer", fromMsId, fromMsUuid));
++            logger.info("No direct agent hosts available on management server 
node {} (id: {}), to transfer", fromMsId, fromMsUuid);
 +            return true;
 +        }
 +
 +        List<ManagementServerHostVO> msHosts = 
getUpMsHostsExcludingMs(fromMsId);
 +        if (msHosts.isEmpty()) {
-             logger.warn(String.format("No management server nodes available 
to transfer agents from management server node %d (id: %s)", fromMsId, 
fromMsUuid));
++            logger.warn("No management server nodes available to transfer 
agents from management server node {} (id: {})", fromMsId, fromMsUuid);
 +            return false;
 +        }
 +
-         logger.debug(String.format("Transferring direct agents from 
management server node %d (id: %s) to other nodes", fromMsId, fromMsUuid));
++        logger.debug("Transferring direct agents from management server node 
{} (id: {}) to other nodes", fromMsId, fromMsUuid);
 +        int agentTransferFailedCount = 0;
 +        List<DataCenterVO> dataCenterList = dcDao.listAll();
 +        for (DataCenterVO dc : dataCenterList) {
 +            List<HostVO> directAgentHostsInDc = 
getDirectAgentHostsInDc(fromMsId, dc.getId());
 +            if (CollectionUtils.isEmpty(directAgentHostsInDc)) {
 +                continue;
 +            }
-             logger.debug(String.format("Transferring %d direct agents from 
management server node %d (id: %s) of zone %s", directAgentHostsInDc.size(), 
fromMsId, fromMsUuid, dc.toString()));
++            logger.debug("Transferring {} direct agents from management 
server node {} (id: {}) of zone {}", directAgentHostsInDc.size(), fromMsId, 
fromMsUuid, dc);
 +            for (HostVO host : directAgentHostsInDc) {
 +                long transferElapsedTimeInMs = System.currentTimeMillis() - 
transferStartTime;
 +                if (transferElapsedTimeInMs >= timeoutDurationInMs) {
-                     logger.debug(String.format("Stop transferring remaining 
direct agents from management server node %d (id: %s), timed out", fromMsId, 
fromMsUuid));
++                    logger.debug("Stop transferring remaining direct agents 
from management server node {} (id: {}), timed out", fromMsId, fromMsUuid);
 +                    return false;
 +                }
 +
 +                try {
 +                    if (_mshostCounter >= msHosts.size()) {
 +                        _mshostCounter = 0;
 +                    }
 +                    ManagementServerHostVO msHost = 
msHosts.get(_mshostCounter % msHosts.size());
 +                    _mshostCounter++;
 +
 +                    _hostTransferDao.startAgentTransfering(host.getId(), 
fromMsId, msHost.getMsid());
 +                    if (!rebalanceAgent(host.getId(), 
Event.StartAgentRebalance, fromMsId, msHost.getMsid(), true)) {
 +                        agentTransferFailedCount++;
 +                    } else {
 +                        updateLastManagementServer(host.getId(), fromMsId);
 +                    }
 +                } catch (Exception e) {
-                     logger.warn(String.format("Failed to transfer direct 
agent of the host %s from management server node %d (id: %s), due to %s", host, 
fromMsId, fromMsUuid, e.getMessage()));
++                    logger.warn("Failed to transfer direct agent of the host 
{} from management server node {} (id: {}), due to {}", host, fromMsId, 
fromMsUuid, e.getMessage());
 +                }
 +            }
 +        }
 +
 +        return (agentTransferFailedCount == 0);
 +    }
 +
 +    private List<HostVO> getDirectAgentHosts(long msId) {
 +        List<HostVO> directAgentHosts = new ArrayList<>();
 +        List<HostVO> hosts = _hostDao.listHostsByMs(msId);
 +        for (HostVO host : hosts) {
 +            AgentAttache agent = findAttache(host.getId());
-             if (agent != null && agent instanceof DirectAgentAttache) {
++            if (agent instanceof DirectAgentAttache) {
 +                directAgentHosts.add(host);
 +            }
 +        }
 +
 +        return directAgentHosts;
 +    }
 +
 +    private List<HostVO> getDirectAgentHostsInDc(long msId, long dcId) {
 +        List<HostVO> directAgentHosts = new ArrayList<>();
 +        List<HostVO> hosts = _hostDao.listHostsByMsAndDc(msId, dcId);
 +        for (HostVO host : hosts) {
 +            AgentAttache agent = findAttache(host.getId());
-             if (agent != null && agent instanceof DirectAgentAttache) {
++            if (agent instanceof DirectAgentAttache) {
 +                directAgentHosts.add(host);
 +            }
 +        }
 +
 +        return directAgentHosts;
 +    }
 +
 +    private List<ManagementServerHostVO> getUpMsHostsExcludingMs(long 
avoidMsId) {
 +        final List<ManagementServerHostVO> msHosts = 
_mshostDao.listBy(ManagementServerHost.State.Up);
-         Iterator<ManagementServerHostVO> iterator = msHosts.iterator();
-         while (iterator.hasNext()) {
-             ManagementServerHostVO ms = iterator.next();
-             if (ms.getMsid() == avoidMsId || 
_mshostPeerDao.findByPeerMsAndState(ms.getId(), ManagementServerHost.State.Up) 
== null) {
-                 iterator.remove();
-             }
-         }
++        msHosts.removeIf(ms -> ms.getMsid() == avoidMsId || 
_mshostPeerDao.findByPeerMsAndState(ms.getId(), ManagementServerHost.State.Up) 
== null);
 +
 +        return msHosts;
 +    }
 +
 +    private void updateLastManagementServer(long hostId, long msId) {
 +        HostVO hostVO = _hostDao.findById(hostId);
 +        if (hostVO != null) {
 +            hostVO.setLastManagementServerId(msId);
 +            _hostDao.update(hostId, hostVO);
 +        }
 +    }
 +
 +    @Override
 +    public void onManagementServerMaintenance() {
 +        logger.debug("Management server maintenance enabled");
 +        s_transferExecutor.shutdownNow();
 +        cleanupTransferMap(_nodeId);
 +        _agentLbHappened = false;
 +        super.onManagementServerMaintenance();
 +    }
 +
 +    @Override
 +    public void onManagementServerCancelMaintenance() {
 +        logger.debug("Management server maintenance disabled");
 +        super.onManagementServerCancelMaintenance();
 +        if (isAgentRebalanceEnabled()) {
 +            cleanupTransferMap(_nodeId);
 +            if (s_transferExecutor.isShutdown()) {
 +                s_transferExecutor = Executors.newScheduledThreadPool(2, new 
NamedThreadFactory("Cluster-AgentRebalancingExecutor"));
 +                
s_transferExecutor.scheduleAtFixedRate(getAgentRebalanceScanTask(), 60000, 
60000, TimeUnit.MILLISECONDS);
 +                s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 
60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, 
TimeUnit.MILLISECONDS);
 +            }
 +        }
 +    }
 +
      public boolean executeAgentUserRequest(final long agentId, final Event 
event) throws AgentUnavailableException {
          return executeUserRequest(agentId, event);
      }
@@@ -1593,8 -1411,8 +1575,7 @@@
      public ConfigKey<?>[] getConfigKeys() {
          final ConfigKey<?>[] keys = super.getConfigKeys();
  
--        final List<ConfigKey<?>> keysLst = new ArrayList<ConfigKey<?>>();
--        keysLst.addAll(Arrays.asList(keys));
++        final List<ConfigKey<?>> keysLst = new 
ArrayList<>(Arrays.asList(keys));
          keysLst.add(EnableLB);
          keysLst.add(ConnectedAgentThreshold);
          keysLst.add(LoadSize);
diff --cc engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java
index 4e1be3ae0fb,94a16497e87..54146e55049
--- a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java
@@@ -124,9 -126,8 +126,10 @@@ public class HostDaoImpl extends Generi
      protected SearchBuilder<HostVO> UnmanagedApplianceSearch;
      protected SearchBuilder<HostVO> MaintenanceCountSearch;
      protected SearchBuilder<HostVO> HostTypeCountSearch;
 +    protected SearchBuilder<HostVO> ResponsibleMsSearch;
 +    protected SearchBuilder<HostVO> ResponsibleMsDcSearch;
 +    protected GenericSearchBuilder<HostVO, String> ResponsibleMsIdSearch;
+     protected SearchBuilder<HostVO> HostTypeClusterCountSearch;
 -    protected SearchBuilder<HostVO> ResponsibleMsCountSearch;
      protected SearchBuilder<HostVO> HostTypeZoneCountSearch;
      protected SearchBuilder<HostVO> ClusterStatusSearch;
      protected SearchBuilder<HostVO> TypeNameZoneSearch;
@@@ -189,22 -189,21 +191,31 @@@
  
          HostTypeCountSearch = createSearchBuilder();
          HostTypeCountSearch.and("type", 
HostTypeCountSearch.entity().getType(), SearchCriteria.Op.EQ);
+         HostTypeCountSearch.and("zoneId", 
HostTypeCountSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
+         HostTypeCountSearch.and("resourceState", 
HostTypeCountSearch.entity().getResourceState(), SearchCriteria.Op.EQ);
          HostTypeCountSearch.done();
  
 -        ResponsibleMsCountSearch = createSearchBuilder();
 -        ResponsibleMsCountSearch.and("managementServerId", 
ResponsibleMsCountSearch.entity().getManagementServerId(), 
SearchCriteria.Op.EQ);
 -        ResponsibleMsCountSearch.done();
 +        ResponsibleMsSearch = createSearchBuilder();
 +        ResponsibleMsSearch.and("managementServerId", 
ResponsibleMsSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
 +        ResponsibleMsSearch.done();
 +
 +        ResponsibleMsDcSearch = createSearchBuilder();
 +        ResponsibleMsDcSearch.and("managementServerId", 
ResponsibleMsDcSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
 +        ResponsibleMsDcSearch.and("dcId", 
ResponsibleMsDcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
 +        ResponsibleMsDcSearch.done();
 +
 +        ResponsibleMsIdSearch = createSearchBuilder(String.class);
 +        
ResponsibleMsIdSearch.selectFields(ResponsibleMsIdSearch.entity().getUuid());
 +        ResponsibleMsIdSearch.and("managementServerId", 
ResponsibleMsIdSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
 +        ResponsibleMsIdSearch.done();
  
+         HostTypeClusterCountSearch = createSearchBuilder();
+         HostTypeClusterCountSearch.and("cluster", 
HostTypeClusterCountSearch.entity().getClusterId(), SearchCriteria.Op.EQ);
+         HostTypeClusterCountSearch.and("type", 
HostTypeClusterCountSearch.entity().getType(), SearchCriteria.Op.EQ);
+         HostTypeClusterCountSearch.and("status", 
HostTypeClusterCountSearch.entity().getStatus(), SearchCriteria.Op.IN);
+         HostTypeClusterCountSearch.and("removed", 
HostTypeClusterCountSearch.entity().getRemoved(), SearchCriteria.Op.NULL);
+         HostTypeClusterCountSearch.done();
+ 
          HostTypeZoneCountSearch = createSearchBuilder();
          HostTypeZoneCountSearch.and("type", 
HostTypeZoneCountSearch.entity().getType(), SearchCriteria.Op.EQ);
          HostTypeZoneCountSearch.and("dc", 
HostTypeZoneCountSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
@@@ -1457,16 -1535,9 +1562,16 @@@
          return getCount(sc);
      }
  
 +    @Override
 +    public List<String> listByMs(long msId) {
 +        SearchCriteria<String> sc = ResponsibleMsIdSearch.create();
 +        sc.addAnd("managementServerId", SearchCriteria.Op.EQ, msId);
 +        return customSearch(sc, null);
 +    }
 +
      @Override
      public List<String> listOrderedHostsHypervisorVersionsInDatacenter(long 
datacenterId, HypervisorType hypervisorType) {
-         PreparedStatement pstmt = null;
+         PreparedStatement pstmt;
          List<String> result = new ArrayList<>();
          try {
              TransactionLegacy txn = TransactionLegacy.currentTxn();
diff --cc 
server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java
index 027a0530383,84c3081bfc1..1f0f439d819
--- 
a/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java
+++ 
b/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java
@@@ -23,7 -22,6 +22,7 @@@ import java.util.Comparator
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
++import java.util.EnumSet;
  
  import javax.inject.Inject;
  import javax.naming.ConfigurationException;
@@@ -37,22 -36,16 +37,22 @@@ import org.apache.commons.lang3.StringU
  
  import com.cloud.agent.AgentManager;
  import com.cloud.agent.api.Answer;
 +import com.cloud.agent.api.MigrateAgentConnectionCommand;
 +import com.cloud.cluster.ManagementServerHostVO;
 +import com.cloud.cluster.dao.ManagementServerHostDao;
  import com.cloud.dc.DataCenterVO;
+ import com.cloud.dc.dao.ClusterDao;
  import com.cloud.dc.dao.DataCenterDao;
  import com.cloud.host.Host;
 +import com.cloud.host.HostVO;
  import com.cloud.host.dao.HostDao;
  import com.cloud.hypervisor.Hypervisor;
  import com.cloud.resource.ResourceState;
  import com.cloud.utils.component.ComponentLifecycleBase;
  import com.cloud.utils.exception.CloudRuntimeException;
  
 +import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.lang3.StringUtils;
 +
  public class IndirectAgentLBServiceImpl extends ComponentLifecycleBase 
implements IndirectAgentLB, Configurable {
  
      public static final ConfigKey<String> IndirectAgentLBAlgorithm = new 
ConfigKey<>(String.class,
@@@ -70,65 -67,52 +74,74 @@@
      @Inject
      private HostDao hostDao;
      @Inject
 +    private DataCenterDao dcDao;
 +    @Inject
 +    private ManagementServerHostDao mshostDao;
 +    @Inject
      private AgentManager agentManager;
  
+     private static final List<ResourceState> agentValidResourceStates = 
List.of(
+             ResourceState.Enabled, ResourceState.Maintenance, 
ResourceState.Disabled,
+             ResourceState.ErrorInMaintenance, 
ResourceState.PrepareForMaintenance);
+     private static final List<Host.Type> agentValidHostTypes = 
List.of(Host.Type.Routing, Host.Type.ConsoleProxy,
+             Host.Type.SecondaryStorage, Host.Type.SecondaryStorageVM);
+     private static final List<Hypervisor.HypervisorType> 
agentValidHypervisorTypes = List.of(
+             Hypervisor.HypervisorType.KVM, Hypervisor.HypervisorType.LXC);
+ 
      //////////////////////////////////////////////////////
      /////////////// Agent MSLB Methods ///////////////////
      //////////////////////////////////////////////////////
  
 +    @Override
 +    public List<String> getManagementServerList() {
 +        final String msServerAddresses = 
ApiServiceConfiguration.ManagementServerAddresses.value();
 +        if (StringUtils.isEmpty(msServerAddresses)) {
 +            throw new CloudRuntimeException(String.format("No management 
server addresses are defined in '%s' setting",
 +                    ApiServiceConfiguration.ManagementServerAddresses.key()));
 +        }
 +
-         List<String> msList = new 
ArrayList<>(Arrays.asList(msServerAddresses.replace(" ", "").split(",")));
-         return msList;
++        return new ArrayList<>(Arrays.asList(msServerAddresses.replace(" ", 
"").split(",")));
 +    }
 +
      @Override
      public List<String> getManagementServerList(final Long hostId, final Long 
dcId, final List<Long> orderedHostIdList) {
 +        return getManagementServerList(hostId, dcId, orderedHostIdList, null);
 +    }
 +
 +    @Override
 +    public List<String> getManagementServerList(final Long hostId, final Long 
dcId, final List<Long> orderedHostIdList, String lbAlgorithm) {
          final String msServerAddresses = 
ApiServiceConfiguration.ManagementServerAddresses.value();
          if (StringUtils.isEmpty(msServerAddresses)) {
              throw new CloudRuntimeException(String.format("No management 
server addresses are defined in '%s' setting",
                      ApiServiceConfiguration.ManagementServerAddresses.key()));
          }
 +
+         final List<String> msList = Arrays.asList(msServerAddresses.replace(" 
", "").split(","));
+         if (msList.size() == 1) {
+             return msList;
+         }
+ 
 -        final org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm 
algorithm = getAgentMSLBAlgorithm();
++        final org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm 
algorithm = getAgentMSLBAlgorithm(lbAlgorithm);
          List<Long> hostIdList = orderedHostIdList;
          if (hostIdList == null) {
-             hostIdList = getOrderedHostIdList(dcId);
+             hostIdList = algorithm.isHostListNeeded() ? 
getOrderedHostIdList(dcId) : new ArrayList<>();
          }
  
          // just in case we have a host in creating state make sure it is in 
the list:
          if (null != hostId && ! hostIdList.contains(hostId)) {
-             if (logger.isTraceEnabled()) {
-                 logger.trace("adding requested host to host list as it does 
not seem to be there; " + hostId);
-             }
+             logger.trace("adding requested host to host list as it does not 
seem to be there; {}", hostId);
              hostIdList.add(hostId);
          }
 +
-         final org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm 
algorithm = getAgentMSLBAlgorithm(lbAlgorithm);
-         final List<String> msList = Arrays.asList(msServerAddresses.replace(" 
", "").split(","));
          return algorithm.sort(msList, hostIdList, hostId);
      }
  
      @Override
      public boolean compareManagementServerList(final Long hostId, final Long 
dcId, final List<String> receivedMSHosts, final String lbAlgorithm) {
--        if (receivedMSHosts == null || receivedMSHosts.size() < 1) {
++        if (receivedMSHosts == null || receivedMSHosts.isEmpty()) {
              return false;
          }
--        if (getLBAlgorithmName() != lbAlgorithm) {
++        if (!getLBAlgorithmName().equals(lbAlgorithm)) {
              return false;
          }
          final List<String> expectedMSList = getManagementServerList(hostId, 
dcId, null);
@@@ -162,114 -136,18 +165,119 @@@
          return hostIdList;
      }
  
 +    private List<Host> getAllAgentBasedHosts() {
 +        final List<HostVO> allHosts = hostDao.listAll();
 +        if (allHosts == null) {
 +            return new ArrayList<>();
 +        }
 +        final List <Host> agentBasedHosts = new ArrayList<>();
 +        for (final Host host : allHosts) {
 +            conditionallyAddHost(agentBasedHosts, host);
 +        }
 +        return agentBasedHosts;
 +    }
 +
 +    private List<Host> getAllAgentBasedHosts(long msId) {
 +        final List<HostVO> allHosts = hostDao.listHostsByMs(msId);
 +        if (allHosts == null) {
 +            return new ArrayList<>();
 +        }
 +        final List <Host> agentBasedHosts = new ArrayList<>();
 +        for (final Host host : allHosts) {
 +            conditionallyAddHost(agentBasedHosts, host);
 +        }
 +        return agentBasedHosts;
 +    }
 +
 +    private List<Host> getAllAgentBasedHostsInDc(long msId, long dcId) {
 +        final List<HostVO> allHosts = hostDao.listHostsByMsAndDc(msId, dcId);
 +        if (allHosts == null) {
 +            return new ArrayList<>();
 +        }
 +        final List <Host> agentBasedHosts = new ArrayList<>();
 +        for (final Host host : allHosts) {
 +            conditionallyAddHost(agentBasedHosts, host);
 +        }
 +        return agentBasedHosts;
 +    }
 +
 +    private void conditionallyAddHost(List<Host> agentBasedHosts, Host host) {
 +        if (host == null) {
 +            if (logger.isTraceEnabled()) {
 +                logger.trace("trying to add no host to a list");
 +            }
 +            return;
 +        }
 +
 +        EnumSet<ResourceState> allowedStates = EnumSet.of(
 +                ResourceState.Enabled,
 +                ResourceState.Maintenance,
 +                ResourceState.Disabled,
 +                ResourceState.ErrorInMaintenance,
 +                ResourceState.PrepareForMaintenance);
 +        // so the remaining EnumSet<ResourceState> disallowedStates = 
EnumSet.complementOf(allowedStates)
 +        // would be {ResourceState.Creating, ResourceState.Error};
 +        if (!allowedStates.contains(host.getResourceState())) {
 +            if (logger.isTraceEnabled()) {
 +                logger.trace("host ({}) is in '{}' state, not adding to the 
host list", host, host.getResourceState());
 +            }
 +            return;
 +        }
 +
 +        if (host.getType() != Host.Type.Routing
 +                && host.getType() != Host.Type.ConsoleProxy
 +                && host.getType() != Host.Type.SecondaryStorage
 +                && host.getType() != Host.Type.SecondaryStorageVM) {
 +            if (logger.isTraceEnabled()) {
 +                logger.trace(String.format("host (%s) is of wrong type, not 
adding to the host list, type = %s", host, host.getType()));
 +            }
 +            return;
 +        }
 +
 +        if (host.getHypervisorType() != null
-                 && ! (host.getHypervisorType() == 
Hypervisor.HypervisorType.KVM || host.getHypervisorType() == 
Hypervisor.HypervisorType.LXC)) {
++                && !(host.getHypervisorType() == 
Hypervisor.HypervisorType.KVM || host.getHypervisorType() == 
Hypervisor.HypervisorType.LXC)) {
 +
 +            if (logger.isTraceEnabled()) {
 +                logger.trace(String.format("hypervisor is not the right type, 
not adding to the host list, (host: %s, hypervisortype: %s)", host, 
host.getHypervisorType()));
 +            }
 +            return;
 +        }
 +
 +        agentBasedHosts.add(host);
 +    }
 +
+     private List<Long> getAllAgentBasedHostsFromDB(final Long zoneId, final 
Long clusterId) {
+         return 
hostDao.findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(zoneId, 
clusterId,
+                 agentValidResourceStates, agentValidHostTypes, 
agentValidHypervisorTypes);
+     }
+ 
 +    @Override
 +    public boolean haveAgentBasedHosts(long msId) {
 +        return CollectionUtils.isNotEmpty(getAllAgentBasedHosts(msId));
 +    }
 +
      private org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm 
getAgentMSLBAlgorithm() {
 -        final String algorithm = getLBAlgorithmName();
 -        if (algorithmMap.containsKey(algorithm)) {
 -            return algorithmMap.get(algorithm);
 +        return getAgentMSLBAlgorithm(null);
 +    }
 +
 +    private org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm 
getAgentMSLBAlgorithm(String lbAlgorithm) {
 +        boolean algorithmNameFromConfig = false;
 +        if (StringUtils.isEmpty(lbAlgorithm)) {
 +            lbAlgorithm = getLBAlgorithmName();
 +            algorithmNameFromConfig = true;
 +        }
 +        if (algorithmMap.containsKey(lbAlgorithm)) {
 +            return algorithmMap.get(lbAlgorithm);
 +        }
 +        throw new CloudRuntimeException(String.format("Algorithm %s%s not 
found, valid values are: %s",
 +                lbAlgorithm, algorithmNameFromConfig? " configured for '" + 
IndirectAgentLBAlgorithm.key() + "'" : "", algorithmMap.keySet()));
 +    }
 +
 +    @Override
 +    public void checkLBAlgorithmName(String lbAlgorithm) {
 +        if (!algorithmMap.containsKey(lbAlgorithm)) {
 +            throw new CloudRuntimeException(String.format("Invalid algorithm 
%s, valid values are: %s", lbAlgorithm, algorithmMap.keySet()));
          }
 -        throw new CloudRuntimeException(String.format("Algorithm configured 
for '%s' not found, valid values are: %s",
 -                IndirectAgentLBAlgorithm.key(), algorithmMap.keySet()));
      }
  
      ////////////////////////////////////////////////////////////
@@@ -296,73 -184,6 +314,73 @@@
          }
      }
  
 +    @Override
 +    public boolean migrateAgents(String fromMsUuid, long fromMsId, String 
lbAlgorithm, long timeoutDurationInMs) {
 +        if (timeoutDurationInMs <= 0) {
 +            logger.debug(String.format("Not migrating indirect agents from 
management server node %d (id: %s) to other nodes, invalid timeout duration", 
fromMsId, fromMsUuid));
 +            return false;
 +        }
 +
 +        logger.debug(String.format("Migrating indirect agents from management 
server node %d (id: %s) to other nodes", fromMsId, fromMsUuid));
 +        long migrationStartTime = System.currentTimeMillis();
 +        if (!haveAgentBasedHosts(fromMsId)) {
 +            logger.info(String.format("No indirect agents available on 
management server node %d (id: %s), to migrate", fromMsId, fromMsUuid));
 +            return true;
 +        }
 +
 +        boolean lbAlgorithmChanged = false;
 +        if (StringUtils.isNotBlank(lbAlgorithm) && 
!lbAlgorithm.equalsIgnoreCase(getLBAlgorithmName())) {
 +            logger.debug(String.format("Indirect agent lb algorithm changed 
to %s", lbAlgorithm));
 +            lbAlgorithmChanged = true;
 +        }
 +
 +        final List<String> avoidMsList = mshostDao.listNonUpStateMsIPs();
 +        ManagementServerHostVO ms = mshostDao.findByMsid(fromMsId);
 +        if (ms != null && !avoidMsList.contains(ms.getServiceIP())) {
 +            avoidMsList.add(ms.getServiceIP());
 +        }
 +
 +        List<DataCenterVO> dataCenterList = dcDao.listAll();
 +        for (DataCenterVO dc : dataCenterList) {
 +            Long dcId = dc.getId();
 +            List<Long> orderedHostIdList = getOrderedHostIdList(dcId);
 +            List<Host> agentBasedHostsOfMsInDc = 
getAllAgentBasedHostsInDc(fromMsId, dcId);
 +            if (CollectionUtils.isEmpty(agentBasedHostsOfMsInDc)) {
 +                continue;
 +            }
-             logger.debug(String.format("Migrating %d indirect agents from 
management server node %d (id: %s) of zone %s", agentBasedHostsOfMsInDc.size(), 
fromMsId, fromMsUuid, dc.toString()));
++            logger.debug(String.format("Migrating %d indirect agents from 
management server node %d (id: %s) of zone %s", agentBasedHostsOfMsInDc.size(), 
fromMsId, fromMsUuid, dc));
 +            for (final Host host : agentBasedHostsOfMsInDc) {
 +                long migrationElapsedTimeInMs = System.currentTimeMillis() - 
migrationStartTime;
 +                if (migrationElapsedTimeInMs >= timeoutDurationInMs) {
 +                    logger.debug(String.format("Stop migrating remaining 
indirect agents from management server node %d (id: %s), timed out", fromMsId, 
fromMsUuid));
 +                    return false;
 +                }
 +
 +                List<String> msList = null;
 +                Long lbCheckInterval = 0L;
 +                if (lbAlgorithmChanged) {
 +                    // send new MS list when there is change in lb algorithm
 +                    msList = getManagementServerList(host.getId(), dcId, 
orderedHostIdList, lbAlgorithm);
 +                    lbCheckInterval = 
getLBPreferredHostCheckInterval(host.getClusterId());
 +                }
 +
 +                final MigrateAgentConnectionCommand cmd = new 
MigrateAgentConnectionCommand(msList, avoidMsList, lbAlgorithm, 
lbCheckInterval);
 +                agentManager.easySend(host.getId(), cmd); //answer not 
received as the agent disconnects and reconnects to other ms
 +                updateLastManagementServer(host.getId(), fromMsId);
 +            }
 +        }
 +
 +        return true;
 +    }
 +
 +    private void updateLastManagementServer(long hostId, long msId) {
 +        HostVO hostVO = hostDao.findById(hostId);
 +        if (hostVO != null) {
 +            hostVO.setLastManagementServerId(msId);
 +            hostDao.update(hostId, hostVO);
 +        }
 +    }
 +
      private void configureAlgorithmMap() {
          final List<org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm> 
algorithms = new ArrayList<>();
          algorithms.add(new IndirectAgentLBStaticAlgorithm());
diff --cc utils/src/main/java/com/cloud/utils/nio/NioConnection.java
index 2a468fa3c85,2a157b9c001..98fa69716cd
--- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
@@@ -32,8 -32,8 +32,9 @@@ import java.nio.channels.SelectionKey
  import java.nio.channels.Selector;
  import java.nio.channels.ServerSocketChannel;
  import java.nio.channels.SocketChannel;
+ import java.security.GeneralSecurityException;
  import java.util.ArrayList;
 +import java.util.HashSet;
  import java.util.Iterator;
  import java.util.List;
  import java.util.Set;
@@@ -76,17 -78,27 +80,29 @@@ public abstract class NioConnection imp
      protected ExecutorService _executor;
      protected ExecutorService _sslHandshakeExecutor;
      protected CAService caService;
 +    protected Set<SocketChannel> socketChannels = new HashSet<>();
+     protected Integer sslHandshakeTimeout = null;
+     private final int factoryMaxNewConnectionsCount;
  
      public NioConnection(final String name, final int port, final int 
workers, final HandlerFactory factory) {
          _name = name;
          _isRunning = false;
          _selector = null;
          _port = port;
 +        _workers = workers;
          _factory = factory;
-         _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, 
TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name 
+ "-Handler"));
-         _sslHandshakeExecutor = Executors.newCachedThreadPool(new 
NamedThreadFactory(name + "-SSLHandshakeHandler"));
+         this.factoryMaxNewConnectionsCount = 
factory.getMaxConcurrentNewConnectionsCount();
+         _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, 
TimeUnit.DAYS,
+                 new LinkedBlockingQueue<>(5 * workers), new 
NamedThreadFactory(name + "-Handler"),
+                 new ThreadPoolExecutor.AbortPolicy());
+         String sslHandshakeHandlerName = name + "-SSLHandshakeHandler";
+         if (factoryMaxNewConnectionsCount > 0) {
+             _sslHandshakeExecutor = new ThreadPoolExecutor(0, 
this.factoryMaxNewConnectionsCount, 30,
+                     TimeUnit.MINUTES, new SynchronousQueue<>(), new 
NamedThreadFactory(sslHandshakeHandlerName),
+                     new ThreadPoolExecutor.AbortPolicy());
+         } else {
+             _sslHandshakeExecutor = Executors.newCachedThreadPool(new 
NamedThreadFactory(sslHandshakeHandlerName));
+         }
      }
  
      public void setCAService(final CAService caService) {
@@@ -94,7 -106,7 +110,7 @@@
      }
  
      public void start() throws NioConnectionException {
--        _todos = new ArrayList<ChangeRequest>();
++        _todos = new ArrayList<>();
  
          try {
              init();
@@@ -110,9 -122,6 +126,9 @@@
          }
          _isStartup = true;
  
 +        if (_executor.isShutdown()) {
-             _executor = new ThreadPoolExecutor(_workers, 5 * _workers, 1, 
TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(_name + "-Handler"));
++            _executor = new ThreadPoolExecutor(_workers, 5 * _workers, 1, 
TimeUnit.DAYS, new LinkedBlockingQueue<>(), new NamedThreadFactory(_name + 
"-Handler"));
 +        }
          _threadExecutor = Executors.newSingleThreadExecutor(new 
NamedThreadFactory(this._name + "-NioConnectionHandler"));
          _isRunning = true;
          _futureTask = _threadExecutor.submit(this);
@@@ -145,18 -155,18 +162,14 @@@
                  final Set<SelectionKey> readyKeys = _selector.selectedKeys();
                  final Iterator<SelectionKey> i = readyKeys.iterator();
  
--                if (logger.isTraceEnabled()) {
--                    logger.trace("Keys Processing: " + readyKeys.size());
--                }
++                logger.trace("Keys Processing: {}", readyKeys.size());
                  // Walk through the ready keys collection.
                  while (i.hasNext()) {
                      final SelectionKey sk = i.next();
                      i.remove();
  
                      if (!sk.isValid()) {
--                        if (logger.isTraceEnabled()) {
--                            logger.trace("Selection Key is invalid: " + 
sk.toString());
--                        }
++                        logger.trace("Selection Key is invalid: {}", sk);
                          final Link link = (Link)sk.attachment();
                          if (link != null) {
                              link.terminated();
@@@ -205,53 -231,44 +234,42 @@@
          final Socket socket = socketChannel.socket();
          socket.setKeepAlive(true);
  
--        if (logger.isTraceEnabled()) {
--            logger.trace("Connection accepted for " + socket);
--        }
++        logger.trace("Connection accepted for {}", socket);
  
-         final SSLEngine sslEngine;
          try {
-             sslEngine = Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
-             sslEngine.setUseClientMode(false);
-             
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
              final NioConnection nioConnection = this;
-             _sslHandshakeExecutor.submit(new Runnable() {
-                 @Override
-                 public void run() {
-                     _selector.wakeup();
-                     try {
-                         sslEngine.beginHandshake();
-                         if (!Link.doHandshake(socketChannel, sslEngine)) {
-                             throw new IOException("SSL handshake timed out 
with " + socketChannel.getRemoteAddress());
-                         }
-                         if (logger.isTraceEnabled()) {
-                             logger.trace("SSL: Handshake done");
-                         }
-                         final InetSocketAddress saddr = 
(InetSocketAddress)socket.getRemoteSocketAddress();
-                         final Link link = new Link(saddr, nioConnection);
-                         link.setSSLEngine(sslEngine);
-                         link.setKey(socketChannel.register(key.selector(), 
SelectionKey.OP_READ, link));
-                         final Task task = _factory.create(Task.Type.CONNECT, 
link, null);
-                         socketChannels.add(socketChannel);
-                         registerLink(saddr, link);
-                         _executor.submit(task);
-                     } catch (IOException e) {
-                         if (logger.isTraceEnabled()) {
-                             logger.trace("Connection closed due to failure: " 
+ e.getMessage());
-                         }
-                         closeAutoCloseable(socket, "accepting socket");
-                         closeAutoCloseable(socketChannel, "accepting 
socketChannel");
-                     } finally {
-                         _selector.wakeup();
+             _sslHandshakeExecutor.submit(() -> {
+                 final InetSocketAddress socketAddress = 
(InetSocketAddress)socket.getRemoteSocketAddress();
+                 _factory.registerNewConnection(socketAddress);
+                 _selector.wakeup();
+                 try {
+                     final SSLEngine sslEngine = 
Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
+                     sslEngine.setUseClientMode(false);
+                     
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
+                     sslEngine.beginHandshake();
+                     if (!Link.doHandshake(socketChannel, sslEngine, 
getSslHandshakeTimeout())) {
+                         throw new IOException("SSL handshake timed out with " 
+ socketAddress);
                      }
+                     logger.trace("SSL: Handshake done");
+                     final Link link = new Link(socketAddress, nioConnection);
+                     link.setSSLEngine(sslEngine);
+                     link.setKey(socketChannel.register(key.selector(), 
SelectionKey.OP_READ, link));
+                     final Task task = _factory.create(Task.Type.CONNECT, 
link, null);
+                     registerLink(socketAddress, link);
+                     _executor.submit(task);
+                 } catch (final GeneralSecurityException | IOException e) {
+                     _factory.unregisterNewConnection(socketAddress);
+                     logger.trace("Connection closed with {} due to failure: 
{}", socket.getRemoteSocketAddress(), e.getMessage());
+                     closeAutoCloseable(socket, "accepting socket");
+                     closeAutoCloseable(socketChannel, "accepting 
socketChannel");
+                 } finally {
+                     _selector.wakeup();
                  }
              });
-         } catch (final Exception e) {
-             if (logger.isTraceEnabled()) {
-                 logger.trace("Connection closed due to failure: " + 
e.getMessage());
-             }
-             closeAutoCloseable(socket, "accepting socket");
-             closeAutoCloseable(socketChannel, "accepting socketChannel");
+         } catch (final RejectedExecutionException e) {
+             logger.trace("{} Accept Task rejected: {}", 
socket.getRemoteSocketAddress(), e.getMessage());
+             closeAutoCloseable(socket, "Rejecting connection - accepting 
socket");
+             closeAutoCloseable(socketChannel, "Rejecting connection - 
accepting socketChannel");
          } finally {
              _selector.wakeup();
          }
@@@ -277,14 -295,14 +296,10 @@@
          final Link link = (Link)key.attachment();
          try {
              final SocketChannel socketChannel = (SocketChannel)key.channel();
--            if (logger.isTraceEnabled()) {
--                logger.trace("Reading from: " + 
socketChannel.socket().toString());
--            }
++            logger.trace("Reading from: {}", 
socketChannel.socket().toString());
              final byte[] data = link.read(socketChannel);
              if (data == null) {
--                if (logger.isTraceEnabled()) {
--                    logger.trace("Packet is incomplete.  Waiting for more.");
--                }
++                logger.trace("Packet is incomplete.  Waiting for more.");
                  return;
              }
              final Task task = _factory.create(Task.Type.DATA, link, data);
@@@ -330,18 -348,18 +345,17 @@@
  
      protected void processTodos() {
          List<ChangeRequest> todos;
--        if (_todos.size() == 0) {
++        if (_todos.isEmpty()) {
              return;             // Nothing to do.
          }
  
          synchronized (this) {
              todos = _todos;
--            _todos = new ArrayList<ChangeRequest>();
++            _todos = new ArrayList<>();
          }
  
--        if (logger.isTraceEnabled()) {
--            logger.trace("Todos Processing: " + todos.size());
--        }
++        logger.trace("Todos Processing: {}", todos.size());
++
          SelectionKey key;
          for (final ChangeRequest todo : todos) {
              switch (todo.type) {
@@@ -368,7 -386,7 +382,7 @@@
                          link.setKey(key);
                      }
                  } catch (final ClosedChannelException e) {
--                    logger.warn("Couldn't register socket: " + todo.key);
++                    logger.warn("Couldn't register socket: {}", todo.key);
                      try {
                          ((SocketChannel)todo.key).close();
                      } catch (final IOException ignore) {
@@@ -380,9 -398,9 +394,7 @@@
                  }
                  break;
              case ChangeRequest.CLOSE:
--                if (logger.isTraceEnabled()) {
--                    logger.trace("Trying to close " + todo.key);
--                }
++                logger.trace("Trying to close {}", todo.key);
                  key = (SelectionKey)todo.key;
                  closeConnection(key);
                  if (key != null) {
@@@ -410,9 -428,9 +422,7 @@@
              if (!socket.getKeepAlive()) {
                  socket.setKeepAlive(true);
              }
--            if (logger.isDebugEnabled()) {
--                logger.debug("Connected to " + socket);
--            }
++            logger.debug("Connected to {}", socket);
              final Link link = new 
Link((InetSocketAddress)socket.getRemoteSocketAddress(), this);
              link.setKey(key);
              key.attach(link);
@@@ -440,9 -458,9 +450,7 @@@
      protected void write(final SelectionKey key) throws IOException {
          final Link link = (Link)key.attachment();
          try {
--            if (logger.isTraceEnabled()) {
--                logger.trace("Writing to " + 
link.getSocketAddress().toString());
--            }
++            logger.trace("Writing to {}", link.getSocketAddress().toString());
              final boolean close = link.write((SocketChannel)key.channel());
              if (close) {
                  closeConnection(key);
@@@ -462,9 -480,9 +470,7 @@@
              key.cancel();
              try {
                  if (channel != null) {
--                    if (logger.isDebugEnabled()) {
--                        logger.debug("Closing socket " + channel.socket());
--                    }
++                    logger.debug("Closing socket {}", channel.socket());
                      channel.close();
                  }
              } catch (final IOException ignore) {
@@@ -499,17 -517,8 +505,17 @@@
  
      /* Release the resource used by the instance */
      public void cleanUp() throws IOException {
 -        if (_selector != null && _selector.isOpen()) {
 -            _selector.wakeup();
 +        for (SocketChannel channel : socketChannels) {
 +            if (channel != null && channel.isOpen()) {
 +                try {
-                     logger.info(String.format("Closing connection: %s", 
channel.getRemoteAddress()));
++                    logger.info("Closing connection: {}", 
channel.getRemoteAddress());
 +                    channel.close();
 +                } catch (IOException e) {
-                     logger.warn(String.format("Unable to close connection due 
to %s", e.getMessage()));
++                    logger.warn("Unable to close connection due to {}", 
e.getMessage());
 +                }
 +            }
 +        }
 +        if (_selector != null) {
              _selector.close();
          }
      }

Reply via email to