Updated Branches: refs/heads/master 99ead3419 -> 269a4ef11
CLOUDSTACK-4855: Throttle based on the # of outstanding requests to the directly managed HV host (direct agents). CloudStack sends requests to directly managed HV hosts (direct agents) using the direct agent thread pool. The size of the pool is determined by the global config direct.agent.pool.size, which defaults to 500. Currently there is no restriction on the number of threads a direct agent can use from this shared thread pool to send requests to the host. This is fine as long as the host is responding to requests in a reasonable amount of time. But if there is a considerable delay in getting a response, the threads remain blocked for that much time. As more commands are sent to the slow host, threads keep getting blocked. This can eventually lead to a situation where requests to healthy hosts cannot be processed because there are not enough free threads. The problem being addressed here is to localize the impact of a few bad hosts, so that the entire management server is not affected. One way to do this is to throttle based on the # of outstanding requests on a per-host basis. The number of outstanding requests allowed for a host will be a % of the direct agent pool size. This is configurable via direct.agent.thread.cap. The default value is 0.1 (10%); a value of 1 would mean the old behavior, where there is no upper cap. This ensures that an impacted host is bound by an upper cap on the number of threads it can use to process requests and cannot consume the entire pool. 
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/269a4ef1 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/269a4ef1 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/269a4ef1 Branch: refs/heads/master Commit: 269a4ef11ee151fa408a7dd1f2e69cd1f7f05191 Parents: 99ead34 Author: Koushik Das <kous...@apache.org> Authored: Wed Oct 30 15:32:01 2013 +0530 Committer: Koushik Das <kous...@apache.org> Committed: Mon Nov 4 14:52:26 2013 +0530 ---------------------------------------------------------------------- .../com/cloud/agent/manager/AgentAttache.java | 5 ++- .../cloud/agent/manager/AgentManagerImpl.java | 28 +++++++++++------ .../cloud/agent/manager/DirectAgentAttache.java | 33 +++++++++++++++++++- 3 files changed, 54 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/269a4ef1/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java index ff35255..9c87812 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java +++ b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.log4j.Logger; @@ -107,7 +108,8 @@ public abstract class AgentAttache { protected Long _currentSequence; protected Status _status = Status.Connecting; protected boolean _maintenance; - protected long 
_nextSequence; + protected long _nextSequence; + protected AtomicInteger _outstandingTaskCount; protected AgentManagerImpl _agentMgr; @@ -131,6 +133,7 @@ public abstract class AgentAttache { _requests = new LinkedList<Request>(); _agentMgr = agentMgr; _nextSequence = s_rand.nextInt(Short.MAX_VALUE) << 48; + _outstandingTaskCount = new AtomicInteger(0); } public synchronized long getNextSequence() { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/269a4ef1/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java index 3e684cc..39d4702 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -159,24 +159,28 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected ScheduledExecutorService _directAgentExecutor; protected ScheduledExecutorService _monitorExecutor; + private int _directAgentThreadCap; + protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = Status.getStateMachine(); private final Map<Long, Long> _pingMap = new ConcurrentHashMap<Long, Long>(10007); @Inject ResourceManager _resourceMgr; - protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>(Integer.class, "workers", "Advance", "5", + protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>(Integer.class, "workers", "Advanced", "5", "Number of worker threads handling remote agent connections.", false); - protected final ConfigKey<Integer> Port = new ConfigKey<Integer>(Integer.class, "port", "Advance", "8250", "Port to listen on for remote agent connections.", false); - protected final ConfigKey<Integer> PingInterval = new ConfigKey<Integer>(Integer.class, "ping.interval", 
"Advance", "60", + protected final ConfigKey<Integer> Port = new ConfigKey<Integer>(Integer.class, "port", "Advanced", "8250", "Port to listen on for remote agent connections.", false); + protected final ConfigKey<Integer> PingInterval = new ConfigKey<Integer>(Integer.class, "ping.interval", "Advanced", "60", "Interval to send application level pings to make sure the connection is still working", false); - protected final ConfigKey<Float> PingTimeout = new ConfigKey<Float>(Float.class, "ping.timeout", "Advance", "2.5", + protected final ConfigKey<Float> PingTimeout = new ConfigKey<Float>(Float.class, "ping.timeout", "Advanced", "2.5", "Multiplier to ping.interval before announcing an agent has timed out", true); - protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>(Integer.class, "alert.wait", "Advance", "1800", + protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>(Integer.class, "alert.wait", "Advanced", "1800", "Seconds to wait before alerting on a disconnected agent", true); - protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advance", "16", + protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16", "The number of direct agents to load each time", false); - protected final ConfigKey<Integer> DirectAgentPoolSize = new ConfigKey<Integer>(Integer.class, "direct.agent.pool.size", "Advance", "500", + protected final ConfigKey<Integer> DirectAgentPoolSize = new ConfigKey<Integer>(Integer.class, "direct.agent.pool.size", "Advanced", "500", "Default size for DirectAgentPool", false); + protected final ConfigKey<Float> DirectAgentThreadCap = new ConfigKey<Float>(Float.class, "direct.agent.thread.cap", "Advanced", "0.1", + "Percentage (as a value between 0 and 1) of direct.agent.pool.size to be used as upper thread cap for a single direct agent to process requests", false); @Override 
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException { @@ -202,10 +206,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this); s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers"); - _directAgentExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgent")); s_logger.debug("Created DirectAgentAttache pool with size: " + DirectAgentPoolSize.value()); - + _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0 + _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); return true; @@ -1422,6 +1426,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl return _directAgentExecutor; } + public int getDirectAgentThreadCap() { + return _directAgentThreadCap; + } + public Long getAgentPingTime(long agentId) { return _pingMap.get(agentId); } @@ -1568,7 +1576,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl @Override public ConfigKey<?>[] getConfigKeys() { - return new ConfigKey<?>[] {Workers, Port, PingInterval, PingTimeout, Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize}; + return new ConfigKey<?>[] {Workers, Port, PingInterval, PingTimeout, Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize, DirectAgentThreadCap}; } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/269a4ef1/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java index 7d3f765..0b6a011 100755 --- 
a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java +++ b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java @@ -132,6 +132,11 @@ public class DirectAgentAttache extends AgentAttache { @Override protected synchronized void runInContext() { try { + if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) { + s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out"); + return; + } + ServerResource resource = _resource; if (resource != null) { @@ -156,6 +161,8 @@ public class DirectAgentAttache extends AgentAttache { } } catch (Exception e) { s_logger.warn("Unable to complete the ping task", e); + } finally { + _outstandingTaskCount.decrementAndGet(); } } } @@ -168,10 +175,32 @@ public class DirectAgentAttache extends AgentAttache { _req = req; } + private void bailout() { + long seq = _req.getSequence(); + try { + Command[] cmds = _req.getCommands(); + ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length); + for (Command cmd : cmds) { + Answer answer = new Answer(cmd, false, "Bailed out as maximum oustanding task limit reached"); + answers.add(answer); + } + Response resp = new Response(_req, answers.toArray(new Answer[answers.size()])); + processAnswers(seq, resp); + } catch (Exception e) { + s_logger.warn(log(seq, "Exception caught in bailout "), e); + } + } + @Override protected void runInContext() { long seq = _req.getSequence(); try { + if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) { + s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out"); + bailout(); + return; + } + ServerResource resource = _resource; Command[] cmds = _req.getCommands(); boolean stopOnError = _req.stopOnError(); @@ -186,7 +215,7 @@ public class DirectAgentAttache extends 
AgentAttache { if (resource != null) { answer = resource.executeRequest(cmds[i]); if(answer == null) { - s_logger.warn("Resource returned null answer!"); + s_logger.warn("Resource returned null answer!"); answer = new Answer(cmds[i], false, "Resource returned null answer"); } } else { @@ -213,6 +242,8 @@ public class DirectAgentAttache extends AgentAttache { processAnswers(seq, resp); } catch (Exception e) { s_logger.warn(log(seq, "Exception caught "), e); + } finally { + _outstandingTaskCount.decrementAndGet(); } } }