This is an automated email from the ASF dual-hosted git repository. dahn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudstack.git
commit 2654890e868a5a94412126be8aa920888e62cff4 Merge: 7abda3b9639 085bd3bda5f Author: Daan Hoogland <d...@onecht.net> AuthorDate: Sat Feb 1 21:20:08 2025 +0100 Merge branch '4.20' .python-version | 2 +- agent/conf/agent.properties | 7 + agent/src/main/java/com/cloud/agent/Agent.java | 823 +++++++++++---------- .../src/main/java/com/cloud/agent/AgentShell.java | 57 +- .../src/main/java/com/cloud/agent/IAgentShell.java | 2 + .../cloud/agent/properties/AgentProperties.java | 7 + .../test/java/com/cloud/agent/AgentShellTest.java | 7 + agent/src/test/java/com/cloud/agent/AgentTest.java | 257 +++++++ .../org/apache/cloudstack/acl/RoleService.java | 5 + .../api/command/admin/domain/ListDomainsCmd.java | 7 +- .../api/command/user/account/ListAccountsCmd.java | 5 +- .../user/firewall/CreateFirewallRuleCmd.java | 7 +- .../OutOfBandManagementService.java | 2 +- .../command/admin/domain/ListDomainsCmdTest.java | 13 +- .../command/user/account/ListAccountsCmdTest.java | 15 +- .../user/firewall/CreateFirewallRuleCmdTest.java | 91 +++ .../java/com/cloud/resource/ServerResource.java | 8 + .../java/com/cloud/vm/VirtualMachineManager.java | 22 +- .../service/NetworkOrchestrationService.java | 3 + .../java/com/cloud/capacity/CapacityManager.java | 13 +- .../java/com/cloud/resource/ResourceManager.java | 4 +- .../java/com/cloud/storage/StorageManager.java | 9 + .../com/cloud/agent/manager/AgentManagerImpl.java | 302 +++++--- .../agent/manager/ClusteredAgentManagerImpl.java | 143 ++-- .../com/cloud/vm/VirtualMachineManagerImpl.java | 220 +++--- .../cloud/vm/VirtualMachinePowerStateSyncImpl.java | 220 +++--- .../engine/orchestration/NetworkOrchestrator.java | 2 +- .../java/com/cloud/capacity/dao/CapacityDao.java | 2 + .../com/cloud/capacity/dao/CapacityDaoImpl.java | 12 + .../main/java/com/cloud/dc/ClusterDetailsDao.java | 3 + .../java/com/cloud/dc/ClusterDetailsDaoImpl.java | 20 + .../src/main/java/com/cloud/dc/dao/ClusterDao.java | 16 +- .../main/java/com/cloud/dc/dao/ClusterDaoImpl.java | 71 +- .../cloud/dc/dao/DataCenterIpAddressDaoImpl.java | 3 +- .../com/cloud/dc/dao/DataCenterVnetDaoImpl.java | 2 +- .../src/main/java/com/cloud/host/dao/HostDao.java | 46 +- .../main/java/com/cloud/host/dao/HostDaoImpl.java | 497 ++++++++----- .../com/cloud/network/dao/IPAddressDaoImpl.java | 2 +- .../java/com/cloud/network/dao/NetworkDaoImpl.java | 3 +- .../cloud/secstorage/CommandExecLogDaoImpl.java | 3 +- .../com/cloud/service/dao/ServiceOfferingDao.java | 4 +- .../cloud/service/dao/ServiceOfferingDaoImpl.java | 11 +- .../com/cloud/storage/dao/StoragePoolHostDao.java | 2 +- .../cloud/storage/dao/StoragePoolHostDaoImpl.java | 24 +- .../java/com/cloud/storage/dao/VMTemplateDao.java | 5 +- .../com/cloud/storage/dao/VMTemplateDaoImpl.java | 60 +- .../java/com/cloud/storage/dao/VolumeDaoImpl.java | 8 - .../upgrade/SystemVmTemplateRegistration.java | 2 +- .../cloud/upgrade/dao/DatabaseAccessObject.java | 11 + .../java/com/cloud/upgrade/dao/DbUpgradeUtils.java | 6 + .../com/cloud/upgrade/dao/Upgrade42000to42010.java | 39 + .../java/com/cloud/vm/dao/ConsoleProxyDao.java | 2 +- .../java/com/cloud/vm/dao/ConsoleProxyDaoImpl.java | 32 +- .../java/com/cloud/vm/dao/NicIpAliasDaoImpl.java | 3 +- .../main/java/com/cloud/vm/dao/VMInstanceDao.java | 21 +- .../java/com/cloud/vm/dao/VMInstanceDaoImpl.java | 229 +++++- .../resourcedetail/ResourceDetailsDao.java | 2 + .../resourcedetail/ResourceDetailsDaoBase.java | 15 + .../datastore/db/PrimaryDataStoreDaoImpl.java | 6 +- .../db/views/cloud.network_offering_view.sql | 8 +- .../cloud/capacity/dao/CapacityDaoImplTest.java | 99 +++ .../java/com/cloud/dc/dao/ClusterDaoImplTest.java | 78 ++ .../java/com/cloud/host/dao/HostDaoImplTest.java | 184 +++++ .../cloud/usage/dao/UsageStorageDaoImplTest.java | 7 +- .../resourcedetail/ResourceDetailsDaoBaseTest.java | 181 +++++ .../datastore/db/PrimaryDataStoreDaoImplTest.java | 39 +- .../agent/lb/IndirectAgentLBAlgorithm.java | 4 + .../dao/ManagementServerHostPeerDaoImpl.java | 3 +- .../framework/config/impl/ConfigDepotImpl.java | 14 +- .../main/java/com/cloud/utils/db/GenericDao.java | 5 + .../java/com/cloud/utils/db/GenericDaoBase.java | 36 + .../framework/jobs/dao/VmWorkJobDao.java | 1 + .../framework/jobs/dao/VmWorkJobDaoImpl.java | 16 + .../framework/jobs/dao/VmWorkJobDaoImplTest.java | 94 ++- .../acl/DynamicRoleBasedAPIAccessChecker.java | 70 +- .../affinity/ExplicitDedicationProcessor.java | 18 +- .../dedicated/DedicatedResourceManagerImpl.java | 61 +- .../cloud/deploy/ImplicitDedicationPlanner.java | 50 +- .../implicitplanner/ImplicitPlannerTest.java | 38 +- .../com/cloud/resource/AgentRoutingResource.java | 4 +- .../xenserver/discoverer/XcpServerDiscoverer.java | 9 +- .../cloudstack/metrics/PrometheusExporterImpl.java | 4 +- .../apache/cloudstack/metrics/MetricsService.java | 8 +- .../cloudstack/metrics/MetricsServiceImpl.java | 158 ++-- .../CloudStackPrimaryDataStoreLifeCycleImpl.java | 152 ++-- ...loudStackPrimaryDataStoreLifeCycleImplTest.java | 77 +- .../ScaleIOPrimaryDataStoreLifeCycle.java | 59 +- .../ScaleIOPrimaryDataStoreLifeCycleTest.java | 32 +- .../storage/datastore/util/StorPoolHelper.java | 14 +- .../java/com/cloud/alert/AlertManagerImpl.java | 96 ++- .../java/com/cloud/api/query/QueryManagerImpl.java | 6 +- .../com/cloud/api/query/dao/UserVmJoinDao.java | 4 +- .../com/cloud/api/query/dao/UserVmJoinDaoImpl.java | 4 +- .../com/cloud/capacity/CapacityManagerImpl.java | 185 +++-- .../configuration/ConfigurationManagerImpl.java | 12 +- .../consoleproxy/ConsoleProxyManagerImpl.java | 12 +- .../deploy/DeploymentPlanningManagerImpl.java | 58 +- .../kvm/discoverer/LibvirtServerDiscoverer.java | 66 +- .../java/com/cloud/network/NetworkServiceImpl.java | 24 +- .../com/cloud/network/as/AutoScaleManagerImpl.java | 2 +- .../network/security/SecurityGroupListener.java | 2 +- .../java/com/cloud/network/vpc/VpcManagerImpl.java | 4 +- .../com/cloud/resource/ResourceManagerImpl.java | 85 +-- .../resource/RollingMaintenanceManagerImpl.java | 2 +- .../resourcelimit/ResourceLimitManagerImpl.java | 11 +- .../com/cloud/server/ManagementServerImpl.java | 24 +- .../main/java/com/cloud/server/StatsCollector.java | 100 +-- .../java/com/cloud/storage/StorageManagerImpl.java | 84 ++- .../cloud/storage/download/DownloadListener.java | 42 +- .../main/java/com/cloud/vm/UserVmManagerImpl.java | 2 +- .../org/apache/cloudstack/acl/RoleManagerImpl.java | 2 +- .../agent/lb/IndirectAgentLBServiceImpl.java | 94 ++- .../IndirectAgentLBRoundRobinAlgorithm.java | 5 + .../OutOfBandManagementServiceImpl.java | 20 +- .../java/com/cloud/alert/AlertManagerImplTest.java | 71 +- .../cloud/capacity/CapacityManagerImplTest.java | 182 +++++ .../configuration/ConfigurationManagerTest.java | 20 +- .../deploy/DeploymentPlanningManagerImplTest.java | 4 +- .../com/cloud/network/Ipv6ServiceImplTest.java | 62 +- .../com/cloud/network/NetworkServiceImplTest.java | 26 +- .../cloud/network/as/AutoScaleManagerImplTest.java | 149 ++-- .../cloud/resource/MockResourceManagerImpl.java | 4 +- .../ResourceLimitManagerImplTest.java | 14 +- .../java/com/cloud/user/DomainManagerImplTest.java | 47 +- .../java/com/cloud/user/MockUsageEventDao.java | 6 + .../java/com/cloud/vm/FirstFitPlannerTest.java | 2 - .../agent/lb/IndirectAgentLBServiceImplTest.java | 55 +- .../networkoffering/CreateNetworkOfferingTest.java | 49 +- .../SecondaryStorageManagerImpl.java | 16 +- setup/db/create-schema-simulator.sql | 3 +- test/integration/smoke/test_dynamicroles.py | 15 +- ui/src/config/section/infra/hosts.js | 2 +- ui/src/views/AutogenView.vue | 2 +- ui/src/views/dashboard/CapacityDashboard.vue | 22 +- ui/src/views/network/FirewallRules.vue | 3 + .../utils/backoff/impl/ConstantTimeBackoff.java | 1 - .../java/com/cloud/utils/nio/HandlerFactory.java | 11 +- utils/src/main/java/com/cloud/utils/nio/Link.java | 16 +- .../main/java/com/cloud/utils/nio/NioClient.java | 68 +- .../java/com/cloud/utils/nio/NioConnection.java | 186 ++--- .../main/java/com/cloud/utils/nio/NioServer.java | 42 +- .../apache/cloudstack/utils/cache/LazyCache.java | 33 +- .../apache/cloudstack/utils/cache/SingleCache.java | 33 +- .../java/com/cloud/utils/testcase/NioTest.java | 45 +- .../cloudstack/utils/cache/LazyCacheTest.java | 115 +++ 145 files changed, 4773 insertions(+), 2415 deletions(-) diff --cc agent/src/main/java/com/cloud/agent/Agent.java index 97803477115,2e7b61fbd51..0a76bfbb4f8 --- a/agent/src/main/java/com/cloud/agent/Agent.java +++ b/agent/src/main/java/com/cloud/agent/Agent.java @@@ -58,10 -56,11 +58,10 @@@ import org.apache.cloudstack.managed.co import org.apache.cloudstack.utils.security.KeyStoreUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; - import org.apache.commons.lang.ObjectUtils; - import org.apache.commons.lang3.StringUtils; - import org.apache.logging.log4j.Logger; + import org.apache.commons.lang3.ObjectUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; + import org.apache.logging.log4j.Logger; + import org.apache.logging.log4j.ThreadContext; import com.cloud.agent.api.AgentControlAnswer; import com.cloud.agent.api.AgentControlCommand; @@@ -70,6 -69,7 +70,9 @@@ import com.cloud.agent.api.Command import com.cloud.agent.api.CronCommand; import com.cloud.agent.api.MaintainAnswer; import com.cloud.agent.api.MaintainCommand; ++import com.cloud.agent.api.MigrateAgentConnectionAnswer; ++import com.cloud.agent.api.MigrateAgentConnectionCommand; + import com.cloud.agent.api.PingAnswer; import com.cloud.agent.api.PingCommand; import com.cloud.agent.api.ReadyCommand; import com.cloud.agent.api.ShutdownCommand; @@@ -79,9 -79,11 +82,12 @@@ import com.cloud.agent.transport.Reques import com.cloud.agent.transport.Response; import com.cloud.exception.AgentControlChannelException; import com.cloud.host.Host; + import com.cloud.resource.AgentStatusUpdater; + import com.cloud.resource.ResourceStatusUpdater; import com.cloud.resource.ServerResource; + import com.cloud.utils.NumbersUtil; import com.cloud.utils.PropertiesUtil; - import com.cloud.utils.backoff.BackoffAlgorithm; ++import com.cloud.utils.StringUtils; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.NioConnectionException; @@@ -314,8 -338,9 +342,8 @@@ public class Agent implements HandlerFa logger.info("Attempted to connect to the server, but received an unexpected exception, trying again...", e); } } - _shell.updateConnectedHost(); + shell.updateConnectedHost(); scavengeOldAgentObjects(); - } public void stop(final String reason, final String detail) { @@@ -479,21 -539,16 +542,21 @@@ } public void sendStartup(final Link link) { + sendStartup(link, false); + } + + public void sendStartup(final Link link, boolean transfer) { - final StartupCommand[] startup = _resource.initialize(); + final StartupCommand[] startup = serverResource.initialize(); if (startup != null) { - final String msHostList = _shell.getPersistentProperty(null, "host"); + final String msHostList = shell.getPersistentProperty(null, "host"); final Command[] commands = new Command[startup.length]; for (int i = 0; i < startup.length; i++) { setupStartupCommand(startup[i]); startup[i].setMSHostList(msHostList); + startup[i].setConnectionTransferred(transfer); commands[i] = startup[i]; } - final Request request = new Request(_id != null ? _id : -1, -1, commands, false, false); + final Request request = new Request(id != null ? id : -1, -1, commands, false, false); request.setSequence(getNextSequence()); logger.debug("Sending Startup: {}", request.toString()); @@@ -547,84 -608,63 +616,71 @@@ return new ServerHandler(type, link, data); } - protected void closeAndTerminateLink(final Link link) { - if (link == null) { - return; - } - link.close(); - link.terminated(); + protected void reconnect(final Link link) { + reconnect(link, null, null, false); } - protected void stopAndCleanupConnection(boolean waitForStop) { - if (connection == null) { - return; - } - connection.stop(); - try { - connection.cleanUp(); - } catch (final IOException e) { - logger.warn("Fail to clean up old connection. {}", e); - } - if (!waitForStop) { + protected void reconnect(final Link link, String preferredHost, List<String> avoidHostList, boolean forTransfer) { - if (!(forTransfer || _reconnectAllowed)) { ++ if (!(forTransfer || reconnectAllowed)) { return; } - do { - shell.getBackoffAlgorithm().waitBeforeRetry(); - } while (connection.isStartup()); - } - synchronized (this) { - if (_startup != null) { - _startup.cancel(); - _startup = null; - } - } - - if (link != null) { - link.close(); - link.terminated(); - protected void reconnect(final Link link) { + if (!reconnectAllowed) { + logger.debug("Reconnect requested but it is not allowed {}", () -> getLinkLog(link)); + return; } - + cancelStartupTask(); + closeAndTerminateLink(link); + closeAndTerminateLink(this.link); setLink(null); cancelTasks(); + serverResource.disconnected(); + logger.info("Lost connection to host: {}. Attempting reconnection while we still have {} commands in progress.", shell.getConnectedHost(), commandsInProgress.get()); + stopAndCleanupConnection(true); + do { + final String host = shell.getNextHost(); + connection = new NioClient(getAgentName(), host, shell.getPort(), shell.getWorkers(), shell.getSslHandshakeTimeout(), this); + logger.info("Reconnecting to host: {}", host); + try { + connection.start(); + } catch (final NioConnectionException e) { + logger.info("Attempted to re-connect to the server, but received an unexpected exception, trying again...", e); + stopAndCleanupConnection(false); + } + shell.getBackoffAlgorithm().waitBeforeRetry(); + } while (!connection.isStartup()); + shell.updateConnectedHost(); + logger.info("Connected to the host: {}", shell.getConnectedHost()); + } - _resource.disconnected(); - - logger.info("Lost connection to host: {}. Attempting reconnection while we still have {} commands in progress.", _shell.getConnectedHost(), _inProgress.get()); - - _connection.stop(); ++ protected void closeAndTerminateLink(final Link link) { ++ if (link == null) { ++ return; ++ } ++ link.close(); ++ link.terminated(); ++ } + ++ protected void stopAndCleanupConnection(boolean waitForStop) { ++ if (connection == null) { ++ return; ++ } ++ connection.stop(); + try { - _connection.cleanUp(); ++ connection.cleanUp(); + } catch (final IOException e) { + logger.warn("Fail to clean up old connection. {}", e); + } - - while (_connection.isStartup()) { - _shell.getBackoffAlgorithm().waitBeforeRetry(); - } - - String host = preferredHost; - if (StringUtils.isEmpty(host)) { - host = _shell.getNextHost(); ++ if (!waitForStop) { ++ return; + } - + do { - if (CollectionUtils.isEmpty(avoidHostList) || !avoidHostList.contains(host)) { - _connection = new NioClient("Agent", host, _shell.getPort(), _shell.getWorkers(), this); - logger.info("Reconnecting to host:{}", host); - try { - _connection.start(); - } catch (final NioConnectionException e) { - logger.info("Attempted to re-connect to the server, but received an unexpected exception, trying again...", e); - _connection.stop(); - try { - _connection.cleanUp(); - } catch (final IOException ex) { - logger.warn("Fail to clean up old connection. {}", ex); - } - } - } - _shell.getBackoffAlgorithm().waitBeforeRetry(); - host = _shell.getNextHost(); - } while (!_connection.isStartup()); - _shell.updateConnectedHost(); - logger.info("Connected to the host: {}", _shell.getConnectedHost()); ++ shell.getBackoffAlgorithm().waitBeforeRetry(); ++ } while (connection.isStartup()); + } + public void processStartupAnswer(final Answer answer, final Response response, final Link link) { - boolean cancelled = false; - synchronized (this) { - if (_startup != null) { - _startup.cancel(); - _startup = null; - } else { - cancelled = true; - } - } + boolean answerValid = cancelStartupTask(); final StartupAnswer startup = (StartupAnswer)answer; if (!startup.getResult()) { logger.error("Not allowed to connect to the server: {}", answer.getDetails()); @@@ -879,53 -925,6 +946,53 @@@ return new SetupMSListAnswer(true); } + private Answer migrateAgentToOtherMS(final MigrateAgentConnectionCommand cmd) { + try { + if (CollectionUtils.isNotEmpty(cmd.getMsList())) { + processManagementServerList(cmd.getMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval()); + } + migrateAgentConnection(cmd.getAvoidMsList()); + } catch (Exception e) { + String errMsg = "Migrate agent connection failed, due to " + e.getMessage(); + logger.debug(errMsg, e); + return new MigrateAgentConnectionAnswer(errMsg); + } + return new MigrateAgentConnectionAnswer(true); + } + + private void migrateAgentConnection(List<String> avoidMsList) { - final String[] msHosts = _shell.getHosts(); ++ final String[] msHosts = shell.getHosts(); + if (msHosts == null || msHosts.length < 1) { + throw new CloudRuntimeException("Management Server hosts empty, not properly configured in agent"); + } + + List<String> msHostsList = new ArrayList<>(Arrays.asList(msHosts)); + msHostsList.removeAll(avoidMsList); + if (msHostsList.isEmpty() || StringUtils.isEmpty(msHostsList.get(0))) { + throw new CloudRuntimeException("No other Management Server hosts to migrate"); + } + + String preferredHost = null; + for (String msHost : msHostsList) { + try (final Socket socket = new Socket()) { - socket.connect(new InetSocketAddress(msHost, _shell.getPort()), 5000); ++ socket.connect(new InetSocketAddress(msHost, shell.getPort()), 5000); + preferredHost = msHost; + break; + } catch (final IOException e) { + throw new CloudRuntimeException("Management server host: " + msHost + " is not reachable, to migrate connection"); + } + } + + if (preferredHost == null) { + throw new CloudRuntimeException("Management server host(s) are not reachable, to migrate connection"); + } + + logger.debug("Management server host " + preferredHost + " is found to be reachable, trying to reconnect"); - _shell.resetHostCounter(); - _shell.setConnectionTransfer(true); - reconnect(_link, preferredHost, avoidMsList, true); ++ shell.resetHostCounter(); ++ shell.setConnectionTransfer(true); ++ reconnect(link, preferredHost, avoidMsList, true); + } + public void processResponse(final Response response, final Link link) { final Answer answer = response.getAnswer(); logger.debug("Received response: {}", response.toString()); @@@ -1133,15 -1125,15 +1193,15 @@@ } } - public class WatchTask extends ManagedContextTimerTask { + public class WatchTask implements Runnable { protected Request _request; protected Agent _agent; -- protected Link _link; ++ protected Link link; public WatchTask(final Link link, final Request request, final Agent agent) { super(); _request = request; -- _link = link; ++ this.link = link; _agent = agent; } @@@ -1150,9 -1142,9 +1210,9 @@@ logger.trace("Scheduling {}", (_request instanceof Response ? "Ping" : "Watch Task")); try { if (_request instanceof Response) { - _ugentTaskPool.submit(new ServerHandler(Task.Type.OTHER, _link, _request)); - outRequestHandler.submit(new ServerHandler(Task.Type.OTHER, _link, _request)); ++ outRequestHandler.submit(new ServerHandler(Task.Type.OTHER, link, _request)); } else { -- _link.schedule(new ServerHandler(Task.Type.OTHER, _link, _request)); ++ link.schedule(new ServerHandler(Task.Type.OTHER, link, _request)); } } catch (final ClosedChannelException e) { logger.warn("Unable to schedule task because channel is closed"); @@@ -1160,35 -1152,32 +1220,32 @@@ } } - public class StartupTask extends ManagedContextTimerTask { - protected Link _link; - protected volatile boolean cancelled = false; + public class StartupTask implements Runnable { - protected Link _link; ++ protected Link link; + private final AtomicBoolean cancelled = new AtomicBoolean(false); public StartupTask(final Link link) { logger.debug("Startup task created"); -- _link = link; ++ this.link = link; } - @Override - public synchronized boolean cancel() { + public boolean cancel() { // TimerTask.cancel may fail depends on the calling context - if (!cancelled) { - cancelled = true; - _startupWait = _startupWaitDefault; + if (cancelled.compareAndSet(false, true)) { + startupWait = DEFAULT_STARTUP_WAIT; logger.debug("Startup task cancelled"); - return super.cancel(); } return true; } @Override - protected synchronized void runInContext() { - if (!cancelled) { - logger.info("The startup command is now cancelled"); - cancelled = true; - _startup = null; - _startupWait = _startupWaitDefault * 2; - reconnect(_link); + public void run() { + if (cancelled.compareAndSet(false, true)) { + logger.info("The running startup command is now invalid. Attempting reconnect"); + startupTask.set(null); + startupWait = DEFAULT_STARTUP_WAIT * 2; - logger.debug("Executing reconnect from task - {}", () -> getLinkLog(_link)); - reconnect(_link); ++ logger.debug("Executing reconnect from task - {}", () -> getLinkLog(link)); ++ reconnect(link); } } } @@@ -1219,10 -1208,9 +1276,10 @@@ @Override public void doTask(final Task task) throws TaskExecutionException { if (task.getType() == Task.Type.CONNECT) { - _shell.getBackoffAlgorithm().reset(); + shell.getBackoffAlgorithm().reset(); setLink(task.getLink()); - sendStartup(task.getLink(), _shell.isConnectionTransfer()); - _shell.setConnectionTransfer(false); - sendStartup(task.getLink()); ++ sendStartup(task.getLink(), shell.isConnectionTransfer()); ++ shell.setConnectionTransfer(false); } else if (task.getType() == Task.Type.DATA) { Request request; try { @@@ -1241,15 -1229,8 +1298,15 @@@ logger.error("Error parsing task", e); } } else if (task.getType() == Task.Type.DISCONNECT) { + try { + // an issue has been found if reconnect immediately after disconnecting. please refer to https://github.com/apache/cloudstack/issues/8517 + // wait 5 seconds before reconnecting + Thread.sleep(5000); + } catch (InterruptedException e) { + } - _shell.setConnectionTransfer(false); ++ shell.setConnectionTransfer(false); + logger.debug("Executing disconnect task - {}", () -> getLinkLog(task.getLink())); reconnect(task.getLink()); - return; } else if (task.getType() == Task.Type.OTHER) { processOtherTask(task); } diff --cc agent/src/main/java/com/cloud/agent/IAgentShell.java index 0b9d9e81e95,7f04048795d..c0ecd90ae69 --- a/agent/src/main/java/com/cloud/agent/IAgentShell.java +++ b/agent/src/main/java/com/cloud/agent/IAgentShell.java @@@ -71,7 -71,5 +71,9 @@@ public interface IAgentShell void launchNewAgent(ServerResource resource) throws ConfigurationException; + boolean isConnectionTransfer(); + + void setConnectionTransfer(boolean connectionTransfer); ++ + Integer getSslHandshakeTimeout(); } diff --cc engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index f154eaddc1e,1ab3b7ff892..765602e42d0 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@@ -16,9 -16,9 +16,10 @@@ // under the License. package com.cloud.agent.manager; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; + import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; @@@ -88,6 -82,7 +86,9 @@@ import com.cloud.agent.api.UnsupportedA import com.cloud.agent.transport.Request; import com.cloud.agent.transport.Response; import com.cloud.alert.AlertManager; ++import com.cloud.cluster.ManagementServerHostVO; ++import com.cloud.cluster.dao.ManagementServerHostDao; + import com.cloud.configuration.Config; import com.cloud.configuration.ManagementServiceConfiguration; import com.cloud.dc.ClusterVO; import com.cloud.dc.DataCenterVO; @@@ -142,14 -138,13 +144,13 @@@ public class AgentManagerImpl extends M * _agents is a ConcurrentHashMap, but it is used from within a synchronized block. This will be reported by findbugs as JLM_JSR166_UTILCONCURRENT_MONITORENTER. Maybe a * ConcurrentHashMap is not the right thing to use here, but i'm not sure so i leave it alone. */ -- protected ConcurrentHashMap<Long, AgentAttache> _agents = new ConcurrentHashMap<Long, AgentAttache>(10007); -- protected List<Pair<Integer, Listener>> _hostMonitors = new ArrayList<Pair<Integer, Listener>>(17); -- protected List<Pair<Integer, Listener>> _cmdMonitors = new ArrayList<Pair<Integer, Listener>>(17); -- protected List<Pair<Integer, StartupCommandProcessor>> _creationMonitors = new ArrayList<Pair<Integer, StartupCommandProcessor>>(17); -- protected List<Long> _loadingAgents = new ArrayList<Long>(); ++ protected ConcurrentHashMap<Long, AgentAttache> _agents = new ConcurrentHashMap<>(10007); ++ protected List<Pair<Integer, Listener>> _hostMonitors = new ArrayList<>(17); ++ protected List<Pair<Integer, Listener>> _cmdMonitors = new ArrayList<>(17); ++ protected List<Pair<Integer, StartupCommandProcessor>> _creationMonitors = new ArrayList<>(17); ++ protected List<Long> _loadingAgents = new ArrayList<>(); protected Map<String, Integer> _commandTimeouts = new HashMap<>(); private int _monitorId = 0; - private final Lock _agentStatusLock = new ReentrantLock(); @Inject protected CAManager caService; @@@ -198,28 -188,37 +199,39 @@@ private int _directAgentThreadCap; + private List<String> lastAgents = null; + protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = Status.getStateMachine(); -- private final ConcurrentHashMap<Long, Long> _pingMap = new ConcurrentHashMap<Long, Long>(10007); ++ private final ConcurrentHashMap<Long, Long> _pingMap = new ConcurrentHashMap<>(10007); + private int maxConcurrentNewAgentConnections; + private final ConcurrentHashMap<String, Long> newAgentConnections = new ConcurrentHashMap<>(); + protected ScheduledExecutorService newAgentConnectionsMonitor; @Inject ResourceManager _resourceMgr; @Inject ManagementServiceConfiguration mgmtServiceConf; -- protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>("Advanced", Integer.class, "workers", "5", ++ protected final ConfigKey<Integer> Workers = new ConfigKey<>("Advanced", Integer.class, "workers", "5", "Number of worker threads handling remote agent connections.", false); -- protected final ConfigKey<Integer> Port = new ConfigKey<Integer>("Advanced", Integer.class, "port", "8250", "Port to listen on for remote agent connections.", false); - protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>("Advanced", Integer.class, "alert.wait", "1800", ++ protected final ConfigKey<Integer> Port = new ConfigKey<>("Advanced", Integer.class, "port", "8250", "Port to listen on for remote agent connections.", false); + protected final ConfigKey<Integer> RemoteAgentSslHandshakeTimeout = new ConfigKey<>("Advanced", + Integer.class, "agent.ssl.handshake.timeout", "30", + "Seconds after which SSL handshake times out during remote agent connections.", false); + protected final ConfigKey<Integer> RemoteAgentMaxConcurrentNewConnections = new ConfigKey<>("Advanced", + Integer.class, "agent.max.concurrent.new.connections", "0", + "Number of maximum concurrent new connections server allows for remote agents. " + + "If set to zero (default value) then no limit will be enforced on concurrent new connections", + false); - protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>("Advanced", Integer.class, "alert.wait", "1800", ++ protected final ConfigKey<Integer> AlertWait = new ConfigKey<>("Advanced", Integer.class, "alert.wait", "1800", "Seconds to wait before alerting on a disconnected agent", true); -- protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.load.size", "16", ++ protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<>("Advanced", Integer.class, "direct.agent.load.size", "16", "The number of direct agents to load each time", false); -- protected final ConfigKey<Integer> DirectAgentPoolSize = new ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.pool.size", "500", ++ protected final ConfigKey<Integer> DirectAgentPoolSize = new ConfigKey<>("Advanced", Integer.class, "direct.agent.pool.size", "500", "Default size for DirectAgentPool", false); -- protected final ConfigKey<Float> DirectAgentThreadCap = new ConfigKey<Float>("Advanced", Float.class, "direct.agent.thread.cap", "1", ++ protected final ConfigKey<Float> DirectAgentThreadCap = new ConfigKey<>("Advanced", Float.class, "direct.agent.thread.cap", "1", "Percentage (as a value between 0 and 1) of direct.agent.pool.size to be used as upper thread cap for a single direct agent to process requests", false); -- protected final ConfigKey<Boolean> CheckTxnBeforeSending = new ConfigKey<Boolean>("Developer", Boolean.class, "check.txn.before.sending.agent.commands", "false", ++ protected final ConfigKey<Boolean> CheckTxnBeforeSending = new ConfigKey<>("Developer", Boolean.class, "check.txn.before.sending.agent.commands", "false", "This parameter allows developers to enable a check to see if a transaction wraps commands that are sent to the resource. This is not to be enabled on production systems.", true); @Override @@@ -239,11 -236,10 +249,13 @@@ registerForHostEvents(new SetHostParamsListener(), true, true, false); + managementServerMaintenanceManager.registerListener(this); + - _executor = new ThreadPoolExecutor(threads, threads, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentTaskPool")); + final int agentTaskThreads = DirectAgentLoadSize.value(); - _executor = new ThreadPoolExecutor(agentTaskThreads, agentTaskThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentTaskPool")); + - _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentConnectTaskPool")); ++ _executor = new ThreadPoolExecutor(agentTaskThreads, agentTaskThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentTaskPool")); + - _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentConnectTaskPool")); ++ _connectExecutor = new ThreadPoolExecutor(100, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentConnectTaskPool")); // allow core threads to time out even when there are no items in the queue _connectExecutor.allowCoreThreadTimeOut(true); @@@ -269,22 -271,44 +287,44 @@@ return new AgentHandler(type, link, data); } + @Override + public int getMaxConcurrentNewConnectionsCount() { + return maxConcurrentNewAgentConnections; + } + + @Override + public int getNewConnectionsCount() { + return newAgentConnections.size(); + } + + @Override + public void registerNewConnection(SocketAddress address) { - logger.trace(String.format("Adding new agent connection from %s", address.toString())); ++ logger.trace("Adding new agent connection from {}", address.toString()); + newAgentConnections.putIfAbsent(address.toString(), System.currentTimeMillis()); + } + + @Override + public void unregisterNewConnection(SocketAddress address) { - logger.trace(String.format("Removing new agent connection for %s", address.toString())); ++ logger.trace("Removing new agent connection for {}", address.toString()); + newAgentConnections.remove(address.toString()); + } + @Override public int registerForHostEvents(final Listener listener, final boolean connections, final boolean commands, final boolean priority) { synchronized (_hostMonitors) { _monitorId++; if (connections) { if (priority) { -- _hostMonitors.add(0, new Pair<Integer, Listener>(_monitorId, listener)); ++ _hostMonitors.add(0, new Pair<>(_monitorId, listener)); } else { -- _hostMonitors.add(new Pair<Integer, Listener>(_monitorId, listener)); ++ _hostMonitors.add(new Pair<>(_monitorId, listener)); } } if (commands) { if (priority) { -- _cmdMonitors.add(0, new Pair<Integer, Listener>(_monitorId, listener)); ++ _cmdMonitors.add(0, new Pair<>(_monitorId, listener)); } else { -- _cmdMonitors.add(new Pair<Integer, Listener>(_monitorId, listener)); ++ _cmdMonitors.add(new Pair<>(_monitorId, listener)); } } logger.debug("Registering listener {} with id {}", listener.getClass().getSimpleName(), _monitorId); @@@ -297,9 -321,9 +337,9 @@@ synchronized (_hostMonitors) { _monitorId++; if (priority) { -- _creationMonitors.add(0, new Pair<Integer, StartupCommandProcessor>(_monitorId, creator)); ++ _creationMonitors.add(0, new Pair<>(_monitorId, creator)); } else { -- _creationMonitors.add(new Pair<Integer, StartupCommandProcessor>(_monitorId, creator)); ++ _creationMonitors.add(new Pair<>(_monitorId, creator)); } return _monitorId; } @@@ -311,47 -335,8 +351,47 @@@ _hostMonitors.remove(id); } + @Override + public void onManagementServerMaintenance() { + logger.debug("Management server maintenance enabled"); + _monitorExecutor.shutdownNow(); + if (_connection != null) { + _connection.stop(); + + try { + _connection.cleanUp(); + } catch (final IOException e) { + logger.warn("Fail to clean up old connection", e); + } + } + _connectExecutor.shutdownNow(); + } + + @Override + public void onManagementServerCancelMaintenance() { + logger.debug("Management server maintenance disabled"); + if (_connectExecutor.isShutdown()) { - _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentConnectTaskPool")); ++ _connectExecutor = new ThreadPoolExecutor(100, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentConnectTaskPool")); + _connectExecutor.allowCoreThreadTimeOut(true); + } + + startDirectlyConnectedHosts(true); + if (_connection != null) { + try { + _connection.start(); + } catch (final NioConnectionException e) { + logger.error("Error when connecting to the NioServer!", e); + } + } + + if (_monitorExecutor.isShutdown()) { + _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); + _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS); + } + } + private AgentControlAnswer handleControlCommand(final AgentAttache attache, final AgentControlCommand cmd) { -- AgentControlAnswer answer = null; ++ AgentControlAnswer answer; for (final Pair<Integer, Listener> listener : _cmdMonitors) { answer = listener.second().processControlCommand(attache.getId(), cmd); @@@ -379,7 -364,7 +419,7 @@@ } public AgentAttache findAttache(final long hostId) { -- AgentAttache attache = null; ++ AgentAttache attache; synchronized (_agents) { attache = _agents.get(hostId); } @@@ -431,12 -406,12 +471,10 @@@ cmds.addCommand(cmd); send(hostId, cmds, cmd.getWait()); final Answer[] answers = cmds.getAnswers(); -- if (answers != null && !(answers[0] instanceof UnsupportedAnswer)) { -- return answers[0]; -- } -- -- if (answers != null && answers[0] instanceof UnsupportedAnswer) { -- logger.warn("Unsupported Command: {}", answers[0].getDetails()); ++ if (answers != null) { ++ if (answers[0] instanceof UnsupportedAnswer) { ++ logger.warn("Unsupported Command: {}", answers[0].getDetails()); ++ } return answers[0]; } @@@ -467,8 -442,8 +505,8 @@@ } /** -- * @param commands -- * @return ++ * @param commands object container of commands ++ * @return array of commands */ private Command[] checkForCommandsAndTag(final Commands commands) { final Command[] cmds = commands.toCommands(); @@@ -484,8 -459,8 +522,8 @@@ } /** -- * @param commands -- * @param cmds ++ * @param commands object container of commands ++ * @param cmds array of commands */ private void setEmptyAnswers(final Commands commands, final Command[] cmds) { if (cmds.length == 0) { @@@ -524,7 -499,7 +562,7 @@@ String commandWaits = GranularWaitTimeForCommands.value().trim(); if (StringUtils.isNotEmpty(commandWaits)) { _commandTimeouts = getCommandTimeoutsMap(commandWaits); -- logger.info(String.format("Timeouts for management server internal commands successfully initialized from global setting commands.timeout: %s", _commandTimeouts)); ++ logger.info("Timeouts for management server internal commands successfully initialized from global setting commands.timeout: {}", _commandTimeouts); } } @@@ -540,10 -515,10 +578,10 @@@ int commandTimeout = Integer.parseInt(parts[1].trim()); commandTimeouts.put(commandName, commandTimeout); } catch (NumberFormatException e) { -- logger.error(String.format("Initialising the timeouts using commands.timeout: %s for management server internal commands failed with error %s", commandPair, e.getMessage())); ++ logger.error("Initialising the timeouts using commands.timeout: {} for management server internal commands failed with error {}", commandPair, e.getMessage()); } } else { -- logger.error(String.format("Error initialising the timeouts for management server internal commands. Invalid format in commands.timeout: %s", commandPair)); ++ logger.error("Error initialising the timeouts for management server internal commands. Invalid format in commands.timeout: {}", commandPair); } } return commandTimeouts; @@@ -557,7 -532,7 +595,7 @@@ } int wait = getTimeout(commands, timeout); -- logger.debug(String.format("Wait time setting on %s is %d seconds", commands, wait)); ++ logger.debug("Wait time setting on {} is {} seconds", commands, wait); for (Command cmd : commands) { String simpleCommandName = cmd.getClass().getSimpleName(); Integer commandTimeout = _commandTimeouts.get(simpleCommandName); @@@ -644,7 -619,7 +682,7 @@@ } final long hostId = attache.getId(); logger.debug("Remove Agent : {}", attache); -- AgentAttache removed = null; ++ AgentAttache removed; boolean conflict = false; synchronized (_agents) { removed = _agents.remove(hostId); @@@ -697,16 -672,16 +735,15 @@@ } } catch (final HypervisorVersionChangedException hvce) { handleDisconnectWithoutInvestigation(attache, Event.ShutdownRequested, true, true); -- throw new CloudRuntimeException("Unable to connect " + (attache == null ? "<unknown agent>" : attache.getId()), hvce); ++ throw new CloudRuntimeException("Unable to connect " + attache.getId(), hvce); } catch (final Exception e) { logger.error("Monitor {} says there is an error in the connect process for {} due to {}", monitor.second().getClass().getSimpleName(), hostId, e.getMessage(), e); handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, true, true); -- throw new CloudRuntimeException("Unable to connect " + (attache == null ? "<unknown agent>" : attache.getId()), e); ++ throw new CloudRuntimeException("Unable to connect " + attache.getId(), e); } } } -- final Long dcId = host.getDataCenterId(); final ReadyCommand ready = new ReadyCommand(host, NumbersUtil.enableHumanReadableSizes); ready.setWait(ReadyCommandWait.value()); final Answer answer = easySend(hostId, ready); @@@ -775,25 -748,25 +816,25 @@@ final Constructor<?> constructor = clazz.getConstructor(); resource = (ServerResource)constructor.newInstance(); } catch (final ClassNotFoundException e) { -- logger.warn("Unable to find class " + host.getResource(), e); ++ logger.warn("Unable to find class {}", host.getResource(), e); } catch (final InstantiationException e) { -- logger.warn("Unable to instantiate class " + host.getResource(), e); ++ logger.warn("Unable to instantiate class {}", host.getResource(), e); } catch (final IllegalAccessException e) { -- logger.warn("Illegal access " + host.getResource(), e); ++ logger.warn("Illegal access {}", host.getResource(), e); } catch (final SecurityException e) { -- logger.warn("Security error on " + host.getResource(), e); ++ logger.warn("Security error on {}", host.getResource(), e); } catch (final NoSuchMethodException e) { -- logger.warn("NoSuchMethodException error on " + host.getResource(), e); ++ logger.warn("NoSuchMethodException error on {}", host.getResource(), e); } catch (final IllegalArgumentException e) { -- logger.warn("IllegalArgumentException error on " + host.getResource(), e); ++ logger.warn("IllegalArgumentException error on {}", host.getResource(), e); } catch (final InvocationTargetException e) { -- logger.warn("InvocationTargetException error on " + host.getResource(), e); ++ logger.warn("InvocationTargetException error on {}", host.getResource(), e); } if (resource != null) { _hostDao.loadDetails(host); -- final HashMap<String, Object> params = new HashMap<String, Object>(host.getDetails().size() + 5); ++ final HashMap<String, Object> params = new HashMap<>(host.getDetails().size() + 5); params.putAll(host.getDetails()); params.put("guid", host.getGuid()); @@@ -803,7 -776,7 +844,7 @@@ } if (host.getClusterId() != null) { params.put("cluster", Long.toString(host.getClusterId())); -- String guid = null; ++ String guid; final ClusterVO cluster = _clusterDao.findById(host.getClusterId()); if (cluster.getGuid() == null) { guid = host.getDetail("pool"); @@@ -838,12 -811,8 +879,12 @@@ } protected boolean loadDirectlyConnectedHost(final HostVO host, final boolean forRebalance) { + return loadDirectlyConnectedHost(host, forRebalance, false); + } + + protected boolean loadDirectlyConnectedHost(final HostVO host, final boolean forRebalance, final boolean isTransferredConnection) { boolean initialized = false; -- ServerResource resource = null; ++ ServerResource resource; try { // load the respective discoverer final Discoverer discoverer = _resourceMgr.getMatchingDiscover(host.getHypervisorType()); @@@ -870,21 -839,21 +911,21 @@@ if (forRebalance) { tapLoadingAgents(host.getId(), TapAgentsAction.Add); - final Host h = _resourceMgr.createHostAndAgent(host.getId(), resource, host.getDetails(), false, null, true); + final Host h = _resourceMgr.createHostAndAgent(host.getId(), resource, host.getDetails(), false, null, true, isTransferredConnection); tapLoadingAgents(host.getId(), TapAgentsAction.Del); -- return h == null ? false : true; ++ return h != null; } else { _executor.execute(new SimulateStartTask(host.getId(), host.getUuid(), host.getName(), resource, host.getDetails())); return true; } } -- protected AgentAttache createAttacheForDirectConnect(final Host host, final ServerResource resource) throws ConnectionException { ++ protected AgentAttache createAttacheForDirectConnect(final Host host, final ServerResource resource) { logger.debug("create DirectAgentAttache for {}", host); final DirectAgentAttache attache = new DirectAgentAttache(this, host.getId(), host.getUuid(), host.getName(), resource, host.isInMaintenanceStates()); -- AgentAttache old = null; ++ AgentAttache old; synchronized (_agents) { old = _agents.put(host.getId(), attache); } @@@ -949,7 -919,7 +991,7 @@@ try { logger.info("Host {} is disconnecting with event {}", attache, event); -- Status nextStatus = null; ++ Status nextStatus; final HostVO host = _hostDao.findById(hostId); if (host == null) { logger.warn("Can't find host with {} ({})", hostId, attache); @@@ -1082,7 -1052,7 +1124,7 @@@ @Override protected void runInContext() { try { -- if (_investigate == true) { ++ if (_investigate) { handleDisconnectWithInvestigation(_attache, _event); } else { handleDisconnectWithoutInvestigation(_attache, _event, true, false); @@@ -1134,8 -1104,8 +1176,8 @@@ public Answer[] send(final Long hostId, final Commands cmds) throws AgentUnavailableException, OperationTimedoutException { int wait = 0; if (cmds.size() > 1) { -- logger.debug(String.format("Checking the wait time in seconds to be used for the following commands : %s. If there are multiple commands sent at once," + -- "then max wait time of those will be used", cmds)); ++ logger.debug("Checking the wait time in seconds to be used for the following commands : {}. If there are multiple commands sent at once," + ++ "then max wait time of those will be used", cmds); } for (final Command cmd : cmds) { @@@ -1198,7 -1168,7 +1240,7 @@@ public boolean executeUserRequest(final long hostId, final Event event) throws AgentUnavailableException { if (event == Event.AgentDisconnected) { -- AgentAttache attache = null; ++ AgentAttache attache; attache = findAttache(hostId); logger.debug("Received agent disconnect event for host {} ({})", hostId, attache); if (attache != null) { @@@ -1224,12 -1194,12 +1266,12 @@@ return agentAttache != null; } -- protected AgentAttache createAttacheForConnect(final HostVO host, final Link link) throws ConnectionException { ++ protected AgentAttache createAttacheForConnect(final HostVO host, final Link link) { logger.debug("create ConnectedAgentAttache for {}", host); final AgentAttache attache = new ConnectedAgentAttache(this, host.getId(), host.getUuid(), host.getName(), link, host.isInMaintenanceStates()); link.attach(attache); -- AgentAttache old = null; ++ AgentAttache old; synchronized (_agents) { old = _agents.put(host.getId(), attache); } @@@ -1254,7 -1224,7 +1296,7 @@@ } } ready.setArch(host.getArch().getType()); -- AgentAttache attache = null; ++ AgentAttache attache; GlobalLock joinLock = getHostJoinLock(host.getId()); if (joinLock.lock(60)) { try { @@@ -1280,7 -1250,7 +1322,7 @@@ return attache; } -- private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, final Request request) { ++ private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup) { AgentAttache attache = null; ReadyCommand ready = null; try { @@@ -1308,7 -1278,7 +1350,7 @@@ easySend(attache.getId(), ready); } } catch (final Exception e) { -- logger.debug("Failed to send ready command:" + e.toString()); ++ logger.debug("Failed to send ready command:", e); } return attache; } @@@ -1334,6 -1304,6 +1376,8 @@@ this.id = id; this.resource = resource; this.details = details; ++ this.uuid = uuid; ++ this.name = name; } @Override @@@ -1382,7 -1352,7 +1426,7 @@@ startups[i] = (StartupCommand)_cmds[i]; } -- final AgentAttache attache = handleConnectedAgent(_link, startups, _request); ++ final AgentAttache attache = handleConnectedAgent(_link, startups); if (attache == null) { logger.warn("Unable to create attache for agent: {}", _request); } @@@ -1402,7 -1373,7 +1447,7 @@@ break; } } -- Response response = null; ++ Response response; response = new Response(request, answers[0], _nodeId, -1); try { link.send(response.toBytes()); @@@ -1483,7 -1454,7 +1528,6 @@@ } final long hostId = attache.getId(); -- final String hostName = attache.getName(); if (logger.isDebugEnabled()) { if (cmd instanceof PingRoutingCommand) { @@@ -1502,7 -1473,7 +1546,7 @@@ final Answer[] answers = new Answer[cmds.length]; for (int i = 0; i < cmds.length; i++) { cmd = cmds[i]; -- Answer answer = null; ++ Answer answer; try { if (cmd instanceof StartupRoutingCommand) { final StartupRoutingCommand startup = (StartupRoutingCommand) cmd; @@@ -1536,7 -1507,7 +1580,7 @@@ final long cmdHostId = ((PingCommand)cmd).getHostId(); boolean requestStartupCommand = false; -- final HostVO host = _hostDao.findById(Long.valueOf(cmdHostId)); ++ final HostVO host = _hostDao.findById(cmdHostId); boolean gatewayAccessible = true; // if the router is sending a ping, verify the // gateway was pingable @@@ -1586,7 -1557,7 +1630,7 @@@ if (logD) { logger.debug("SeqA {}-: Sending {}", attache.getId(), response.getSequence(), response); } else { -- logger.trace("SeqA {}-: Sending {}" + attache.getId(), response.getSequence(), response); ++ logger.trace("SeqA {}-: Sending {} {}", response.getSequence(), response, attache.getId()); } try { link.send(response.toBytes()); @@@ -1606,15 -1577,15 +1650,14 @@@ @Override protected void doTask(final Task task) throws TaskExecutionException { -- final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); -- try { ++ try (TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB)) { final Type type = task.getType(); -- if (type == Task.Type.DATA) { ++ if (type == Type.DATA) { final byte[] data = task.getData(); try { final Request event = Request.parse(data); if (event instanceof Response) { -- processResponse(task.getLink(), (Response)event); ++ processResponse(task.getLink(), (Response) event); } else { processRequest(task.getLink(), event); } @@@ -1626,10 -1597,10 +1669,10 @@@ logger.error(message); throw new TaskExecutionException(message, e); } -- } else if (type == Task.Type.CONNECT) { -- } else if (type == Task.Type.DISCONNECT) { ++ } else if (type == Type.CONNECT) { ++ } else if (type == Type.DISCONNECT) { final Link link = task.getLink(); -- final AgentAttache attache = (AgentAttache)link.attachment(); ++ final AgentAttache attache = (AgentAttache) link.attachment(); if (attache != null) { disconnectWithInvestigation(attache, Event.AgentDisconnected); } else { @@@ -1638,8 -1609,8 +1681,6 @@@ link.terminated(); } } -- } finally { -- txn.close(); } } } @@@ -1871,7 -1837,7 +1907,7 @@@ } protected List<Long> findAgentsBehindOnPing() { -- final List<Long> agentsBehind = new ArrayList<Long>(); ++ final List<Long> agentsBehind = new ArrayList<>(); final long cutoffTime = InaccurateClock.getTimeInSeconds() - mgmtServiceConf.getTimeout(); for (final Map.Entry<Long, Long> entry : _pingMap.entrySet()) { if (entry.getValue() < cutoffTime) { @@@ -1879,7 -1845,7 +1915,7 @@@ } } -- if (agentsBehind.size() > 0) { ++ if (!agentsBehind.isEmpty()) { logger.info("Found the following agents behind on ping: {}", agentsBehind); } @@@ -1887,6 -1853,35 +1923,35 @@@ } } + protected class AgentNewConnectionsMonitorTask extends ManagedContextRunnable { + @Override + protected void runInContext() { + logger.trace("Agent New Connections Monitor is started."); + final int cleanupTime = Wait.value(); + Set<Map.Entry<String, Long>> entrySet = newAgentConnections.entrySet(); + long cutOff = System.currentTimeMillis() - (cleanupTime * 60 * 1000L); + if (logger.isDebugEnabled()) { + List<String> expiredConnections = newAgentConnections.entrySet() + .stream() + .filter(e -> e.getValue() <= cutOff) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); - logger.debug(String.format("Currently %d active new connections, of which %d have expired - %s", ++ logger.debug("Currently {} active new connections, of which {} have expired - {}", + entrySet.size(), + expiredConnections.size(), - StringUtils.join(expiredConnections))); ++ StringUtils.join(expiredConnections)); + } + for (Map.Entry<String, Long> entry : entrySet) { + if (entry.getValue() <= cutOff) { + if (logger.isTraceEnabled()) { - logger.trace(String.format("Cleaning up new agent connection for %s", entry.getKey())); ++ logger.trace("Cleaning up new agent connection for {}", entry.getKey()); + } + newAgentConnections.remove(entry.getKey()); + } + } + } + } + protected class BehindOnPingListener implements Listener { @Override public boolean isRecurring() { @@@ -1992,15 -1988,12 +2058,15 @@@ @Override public void processConnect(final Host host, final StartupCommand cmd, final boolean forRebalance) { - if (cmd instanceof StartupRoutingCommand) { - if (((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.KVM || ((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.LXC) { - Map<String, String> params = new HashMap<String, String>(); - params.put(Config.RouterAggregationCommandEachTimeout.toString(), _configDao.getValue(Config.RouterAggregationCommandEachTimeout.toString())); - params.put(Config.MigrateWait.toString(), _configDao.getValue(Config.MigrateWait.toString())); - params.put(NetworkOrchestrationService.TUNGSTEN_ENABLED.key(), String.valueOf(NetworkOrchestrationService.TUNGSTEN_ENABLED.valueIn(host.getDataCenterId()))); + if (!(cmd instanceof StartupRoutingCommand) || cmd.isConnectionTransferred()) { + return; + } + + if (((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.KVM || ((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.LXC) { - Map<String, String> params = new HashMap<String, String>(); ++ Map<String, String> params = new HashMap<>(); + params.put(Config.RouterAggregationCommandEachTimeout.toString(), _configDao.getValue(Config.RouterAggregationCommandEachTimeout.toString())); + params.put(Config.MigrateWait.toString(), _configDao.getValue(Config.MigrateWait.toString())); + params.put(NetworkOrchestrationService.TUNGSTEN_ENABLED.key(), String.valueOf(NetworkOrchestrationService.TUNGSTEN_ENABLED.valueIn(host.getDataCenterId()))); try { SetHostParamsCommand cmds = new SetHostParamsCommand(params); @@@ -2042,13 -2037,13 +2108,13 @@@ if (allHosts == null) { return null; } -- Map<Long, List<Long>> hostsByZone = new HashMap<Long, List<Long>>(); ++ Map<Long, List<Long>> hostsByZone = new HashMap<>(); for (HostVO host : allHosts) { if (host.getHypervisorType() == HypervisorType.KVM || host.getHypervisorType() == HypervisorType.LXC) { Long zoneId = host.getDataCenterId(); List<Long> hostIds = hostsByZone.get(zoneId); if (hostIds == null) { -- hostIds = new ArrayList<Long>(); ++ hostIds = new ArrayList<>(); } hostIds.add(host.getId()); hostsByZone.put(zoneId, hostIds); diff --cc engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 732ce9d61f5,dd3666e5561..c667df5412e --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@@ -47,16 -47,15 +47,17 @@@ import org.apache.cloudstack.framework. import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; import org.apache.cloudstack.ha.dao.HAConfigDao; +import org.apache.cloudstack.maintenance.ManagementServerMaintenanceManager; +import org.apache.cloudstack.maintenance.command.BaseShutdownManagementServerHostCommand; +import org.apache.cloudstack.maintenance.command.CancelMaintenanceManagementServerHostCommand; +import org.apache.cloudstack.maintenance.command.CancelShutdownManagementServerHostCommand; +import org.apache.cloudstack.maintenance.command.PrepareForMaintenanceManagementServerHostCommand; +import org.apache.cloudstack.maintenance.command.PrepareForShutdownManagementServerHostCommand; +import org.apache.cloudstack.maintenance.command.TriggerShutdownManagementServerHostCommand; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.managed.context.ManagedContextTimerTask; + import org.apache.cloudstack.management.ManagementServerHost; import org.apache.cloudstack.outofbandmanagement.dao.OutOfBandManagementDao; -import org.apache.cloudstack.shutdown.ShutdownManager; -import org.apache.cloudstack.shutdown.command.BaseShutdownManagementServerHostCommand; -import org.apache.cloudstack.shutdown.command.CancelShutdownManagementServerHostCommand; -import org.apache.cloudstack.shutdown.command.PrepareForShutdownManagementServerHostCommand; -import org.apache.cloudstack.shutdown.command.TriggerShutdownManagementServerHostCommand; import org.apache.cloudstack.utils.identity.ManagementServerNode; import org.apache.cloudstack.utils.security.SSLUtils; @@@ -107,14 -100,14 +105,16 @@@ import com.cloud.utils.nio.Link import com.cloud.utils.nio.Task; import com.google.gson.Gson; ++import org.apache.commons.collections.CollectionUtils; ++ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService { - private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Cluster-AgentRebalancingExecutor")); + private static ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Cluster-AgentRebalancingExecutor")); private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list public final static long STARTUP_DELAY = 5000; public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds -- protected Set<Long> _agentToTransferIds = new HashSet<Long>(); ++ protected Set<Long> _agentToTransferIds = new HashSet<>(); Gson _gson; protected HashMap<String, SocketChannel> _peers; protected HashMap<String, SSLEngine> _sslEngines; @@@ -151,17 -139,17 +151,17 @@@ super(); } -- protected final ConfigKey<Boolean> EnableLB = new ConfigKey<Boolean>(Boolean.class, "agent.lb.enabled", "Advanced", "false", "Enable agent load balancing between management server nodes", true); -- protected final ConfigKey<Double> ConnectedAgentThreshold = new ConfigKey<Double>(Double.class, "agent.load.threshold", "Advanced", "0.7", ++ protected final ConfigKey<Boolean> EnableLB = new ConfigKey<>(Boolean.class, "agent.lb.enabled", "Advanced", "false", "Enable agent load balancing between management server nodes", true); ++ protected final ConfigKey<Double> ConnectedAgentThreshold = new ConfigKey<>(Double.class, "agent.load.threshold", "Advanced", "0.7", "What percentage of the agents can be held by one management server before load balancing happens", true, EnableLB.key()); -- protected final ConfigKey<Integer> LoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16", "How many agents to connect to in each round", true); -- protected final ConfigKey<Integer> ScanInterval = new ConfigKey<Integer>(Integer.class, "direct.agent.scan.interval", "Advanced", "90", "Interval between scans to load agents", false, ++ protected final ConfigKey<Integer> LoadSize = new ConfigKey<>(Integer.class, "direct.agent.load.size", "Advanced", "16", "How many agents to connect to in each round", true); ++ protected final ConfigKey<Integer> ScanInterval = new ConfigKey<>(Integer.class, "direct.agent.scan.interval", "Advanced", "90", "Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000); @Override public boolean configure(final String name, final Map<String, Object> xmlParams) throws ConfigurationException { -- _peers = new HashMap<String, SocketChannel>(7); -- _sslEngines = new HashMap<String, SSLEngine>(7); ++ _peers = new HashMap<>(7); ++ _sslEngines = new HashMap<>(7); _nodeId = ManagementServerNode.getManagementServerId(); logger.info("Configuring ClusterAgentManagerImpl. management server node id(msid): {}", _nodeId); @@@ -220,7 -201,7 +220,7 @@@ if (hosts != null) { hosts.addAll(appliances); -- if (hosts.size() > 0) { ++ if (!hosts.isEmpty()) { logger.debug("Found {} unmanaged direct hosts, processing connect for them...", hosts.size()); for (final HostVO host : hosts) { try { @@@ -267,10 -246,10 +265,10 @@@ logger.debug("create forwarding ClusteredAgentAttache for {}", host); long id = host.getId(); final AgentAttache attache = new ClusteredAgentAttache(this, id, host.getUuid(), host.getName()); -- AgentAttache old = null; ++ AgentAttache old; synchronized (_agents) { - old = _agents.get(id); - _agents.put(id, attache); + old = _agents.get(host.getId()); + _agents.put(host.getId(), attache); } if (old != null) { logger.debug("Remove stale agent attache from current management server"); @@@ -284,7 -263,7 +282,7 @@@ logger.debug("create ClusteredAgentAttache for {}", host); final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), host.getUuid(), host.getName(), link, host.isInMaintenanceStates()); link.attach(attache); -- AgentAttache old = null; ++ AgentAttache old; synchronized (_agents) { old = _agents.get(host.getId()); _agents.put(host.getId(), attache); @@@ -299,7 -278,7 +297,7 @@@ protected AgentAttache createAttacheForDirectConnect(final Host host, final ServerResource resource) { logger.debug("Create ClusteredDirectAgentAttache for {}.", host); final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getUuid(), host.getName(), _nodeId, resource, host.isInMaintenanceStates()); -- AgentAttache old = null; ++ AgentAttache old; synchronized (_agents) { old = _agents.get(host.getId()); _agents.put(host.getId(), attache); @@@ -418,12 -397,12 +416,12 @@@ public boolean routeToPeer(final String peer, final byte[] bytes) { int i = 0; SocketChannel ch = null; -- SSLEngine sslEngine = null; ++ SSLEngine sslEngine; while (i++ < 5) { ch = connectToPeer(peer, ch); if (ch == null) { try { -- logD(bytes, "Unable to route to peer: " + Request.parse(bytes).toString()); ++ logD(bytes, "Unable to route to peer: " + Request.parse(bytes)); } catch (ClassNotFoundException | UnsupportedVersionException e) { // Request.parse thrown exception when we try to log it, log as much as we can logD(bytes, "Unable to route to peer, and Request.parse further caught exception" + e.getMessage()); @@@ -441,7 -420,7 +439,7 @@@ return true; } catch (final IOException e) { try { -- logI(bytes, "Unable to route to peer: " + Request.parse(bytes).toString() + " due to " + e.getMessage()); ++ logI(bytes, "Unable to route to peer: " + Request.parse(bytes) + " due to " + e.getMessage()); } catch (ClassNotFoundException | UnsupportedVersionException ex) { // Request.parse thrown exception when we try to log it, log as much as we can logI(bytes, "Unable to route to peer due to" + e.getMessage() + ". Also caught exception when parsing request: " + ex.getMessage()); @@@ -484,7 -463,7 +482,7 @@@ public SocketChannel connectToPeer(final String peerName, final SocketChannel prevCh) { synchronized (_peers) { final SocketChannel ch = _peers.get(peerName); -- SSLEngine sslEngine = null; ++ SSLEngine sslEngine; if (prevCh != null) { try { prevCh.close(); @@@ -617,9 -596,9 +615,8 @@@ @Override protected void doTask(final Task task) throws TaskExecutionException { -- final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); -- try { -- if (task.getType() != Task.Type.DATA) { ++ try (TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB)) { ++ if (task.getType() != Type.DATA) { super.doTask(task); return; } @@@ -646,7 -625,7 +643,7 @@@ } final Request req = Request.parse(data); final Command[] cmds = req.getCommands(); -- final CancelCommand cancel = (CancelCommand)cmds[0]; ++ final CancelCommand cancel = (CancelCommand) cmds[0]; logD(data, "Cancel request received"); agent.cancel(cancel.getSequence()); final Long current = agent._currentSequence; @@@ -670,10 -649,10 +667,9 @@@ // to deserialize this and send it through the agent attache. final Request req = Request.parse(data); agent.send(req, null); -- return; } else { if (agent instanceof Routable) { -- final Routable cluster = (Routable)agent; ++ final Routable cluster = (Routable) agent; cluster.routeToAgent(data); } else { agent.send(Request.parse(data)); @@@ -690,13 -669,13 +686,12 @@@ if (mgmtId != -1 && mgmtId != _nodeId) { routeToPeer(Long.toString(mgmtId), data); if (Request.requiresSequentialExecution(data)) { -- final AgentAttache attache = (AgentAttache)link.attachment(); ++ final AgentAttache attache = (AgentAttache) link.attachment(); if (attache != null) { attache.sendNext(Request.getSequence(data)); } -- logD(data, "No attache to process " + Request.parse(data).toString()); ++ logD(data, "No attache to process " + Request.parse(data)); } -- return; } else { if (Request.isRequest(data)) { super.doTask(task); @@@ -712,7 -691,7 +707,6 @@@ logger.info("SeqA {}-{}: Response is not processed: {}", attache.getId(), response.getSequence(), response.toString()); } } -- return; } } } catch (final ClassNotFoundException e) { @@@ -723,8 -702,8 +717,6 @@@ final String message = String.format("UnsupportedVersionException occurred when executing tasks! Error '%s'", e.getMessage()); logger.error(message); throw new TaskExecutionException(message, e); -- } finally { -- txn.close(); } } } @@@ -761,17 -740,12 +753,17 @@@ @Override public boolean executeRebalanceRequest(final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event) throws AgentUnavailableException, OperationTimedoutException { + return executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event, false); + } + + @Override + public boolean executeRebalanceRequest(final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event, boolean isConnectionTransfer) throws AgentUnavailableException, OperationTimedoutException { boolean result = false; if (event == Event.RequestAgentRebalance) { -- return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId); ++ return setToWaitForRebalance(agentId); } else if (event == Event.StartAgentRebalance) { try { - result = rebalanceHost(agentId, currentOwnerId, futureOwnerId); + result = rebalanceHost(agentId, currentOwnerId, futureOwnerId, isConnectionTransfer); } catch (final Exception e) { logger.warn("Unable to rebalance host id={} ({})", agentId, findAttache(agentId), e); } @@@ -823,7 -797,7 +815,7 @@@ sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); final List<HostVO> allManagedAgents = sc.list(); -- int avLoad = 0; ++ int avLoad; if (!allManagedAgents.isEmpty() && !allMS.isEmpty()) { avLoad = allManagedAgents.size() / allMS.size(); @@@ -841,7 -815,7 +833,7 @@@ for (final ManagementServerHostVO node : allMS) { if (node.getMsid() != _nodeId) { -- List<HostVO> hostsToRebalance = new ArrayList<HostVO>(); ++ List<HostVO> hostsToRebalance = new ArrayList<>(); for (final AgentLoadBalancerPlanner lbPlanner : _lbPlanners) { hostsToRebalance = lbPlanner.getHostsToRebalance(node, avLoad); if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { @@@ -867,7 -841,7 +859,7 @@@ HostTransferMapVO transfer = null; try { transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId); -- final Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance); ++ final Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId); if (answer == null) { logger.warn("Failed to get host {} from management server {}", host, node); result = false; @@@ -894,12 -868,8 +886,12 @@@ } } -- private Answer[] sendRebalanceCommand(final long peer, final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event) { - return sendRebalanceCommand(peer, agentId, currentOwnerId, futureOwnerId, event, false); - final TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event); ++ private Answer[] sendRebalanceCommand(final long peer, final long agentId, final long currentOwnerId, final long futureOwnerId) { ++ return sendRebalanceCommand(peer, agentId, currentOwnerId, futureOwnerId, Event.RequestAgentRebalance, false); + } + + private Answer[] sendRebalanceCommand(final long peer, final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event, final boolean isConnectionTransfer) { + final TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event, isConnectionTransfer); final Commands commands = new Commands(Command.OnError.Stop); commands.addCommand(transfer); @@@ -910,8 -880,8 +902,7 @@@ final String peerName = Long.toString(peer); final String cmdStr = _gson.toJson(cmds); final String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true); -- final Answer[] answers = _gson.fromJson(ansStr, Answer[].class); -- return answers; ++ return _gson.fromJson(ansStr, Answer[].class); } catch (final Exception e) { logger.warn("Caught exception while talking to {}", currentOwnerId, e); return null; @@@ -960,7 -930,7 +951,7 @@@ try { logger.trace("Clustered agent transfer scan check, management server id: {}", _nodeId); synchronized (_agentToTransferIds) { -- if (_agentToTransferIds.size() > 0) { ++ if (!_agentToTransferIds.isEmpty()) { logger.debug("Found {} agents to transfer", _agentToTransferIds.size()); // for (Long hostId : _agentToTransferIds) { for (final Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) { @@@ -984,7 -954,7 +975,7 @@@ } if (transferMap.getInitialOwner() != _nodeId || attache == null || attache.forForward()) { -- logger.debug(String.format("Management server %d doesn't own host id=%d (%s) any more, skipping rebalance for the host", _nodeId, hostId, attache)); ++ logger.debug("Management server {} doesn't own host id={} ({}) any more, skipping rebalance for the host", _nodeId, hostId, attache); iterator.remove(); _hostTransferDao.completeAgentTransfer(hostId); continue; @@@ -1004,9 -974,9 +995,7 @@@ _executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner())); } catch (final RejectedExecutionException ex) { logger.warn("Failed to submit rebalance task for host id={} ({}); postponing the execution", hostId, attache); -- continue; } -- } else { logger.debug("Agent {} ({}) can't be transferred yet as its request queue size is {} and listener queue size is {}", hostId, attache, attache.getQueueSize(), attache.getNonRecurringListenersSize()); @@@ -1016,7 -986,7 +1005,6 @@@ logger.trace("Found no agents to be transferred by the management server {}", _nodeId); } } -- } catch (final Throwable e) { logger.error("Problem with the clustered agent transfer scan check!", e); } @@@ -1024,7 -994,7 +1012,7 @@@ }; } -- private boolean setToWaitForRebalance(final long hostId, final long currentOwnerId, final long futureOwnerId) { ++ private boolean setToWaitForRebalance(final long hostId) { logger.debug("Adding agent {} ({}) to the list of agents to transfer", hostId, findAttache(hostId)); synchronized (_agentToTransferIds) { return _agentToTransferIds.add(hostId); @@@ -1096,12 -1063,12 +1084,12 @@@ protected void finishRebalance(final long hostId, final long futureOwnerId, final Event event) { -- final boolean success = event == Event.RebalanceCompleted ? true : false; ++ final boolean success = event == Event.RebalanceCompleted; final AgentAttache attache = findAttache(hostId); logger.debug("Finishing rebalancing for the agent {} ({}) with event {}", hostId, attache, event); -- if (attache == null || !(attache instanceof ClusteredAgentAttache)) { ++ if (!(attache instanceof ClusteredAgentAttache)) { logger.debug("Unable to find forward attache for the host id={} assuming that the agent disconnected already", hostId); _hostTransferDao.completeAgentTransfer(hostId); return; @@@ -1197,9 -1164,9 +1185,9 @@@ } protected class RebalanceTask extends ManagedContextRunnable { -- Long hostId = null; -- Long currentOwnerId = null; -- Long futureOwnerId = null; ++ Long hostId; ++ Long currentOwnerId; ++ Long futureOwnerId; public RebalanceTask(final long hostId, final long currentOwnerId, final long futureOwnerId) { this.hostId = hostId; @@@ -1268,7 -1235,7 +1256,7 @@@ final ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; logger.debug("Intercepting command for agent change: agent {} event: {}", cmd.getAgentId(), cmd.getEvent()); -- boolean result = false; ++ boolean result; try { result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent()); logger.debug("Result is {}", result); @@@ -1284,10 -1251,10 +1272,10 @@@ } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) { final TransferAgentCommand cmd = (TransferAgentCommand)cmds[0]; - logger.debug("Intercepting command for agent rebalancing: agent {} event: {}", cmd.getAgentId(), cmd.getEvent()); - boolean result = false; + logger.debug("Intercepting command for agent rebalancing: agent: {}, event: {}, connection transfer: {}", cmd.getAgentId(), cmd.getEvent(), cmd.isConnectionTransfer()); - boolean result = false; ++ boolean result; try { - result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner()); + result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner(), cmd.isConnectionTransfer()); logger.debug("Result is {}", result); } catch (final AgentUnavailableException e) { @@@ -1305,7 -1272,7 +1293,7 @@@ logger.debug("Intercepting command to propagate event {} for host {} ({})", () -> cmd.getEvent().name(), cmd::getHostId, () -> _hostDao.findById(cmd.getHostId())); -- boolean result = false; ++ boolean result; try { result = _resourceMgr.executeUserRequest(cmd.getHostId(), cmd.getEvent()); logger.debug("Result is {}", result); @@@ -1400,133 -1349,6 +1388,127 @@@ } } + @Override + public boolean transferDirectAgentsFromMS(String fromMsUuid, long fromMsId, long timeoutDurationInMs) { + if (timeoutDurationInMs <= 0) { - logger.debug(String.format("Not transferring direct agents from management server node %d (id: %s) to other nodes, invalid timeout duration", fromMsId, fromMsUuid)); ++ logger.debug("Not transferring direct agents from management server node {} (id: {}) to other nodes, invalid timeout duration", fromMsId, fromMsUuid); + return false; + } + + long transferStartTime = System.currentTimeMillis(); + if (CollectionUtils.isEmpty(getDirectAgentHosts(fromMsId))) { - logger.info(String.format("No direct agent hosts available on management server node %d (id: %s), to transfer", fromMsId, fromMsUuid)); ++ logger.info("No direct agent hosts available on management server node {} (id: {}), to transfer", fromMsId, fromMsUuid); + return true; + } + + List<ManagementServerHostVO> msHosts = getUpMsHostsExcludingMs(fromMsId); + if (msHosts.isEmpty()) { - logger.warn(String.format("No management server nodes available to transfer agents from management server node %d (id: %s)", fromMsId, fromMsUuid)); ++ logger.warn("No management server nodes available to transfer agents from management server node {} (id: {})", fromMsId, fromMsUuid); + return false; + } + - logger.debug(String.format("Transferring direct agents from management server node %d (id: %s) to other nodes", fromMsId, fromMsUuid)); ++ logger.debug("Transferring direct agents from management server node {} (id: {}) to other nodes", fromMsId, fromMsUuid); + int agentTransferFailedCount = 0; + List<DataCenterVO> dataCenterList = dcDao.listAll(); + for (DataCenterVO dc : dataCenterList) { + List<HostVO> directAgentHostsInDc = getDirectAgentHostsInDc(fromMsId, dc.getId()); + if (CollectionUtils.isEmpty(directAgentHostsInDc)) { + continue; + } - logger.debug(String.format("Transferring %d direct agents from management server node %d (id: %s) of zone %s", directAgentHostsInDc.size(), fromMsId, fromMsUuid, dc.toString())); ++ logger.debug("Transferring {} direct agents from management server node {} (id: {}) of zone {}", directAgentHostsInDc.size(), fromMsId, fromMsUuid, dc); + for (HostVO host : directAgentHostsInDc) { + long transferElapsedTimeInMs = System.currentTimeMillis() - transferStartTime; + if (transferElapsedTimeInMs >= timeoutDurationInMs) { - logger.debug(String.format("Stop transferring remaining direct agents from management server node %d (id: %s), timed out", fromMsId, fromMsUuid)); ++ logger.debug("Stop transferring remaining direct agents from management server node {} (id: {}), timed out", fromMsId, fromMsUuid); + return false; + } + + try { + if (_mshostCounter >= msHosts.size()) { + _mshostCounter = 0; + } + ManagementServerHostVO msHost = msHosts.get(_mshostCounter % msHosts.size()); + _mshostCounter++; + + _hostTransferDao.startAgentTransfering(host.getId(), fromMsId, msHost.getMsid()); + if (!rebalanceAgent(host.getId(), Event.StartAgentRebalance, fromMsId, msHost.getMsid(), true)) { + agentTransferFailedCount++; + } else { + updateLastManagementServer(host.getId(), fromMsId); + } + } catch (Exception e) { - logger.warn(String.format("Failed to transfer direct agent of the host %s from management server node %d (id: %s), due to %s", host, fromMsId, fromMsUuid, e.getMessage())); ++ logger.warn("Failed to transfer direct agent of the host {} from management server node {} (id: {}), due to {}", host, fromMsId, fromMsUuid, e.getMessage()); + } + } + } + + return (agentTransferFailedCount == 0); + } + + private List<HostVO> getDirectAgentHosts(long msId) { + List<HostVO> directAgentHosts = new ArrayList<>(); + List<HostVO> hosts = _hostDao.listHostsByMs(msId); + for (HostVO host : hosts) { + AgentAttache agent = findAttache(host.getId()); - if (agent != null && agent instanceof DirectAgentAttache) { ++ if (agent instanceof DirectAgentAttache) { + directAgentHosts.add(host); + } + } + + return directAgentHosts; + } + + private List<HostVO> getDirectAgentHostsInDc(long msId, long dcId) { + List<HostVO> directAgentHosts = new ArrayList<>(); + List<HostVO> hosts = _hostDao.listHostsByMsAndDc(msId, dcId); + for (HostVO host : hosts) { + AgentAttache agent = findAttache(host.getId()); - if (agent != null && agent instanceof DirectAgentAttache) { ++ if (agent instanceof DirectAgentAttache) { + directAgentHosts.add(host); + } + } + + return directAgentHosts; + } + + private List<ManagementServerHostVO> getUpMsHostsExcludingMs(long avoidMsId) { + final List<ManagementServerHostVO> msHosts = _mshostDao.listBy(ManagementServerHost.State.Up); - Iterator<ManagementServerHostVO> iterator = msHosts.iterator(); - while (iterator.hasNext()) { - ManagementServerHostVO ms = iterator.next(); - if (ms.getMsid() == avoidMsId || _mshostPeerDao.findByPeerMsAndState(ms.getId(), ManagementServerHost.State.Up) == null) { - iterator.remove(); - } - } ++ msHosts.removeIf(ms -> ms.getMsid() == avoidMsId || _mshostPeerDao.findByPeerMsAndState(ms.getId(), ManagementServerHost.State.Up) == null); + + return msHosts; + } + + private void updateLastManagementServer(long hostId, long msId) { + HostVO hostVO = _hostDao.findById(hostId); + if (hostVO != null) { + hostVO.setLastManagementServerId(msId); + _hostDao.update(hostId, hostVO); + } + } + + @Override + public void onManagementServerMaintenance() { + logger.debug("Management server maintenance enabled"); + s_transferExecutor.shutdownNow(); + cleanupTransferMap(_nodeId); + _agentLbHappened = false; + super.onManagementServerMaintenance(); + } + + @Override + public void onManagementServerCancelMaintenance() { + logger.debug("Management server maintenance disabled"); + super.onManagementServerCancelMaintenance(); + if (isAgentRebalanceEnabled()) { + cleanupTransferMap(_nodeId); + if (s_transferExecutor.isShutdown()) { + s_transferExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Cluster-AgentRebalancingExecutor")); + s_transferExecutor.scheduleAtFixedRate(getAgentRebalanceScanTask(), 60000, 60000, TimeUnit.MILLISECONDS); + s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + } + } + } + public boolean executeAgentUserRequest(final long agentId, final Event event) throws AgentUnavailableException { return executeUserRequest(agentId, event); } @@@ -1593,8 -1411,8 +1575,7 @@@ public ConfigKey<?>[] getConfigKeys() { final ConfigKey<?>[] keys = super.getConfigKeys(); -- final List<ConfigKey<?>> keysLst = new ArrayList<ConfigKey<?>>(); -- keysLst.addAll(Arrays.asList(keys)); ++ final List<ConfigKey<?>> keysLst = new ArrayList<>(Arrays.asList(keys)); keysLst.add(EnableLB); keysLst.add(ConnectedAgentThreshold); keysLst.add(LoadSize); diff --cc engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java index 4e1be3ae0fb,94a16497e87..54146e55049 --- a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java +++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java @@@ -124,9 -126,8 +126,10 @@@ public class HostDaoImpl extends Generi protected SearchBuilder<HostVO> UnmanagedApplianceSearch; protected SearchBuilder<HostVO> MaintenanceCountSearch; protected SearchBuilder<HostVO> HostTypeCountSearch; + protected SearchBuilder<HostVO> ResponsibleMsSearch; + protected SearchBuilder<HostVO> ResponsibleMsDcSearch; + protected GenericSearchBuilder<HostVO, String> ResponsibleMsIdSearch; + protected SearchBuilder<HostVO> HostTypeClusterCountSearch; - protected SearchBuilder<HostVO> ResponsibleMsCountSearch; protected SearchBuilder<HostVO> HostTypeZoneCountSearch; protected SearchBuilder<HostVO> ClusterStatusSearch; protected SearchBuilder<HostVO> TypeNameZoneSearch; @@@ -189,22 -189,21 +191,31 @@@ HostTypeCountSearch = createSearchBuilder(); HostTypeCountSearch.and("type", HostTypeCountSearch.entity().getType(), SearchCriteria.Op.EQ); + HostTypeCountSearch.and("zoneId", HostTypeCountSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); + HostTypeCountSearch.and("resourceState", HostTypeCountSearch.entity().getResourceState(), SearchCriteria.Op.EQ); HostTypeCountSearch.done(); - ResponsibleMsCountSearch = createSearchBuilder(); - ResponsibleMsCountSearch.and("managementServerId", ResponsibleMsCountSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); - ResponsibleMsCountSearch.done(); + ResponsibleMsSearch = createSearchBuilder(); + ResponsibleMsSearch.and("managementServerId", ResponsibleMsSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); + ResponsibleMsSearch.done(); + + ResponsibleMsDcSearch = createSearchBuilder(); + ResponsibleMsDcSearch.and("managementServerId", ResponsibleMsDcSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); + ResponsibleMsDcSearch.and("dcId", ResponsibleMsDcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); + ResponsibleMsDcSearch.done(); + + ResponsibleMsIdSearch = createSearchBuilder(String.class); + ResponsibleMsIdSearch.selectFields(ResponsibleMsIdSearch.entity().getUuid()); + ResponsibleMsIdSearch.and("managementServerId", ResponsibleMsIdSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); + ResponsibleMsIdSearch.done(); + HostTypeClusterCountSearch = createSearchBuilder(); + HostTypeClusterCountSearch.and("cluster", HostTypeClusterCountSearch.entity().getClusterId(), SearchCriteria.Op.EQ); + HostTypeClusterCountSearch.and("type", HostTypeClusterCountSearch.entity().getType(), SearchCriteria.Op.EQ); + HostTypeClusterCountSearch.and("status", HostTypeClusterCountSearch.entity().getStatus(), SearchCriteria.Op.IN); + HostTypeClusterCountSearch.and("removed", HostTypeClusterCountSearch.entity().getRemoved(), SearchCriteria.Op.NULL); + HostTypeClusterCountSearch.done(); + HostTypeZoneCountSearch = createSearchBuilder(); HostTypeZoneCountSearch.and("type", HostTypeZoneCountSearch.entity().getType(), SearchCriteria.Op.EQ); HostTypeZoneCountSearch.and("dc", HostTypeZoneCountSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); @@@ -1457,16 -1535,9 +1562,16 @@@ return getCount(sc); } + @Override + public List<String> listByMs(long msId) { + SearchCriteria<String> sc = ResponsibleMsIdSearch.create(); + sc.addAnd("managementServerId", SearchCriteria.Op.EQ, msId); + return customSearch(sc, null); + } + @Override public List<String> listOrderedHostsHypervisorVersionsInDatacenter(long datacenterId, HypervisorType hypervisorType) { - PreparedStatement pstmt = null; + PreparedStatement pstmt; List<String> result = new ArrayList<>(); try { TransactionLegacy txn = TransactionLegacy.currentTxn(); diff --cc server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java index 027a0530383,84c3081bfc1..1f0f439d819 --- a/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java +++ b/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java @@@ -23,7 -22,6 +22,7 @@@ import java.util.Comparator import java.util.HashMap; import java.util.List; import java.util.Map; ++import java.util.EnumSet; import javax.inject.Inject; import javax.naming.ConfigurationException; @@@ -37,22 -36,16 +37,22 @@@ import org.apache.commons.lang3.StringU import com.cloud.agent.AgentManager; import com.cloud.agent.api.Answer; +import com.cloud.agent.api.MigrateAgentConnectionCommand; +import com.cloud.cluster.ManagementServerHostVO; +import com.cloud.cluster.dao.ManagementServerHostDao; import com.cloud.dc.DataCenterVO; + import com.cloud.dc.dao.ClusterDao; import com.cloud.dc.dao.DataCenterDao; import com.cloud.host.Host; +import com.cloud.host.HostVO; import com.cloud.host.dao.HostDao; import com.cloud.hypervisor.Hypervisor; import com.cloud.resource.ResourceState; import com.cloud.utils.component.ComponentLifecycleBase; import com.cloud.utils.exception.CloudRuntimeException; +import org.apache.commons.collections.CollectionUtils; - import org.apache.commons.lang3.StringUtils; + public class IndirectAgentLBServiceImpl extends ComponentLifecycleBase implements IndirectAgentLB, Configurable { public static final ConfigKey<String> IndirectAgentLBAlgorithm = new ConfigKey<>(String.class, @@@ -70,65 -67,52 +74,74 @@@ @Inject private HostDao hostDao; @Inject + private DataCenterDao dcDao; + @Inject + private ManagementServerHostDao mshostDao; + @Inject private AgentManager agentManager; + private static final List<ResourceState> agentValidResourceStates = List.of( + ResourceState.Enabled, ResourceState.Maintenance, ResourceState.Disabled, + ResourceState.ErrorInMaintenance, ResourceState.PrepareForMaintenance); + private static final List<Host.Type> agentValidHostTypes = List.of(Host.Type.Routing, Host.Type.ConsoleProxy, + Host.Type.SecondaryStorage, Host.Type.SecondaryStorageVM); + private static final List<Hypervisor.HypervisorType> agentValidHypervisorTypes = List.of( + Hypervisor.HypervisorType.KVM, Hypervisor.HypervisorType.LXC); + ////////////////////////////////////////////////////// /////////////// Agent MSLB Methods /////////////////// ////////////////////////////////////////////////////// + @Override + public List<String> getManagementServerList() { + final String msServerAddresses = ApiServiceConfiguration.ManagementServerAddresses.value(); + if (StringUtils.isEmpty(msServerAddresses)) { + throw new CloudRuntimeException(String.format("No management server addresses are defined in '%s' setting", + ApiServiceConfiguration.ManagementServerAddresses.key())); + } + - List<String> msList = new ArrayList<>(Arrays.asList(msServerAddresses.replace(" ", "").split(","))); - return msList; ++ return new ArrayList<>(Arrays.asList(msServerAddresses.replace(" ", "").split(","))); + } + @Override public List<String> getManagementServerList(final Long hostId, final Long dcId, final List<Long> orderedHostIdList) { + return getManagementServerList(hostId, dcId, orderedHostIdList, null); + } + + @Override + public List<String> getManagementServerList(final Long hostId, final Long dcId, final List<Long> orderedHostIdList, String lbAlgorithm) { final String msServerAddresses = ApiServiceConfiguration.ManagementServerAddresses.value(); if (StringUtils.isEmpty(msServerAddresses)) { throw new CloudRuntimeException(String.format("No management server addresses are defined in '%s' setting", ApiServiceConfiguration.ManagementServerAddresses.key())); } + + final List<String> msList = Arrays.asList(msServerAddresses.replace(" ", "").split(",")); + if (msList.size() == 1) { + return msList; + } + - final org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm algorithm = getAgentMSLBAlgorithm(); ++ final org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm algorithm = getAgentMSLBAlgorithm(lbAlgorithm); List<Long> hostIdList = orderedHostIdList; if (hostIdList == null) { - hostIdList = getOrderedHostIdList(dcId); + hostIdList = algorithm.isHostListNeeded() ? getOrderedHostIdList(dcId) : new ArrayList<>(); } // just in case we have a host in creating state make sure it is in the list: if (null != hostId && ! hostIdList.contains(hostId)) { - if (logger.isTraceEnabled()) { - logger.trace("adding requested host to host list as it does not seem to be there; " + hostId); - } + logger.trace("adding requested host to host list as it does not seem to be there; {}", hostId); hostIdList.add(hostId); } + - final org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm algorithm = getAgentMSLBAlgorithm(lbAlgorithm); - final List<String> msList = Arrays.asList(msServerAddresses.replace(" ", "").split(",")); return algorithm.sort(msList, hostIdList, hostId); } @Override public boolean compareManagementServerList(final Long hostId, final Long dcId, final List<String> receivedMSHosts, final String lbAlgorithm) { -- if (receivedMSHosts == null || receivedMSHosts.size() < 1) { ++ if (receivedMSHosts == null || receivedMSHosts.isEmpty()) { return false; } -- if (getLBAlgorithmName() != lbAlgorithm) { ++ if (!getLBAlgorithmName().equals(lbAlgorithm)) { return false; } final List<String> expectedMSList = getManagementServerList(hostId, dcId, null); @@@ -162,114 -136,18 +165,119 @@@ return hostIdList; } + private List<Host> getAllAgentBasedHosts() { + final List<HostVO> allHosts = hostDao.listAll(); + if (allHosts == null) { + return new ArrayList<>(); + } + final List <Host> agentBasedHosts = new ArrayList<>(); + for (final Host host : allHosts) { + conditionallyAddHost(agentBasedHosts, host); + } + return agentBasedHosts; + } + + private List<Host> getAllAgentBasedHosts(long msId) { + final List<HostVO> allHosts = hostDao.listHostsByMs(msId); + if (allHosts == null) { + return new ArrayList<>(); + } + final List <Host> agentBasedHosts = new ArrayList<>(); + for (final Host host : allHosts) { + conditionallyAddHost(agentBasedHosts, host); + } + return agentBasedHosts; + } + + private List<Host> getAllAgentBasedHostsInDc(long msId, long dcId) { + final List<HostVO> allHosts = hostDao.listHostsByMsAndDc(msId, dcId); + if (allHosts == null) { + return new ArrayList<>(); + } + final List <Host> agentBasedHosts = new ArrayList<>(); + for (final Host host : allHosts) { + conditionallyAddHost(agentBasedHosts, host); + } + return agentBasedHosts; + } + + private void conditionallyAddHost(List<Host> agentBasedHosts, Host host) { + if (host == null) { + if (logger.isTraceEnabled()) { + logger.trace("trying to add no host to a list"); + } + return; + } + + EnumSet<ResourceState> allowedStates = EnumSet.of( + ResourceState.Enabled, + ResourceState.Maintenance, + ResourceState.Disabled, + ResourceState.ErrorInMaintenance, + ResourceState.PrepareForMaintenance); + // so the remaining EnumSet<ResourceState> disallowedStates = EnumSet.complementOf(allowedStates) + // would be {ResourceState.Creating, ResourceState.Error}; + if (!allowedStates.contains(host.getResourceState())) { + if (logger.isTraceEnabled()) { + logger.trace("host ({}) is in '{}' state, not adding to the host list", host, host.getResourceState()); + } + return; + } + + if (host.getType() != Host.Type.Routing + && host.getType() != Host.Type.ConsoleProxy + && host.getType() != Host.Type.SecondaryStorage + && host.getType() != Host.Type.SecondaryStorageVM) { + if (logger.isTraceEnabled()) { + logger.trace(String.format("host (%s) is of wrong type, not adding to the host list, type = %s", host, host.getType())); + } + return; + } + + if (host.getHypervisorType() != null - && ! (host.getHypervisorType() == Hypervisor.HypervisorType.KVM || host.getHypervisorType() == Hypervisor.HypervisorType.LXC)) { ++ && !(host.getHypervisorType() == Hypervisor.HypervisorType.KVM || host.getHypervisorType() == Hypervisor.HypervisorType.LXC)) { + + if (logger.isTraceEnabled()) { + logger.trace(String.format("hypervisor is not the right type, not adding to the host list, (host: %s, hypervisortype: %s)", host, host.getHypervisorType())); + } + return; + } + + agentBasedHosts.add(host); + } + + private List<Long> getAllAgentBasedHostsFromDB(final Long zoneId, final Long clusterId) { + return hostDao.findHostIdsByZoneClusterResourceStateTypeAndHypervisorType(zoneId, clusterId, + agentValidResourceStates, agentValidHostTypes, agentValidHypervisorTypes); + } + + @Override + public boolean haveAgentBasedHosts(long msId) { + return CollectionUtils.isNotEmpty(getAllAgentBasedHosts(msId)); + } + private org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm getAgentMSLBAlgorithm() { - final String algorithm = getLBAlgorithmName(); - if (algorithmMap.containsKey(algorithm)) { - return algorithmMap.get(algorithm); + return getAgentMSLBAlgorithm(null); + } + + private org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm getAgentMSLBAlgorithm(String lbAlgorithm) { + boolean algorithmNameFromConfig = false; + if (StringUtils.isEmpty(lbAlgorithm)) { + lbAlgorithm = getLBAlgorithmName(); + algorithmNameFromConfig = true; + } + if (algorithmMap.containsKey(lbAlgorithm)) { + return algorithmMap.get(lbAlgorithm); + } + throw new CloudRuntimeException(String.format("Algorithm %s%s not found, valid values are: %s", + lbAlgorithm, algorithmNameFromConfig? " configured for '" + IndirectAgentLBAlgorithm.key() + "'" : "", algorithmMap.keySet())); + } + + @Override + public void checkLBAlgorithmName(String lbAlgorithm) { + if (!algorithmMap.containsKey(lbAlgorithm)) { + throw new CloudRuntimeException(String.format("Invalid algorithm %s, valid values are: %s", lbAlgorithm, algorithmMap.keySet())); } - throw new CloudRuntimeException(String.format("Algorithm configured for '%s' not found, valid values are: %s", - IndirectAgentLBAlgorithm.key(), algorithmMap.keySet())); } //////////////////////////////////////////////////////////// @@@ -296,73 -184,6 +314,73 @@@ } } + @Override + public boolean migrateAgents(String fromMsUuid, long fromMsId, String lbAlgorithm, long timeoutDurationInMs) { + if (timeoutDurationInMs <= 0) { + logger.debug(String.format("Not migrating indirect agents from management server node %d (id: %s) to other nodes, invalid timeout duration", fromMsId, fromMsUuid)); + return false; + } + + logger.debug(String.format("Migrating indirect agents from management server node %d (id: %s) to other nodes", fromMsId, fromMsUuid)); + long migrationStartTime = System.currentTimeMillis(); + if (!haveAgentBasedHosts(fromMsId)) { + logger.info(String.format("No indirect agents available on management server node %d (id: %s), to migrate", fromMsId, fromMsUuid)); + return true; + } + + boolean lbAlgorithmChanged = false; + if (StringUtils.isNotBlank(lbAlgorithm) && !lbAlgorithm.equalsIgnoreCase(getLBAlgorithmName())) { + logger.debug(String.format("Indirect agent lb algorithm changed to %s", lbAlgorithm)); + lbAlgorithmChanged = true; + } + + final List<String> avoidMsList = mshostDao.listNonUpStateMsIPs(); + ManagementServerHostVO ms = mshostDao.findByMsid(fromMsId); + if (ms != null && !avoidMsList.contains(ms.getServiceIP())) { + avoidMsList.add(ms.getServiceIP()); + } + + List<DataCenterVO> dataCenterList = dcDao.listAll(); + for (DataCenterVO dc : dataCenterList) { + Long dcId = dc.getId(); + List<Long> orderedHostIdList = getOrderedHostIdList(dcId); + List<Host> agentBasedHostsOfMsInDc = getAllAgentBasedHostsInDc(fromMsId, dcId); + if (CollectionUtils.isEmpty(agentBasedHostsOfMsInDc)) { + continue; + } - logger.debug(String.format("Migrating %d indirect agents from management server node %d (id: %s) of zone %s", agentBasedHostsOfMsInDc.size(), fromMsId, fromMsUuid, dc.toString())); ++ logger.debug(String.format("Migrating %d indirect agents from management server node %d (id: %s) of zone %s", agentBasedHostsOfMsInDc.size(), fromMsId, fromMsUuid, dc)); + for (final Host host : agentBasedHostsOfMsInDc) { + long migrationElapsedTimeInMs = System.currentTimeMillis() - migrationStartTime; + if (migrationElapsedTimeInMs >= timeoutDurationInMs) { + logger.debug(String.format("Stop migrating remaining indirect agents from management server node %d (id: %s), timed out", fromMsId, fromMsUuid)); + return false; + } + + List<String> msList = null; + Long lbCheckInterval = 0L; + if (lbAlgorithmChanged) { + // send new MS list when there is change in lb algorithm + msList = getManagementServerList(host.getId(), dcId, orderedHostIdList, lbAlgorithm); + lbCheckInterval = getLBPreferredHostCheckInterval(host.getClusterId()); + } + + final MigrateAgentConnectionCommand cmd = new MigrateAgentConnectionCommand(msList, avoidMsList, lbAlgorithm, lbCheckInterval); + agentManager.easySend(host.getId(), cmd); //answer not received as the agent disconnects and reconnects to other ms + updateLastManagementServer(host.getId(), fromMsId); + } + } + + return true; + } + + private void updateLastManagementServer(long hostId, long msId) { + HostVO hostVO = hostDao.findById(hostId); + if (hostVO != null) { + hostVO.setLastManagementServerId(msId); + hostDao.update(hostId, hostVO); + } + } + private void configureAlgorithmMap() { final List<org.apache.cloudstack.agent.lb.IndirectAgentLBAlgorithm> algorithms = new ArrayList<>(); algorithms.add(new IndirectAgentLBStaticAlgorithm()); diff --cc utils/src/main/java/com/cloud/utils/nio/NioConnection.java index 2a468fa3c85,2a157b9c001..98fa69716cd --- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java @@@ -32,8 -32,8 +32,9 @@@ import java.nio.channels.SelectionKey import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; + import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; @@@ -76,17 -78,27 +80,29 @@@ public abstract class NioConnection imp protected ExecutorService _executor; protected ExecutorService _sslHandshakeExecutor; protected CAService caService; + protected Set<SocketChannel> socketChannels = new HashSet<>(); + protected Integer sslHandshakeTimeout = null; + private final int factoryMaxNewConnectionsCount; public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) { _name = name; _isRunning = false; _selector = null; _port = port; + _workers = workers; _factory = factory; - _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name + "-Handler")); - _sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(name + "-SSLHandshakeHandler")); + this.factoryMaxNewConnectionsCount = factory.getMaxConcurrentNewConnectionsCount(); + _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, + new LinkedBlockingQueue<>(5 * workers), new NamedThreadFactory(name + "-Handler"), + new ThreadPoolExecutor.AbortPolicy()); + String sslHandshakeHandlerName = name + "-SSLHandshakeHandler"; + if (factoryMaxNewConnectionsCount > 0) { + _sslHandshakeExecutor = new ThreadPoolExecutor(0, this.factoryMaxNewConnectionsCount, 30, + TimeUnit.MINUTES, new SynchronousQueue<>(), new NamedThreadFactory(sslHandshakeHandlerName), + new ThreadPoolExecutor.AbortPolicy()); + } else { + _sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(sslHandshakeHandlerName)); + } } public void setCAService(final CAService caService) { @@@ -94,7 -106,7 +110,7 @@@ } public void start() throws NioConnectionException { -- _todos = new ArrayList<ChangeRequest>(); ++ _todos = new ArrayList<>(); try { init(); @@@ -110,9 -122,6 +126,9 @@@ } _isStartup = true; + if (_executor.isShutdown()) { - _executor = new ThreadPoolExecutor(_workers, 5 * _workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(_name + "-Handler")); ++ _executor = new ThreadPoolExecutor(_workers, 5 * _workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<>(), new NamedThreadFactory(_name + "-Handler")); + } _threadExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(this._name + "-NioConnectionHandler")); _isRunning = true; _futureTask = _threadExecutor.submit(this); @@@ -145,18 -155,18 +162,14 @@@ final Set<SelectionKey> readyKeys = _selector.selectedKeys(); final Iterator<SelectionKey> i = readyKeys.iterator(); -- if (logger.isTraceEnabled()) { -- logger.trace("Keys Processing: " + readyKeys.size()); -- } ++ logger.trace("Keys Processing: {}", readyKeys.size()); // Walk through the ready keys collection. while (i.hasNext()) { final SelectionKey sk = i.next(); i.remove(); if (!sk.isValid()) { -- if (logger.isTraceEnabled()) { -- logger.trace("Selection Key is invalid: " + sk.toString()); -- } ++ logger.trace("Selection Key is invalid: {}", sk); final Link link = (Link)sk.attachment(); if (link != null) { link.terminated(); @@@ -205,53 -231,44 +234,42 @@@ final Socket socket = socketChannel.socket(); socket.setKeepAlive(true); -- if (logger.isTraceEnabled()) { -- logger.trace("Connection accepted for " + socket); -- } ++ logger.trace("Connection accepted for {}", socket); - final SSLEngine sslEngine; try { - sslEngine = Link.initServerSSLEngine(caService, socketChannel.getRemoteAddress().toString()); - sslEngine.setUseClientMode(false); - sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); final NioConnection nioConnection = this; - _sslHandshakeExecutor.submit(new Runnable() { - @Override - public void run() { - _selector.wakeup(); - try { - sslEngine.beginHandshake(); - if (!Link.doHandshake(socketChannel, sslEngine)) { - throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress()); - } - if (logger.isTraceEnabled()) { - logger.trace("SSL: Handshake done"); - } - final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); - final Link link = new Link(saddr, nioConnection); - link.setSSLEngine(sslEngine); - link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); - final Task task = _factory.create(Task.Type.CONNECT, link, null); - socketChannels.add(socketChannel); - registerLink(saddr, link); - _executor.submit(task); - } catch (IOException e) { - if (logger.isTraceEnabled()) { - logger.trace("Connection closed due to failure: " + e.getMessage()); - } - closeAutoCloseable(socket, "accepting socket"); - closeAutoCloseable(socketChannel, "accepting socketChannel"); - } finally { - _selector.wakeup(); + _sslHandshakeExecutor.submit(() -> { + final InetSocketAddress socketAddress = (InetSocketAddress)socket.getRemoteSocketAddress(); + _factory.registerNewConnection(socketAddress); + _selector.wakeup(); + try { + final SSLEngine sslEngine = Link.initServerSSLEngine(caService, socketChannel.getRemoteAddress().toString()); + sslEngine.setUseClientMode(false); + sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); + sslEngine.beginHandshake(); + if (!Link.doHandshake(socketChannel, sslEngine, getSslHandshakeTimeout())) { + throw new IOException("SSL handshake timed out with " + socketAddress); } + logger.trace("SSL: Handshake done"); + final Link link = new Link(socketAddress, nioConnection); + link.setSSLEngine(sslEngine); + link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); + final Task task = _factory.create(Task.Type.CONNECT, link, null); + registerLink(socketAddress, link); + _executor.submit(task); + } catch (final GeneralSecurityException | IOException e) { + _factory.unregisterNewConnection(socketAddress); + logger.trace("Connection closed with {} due to failure: {}", socket.getRemoteSocketAddress(), e.getMessage()); + closeAutoCloseable(socket, "accepting socket"); + closeAutoCloseable(socketChannel, "accepting socketChannel"); + } finally { + _selector.wakeup(); } }); - } catch (final Exception e) { - if (logger.isTraceEnabled()) { - logger.trace("Connection closed due to failure: " + e.getMessage()); - } - closeAutoCloseable(socket, "accepting socket"); - closeAutoCloseable(socketChannel, "accepting socketChannel"); + } catch (final RejectedExecutionException e) { + logger.trace("{} Accept Task rejected: {}", socket.getRemoteSocketAddress(), e.getMessage()); + closeAutoCloseable(socket, "Rejecting connection - accepting socket"); + closeAutoCloseable(socketChannel, "Rejecting connection - accepting socketChannel"); } finally { _selector.wakeup(); } @@@ -277,14 -295,14 +296,10 @@@ final Link link = (Link)key.attachment(); try { final SocketChannel socketChannel = (SocketChannel)key.channel(); -- if (logger.isTraceEnabled()) { -- logger.trace("Reading from: " + socketChannel.socket().toString()); -- } ++ logger.trace("Reading from: {}", socketChannel.socket().toString()); final byte[] data = link.read(socketChannel); if (data == null) { -- if (logger.isTraceEnabled()) { -- logger.trace("Packet is incomplete. Waiting for more."); -- } ++ logger.trace("Packet is incomplete. Waiting for more."); return; } final Task task = _factory.create(Task.Type.DATA, link, data); @@@ -330,18 -348,18 +345,17 @@@ protected void processTodos() { List<ChangeRequest> todos; -- if (_todos.size() == 0) { ++ if (_todos.isEmpty()) { return; // Nothing to do. } synchronized (this) { todos = _todos; -- _todos = new ArrayList<ChangeRequest>(); ++ _todos = new ArrayList<>(); } -- if (logger.isTraceEnabled()) { -- logger.trace("Todos Processing: " + todos.size()); -- } ++ logger.trace("Todos Processing: {}", todos.size()); ++ SelectionKey key; for (final ChangeRequest todo : todos) { switch (todo.type) { @@@ -368,7 -386,7 +382,7 @@@ link.setKey(key); } } catch (final ClosedChannelException e) { -- logger.warn("Couldn't register socket: " + todo.key); ++ logger.warn("Couldn't register socket: {}", todo.key); try { ((SocketChannel)todo.key).close(); } catch (final IOException ignore) { @@@ -380,9 -398,9 +394,7 @@@ } break; case ChangeRequest.CLOSE: -- if (logger.isTraceEnabled()) { -- logger.trace("Trying to close " + todo.key); -- } ++ logger.trace("Trying to close {}", todo.key); key = (SelectionKey)todo.key; closeConnection(key); if (key != null) { @@@ -410,9 -428,9 +422,7 @@@ if (!socket.getKeepAlive()) { socket.setKeepAlive(true); } -- if (logger.isDebugEnabled()) { -- logger.debug("Connected to " + socket); -- } ++ logger.debug("Connected to {}", socket); final Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this); link.setKey(key); key.attach(link); @@@ -440,9 -458,9 +450,7 @@@ protected void write(final SelectionKey key) throws IOException { final Link link = (Link)key.attachment(); try { -- if (logger.isTraceEnabled()) { -- logger.trace("Writing to " + link.getSocketAddress().toString()); -- } ++ logger.trace("Writing to {}", link.getSocketAddress().toString()); final boolean close = link.write((SocketChannel)key.channel()); if (close) { closeConnection(key); @@@ -462,9 -480,9 +470,7 @@@ key.cancel(); try { if (channel != null) { -- if (logger.isDebugEnabled()) { -- logger.debug("Closing socket " + channel.socket()); -- } ++ logger.debug("Closing socket {}", channel.socket()); channel.close(); } } catch (final IOException ignore) { @@@ -499,17 -517,8 +505,17 @@@ /* Release the resource used by the instance */ public void cleanUp() throws IOException { - if (_selector != null && _selector.isOpen()) { - _selector.wakeup(); + for (SocketChannel channel : socketChannels) { + if (channel != null && channel.isOpen()) { + try { - logger.info(String.format("Closing connection: %s", channel.getRemoteAddress())); ++ logger.info("Closing connection: {}", channel.getRemoteAddress()); + channel.close(); + } catch (IOException e) { - logger.warn(String.format("Unable to close connection due to %s", e.getMessage())); ++ logger.warn("Unable to close connection due to {}", e.getMessage()); + } + } + } + if (_selector != null) { _selector.close(); } }