Updated Branches:
  refs/heads/master 99ead3419 -> 269a4ef11

CLOUDSTACK-4855: Throttle based on the # of outstanding requests to the 
directly managed HV host (direct agents)
CloudStack sends requests to directly managed HV hosts (direct agents) using 
the direct agent thread pool. The size of the pool is determined by the global 
config direct.agent.pool.size, which defaults to 500.

Currently there is no restriction on the number of threads a direct agent can 
use from this shared thread pool to send requests to the host. This is fine as 
long as the host is responding to requests
in a reasonable amount of time. But if there is a considerable delay in getting 
a response, the thread remains blocked for that much time. As more commands are 
sent to the slow host, more threads keep getting
blocked. This can eventually lead to a situation where requests to healthy 
hosts cannot be processed as there are not enough free threads.

The goal of this change is to localize the impact of a few bad hosts, so 
that the entire management server is not affected.

One way to do this is to throttle based on the number of outstanding requests 
on a per-host basis. The maximum number of outstanding requests to a host is a 
percentage of the direct agent pool size, configurable through
direct.agent.thread.cap. The default value is 0.1 (10%); a value of 1 means 
the old behavior, where there is no upper cap. This ensures that an 
impacted host is bound by an upper cap on the number of threads it can use 
to process requests and cannot use the entire pool.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/269a4ef1
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/269a4ef1
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/269a4ef1

Branch: refs/heads/master
Commit: 269a4ef11ee151fa408a7dd1f2e69cd1f7f05191
Parents: 99ead34
Author: Koushik Das <kous...@apache.org>
Authored: Wed Oct 30 15:32:01 2013 +0530
Committer: Koushik Das <kous...@apache.org>
Committed: Mon Nov 4 14:52:26 2013 +0530

----------------------------------------------------------------------
 .../com/cloud/agent/manager/AgentAttache.java   |  5 ++-
 .../cloud/agent/manager/AgentManagerImpl.java   | 28 +++++++++++------
 .../cloud/agent/manager/DirectAgentAttache.java | 33 +++++++++++++++++++-
 3 files changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/269a4ef1/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java 
b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
index ff35255..9c87812 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 import org.apache.log4j.Logger;
@@ -107,7 +108,8 @@ public abstract class AgentAttache {
     protected Long _currentSequence;
     protected Status _status = Status.Connecting;
     protected boolean _maintenance;
-    protected long                                    _nextSequence;
+    protected long _nextSequence;
+    protected AtomicInteger _outstandingTaskCount;
 
     protected AgentManagerImpl _agentMgr;
 
@@ -131,6 +133,7 @@ public abstract class AgentAttache {
         _requests = new LinkedList<Request>();
         _agentMgr = agentMgr;
         _nextSequence = s_rand.nextInt(Short.MAX_VALUE) << 48;
+        _outstandingTaskCount = new AtomicInteger(0);
     }
 
     public synchronized long getNextSequence() {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/269a4ef1/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java 
b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
index 3e684cc..39d4702 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
@@ -159,24 +159,28 @@ public class AgentManagerImpl extends ManagerBase 
implements AgentManager, Handl
     protected ScheduledExecutorService _directAgentExecutor;
     protected ScheduledExecutorService _monitorExecutor;
 
+    private int _directAgentThreadCap;
+
     protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = 
Status.getStateMachine();
     private final Map<Long, Long> _pingMap = new ConcurrentHashMap<Long, 
Long>(10007);
 
     @Inject ResourceManager _resourceMgr;
 
-    protected final ConfigKey<Integer> Workers = new 
ConfigKey<Integer>(Integer.class, "workers", "Advance", "5",
+    protected final ConfigKey<Integer> Workers = new 
ConfigKey<Integer>(Integer.class, "workers", "Advanced", "5",
             "Number of worker threads handling remote agent connections.", 
false);
-    protected final ConfigKey<Integer> Port = new 
ConfigKey<Integer>(Integer.class, "port", "Advance", "8250", "Port to listen on 
for remote agent connections.", false);
-    protected final ConfigKey<Integer> PingInterval = new 
ConfigKey<Integer>(Integer.class, "ping.interval", "Advance", "60",
+    protected final ConfigKey<Integer> Port = new 
ConfigKey<Integer>(Integer.class, "port", "Advanced", "8250", "Port to listen 
on for remote agent connections.", false);
+    protected final ConfigKey<Integer> PingInterval = new 
ConfigKey<Integer>(Integer.class, "ping.interval", "Advanced", "60",
             "Interval to send application level pings to make sure the 
connection is still working", false);
-    protected final ConfigKey<Float> PingTimeout = new 
ConfigKey<Float>(Float.class, "ping.timeout", "Advance", "2.5",
+    protected final ConfigKey<Float> PingTimeout = new 
ConfigKey<Float>(Float.class, "ping.timeout", "Advanced", "2.5",
             "Multiplier to ping.interval before announcing an agent has timed 
out", true);
-    protected final ConfigKey<Integer> AlertWait = new 
ConfigKey<Integer>(Integer.class, "alert.wait", "Advance", "1800",
+    protected final ConfigKey<Integer> AlertWait = new 
ConfigKey<Integer>(Integer.class, "alert.wait", "Advanced", "1800",
             "Seconds to wait before alerting on a disconnected agent", true);
-    protected final ConfigKey<Integer> DirectAgentLoadSize = new 
ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advance", "16",
+    protected final ConfigKey<Integer> DirectAgentLoadSize = new 
ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16",
             "The number of direct agents to load each time", false);
-    protected final ConfigKey<Integer> DirectAgentPoolSize = new 
ConfigKey<Integer>(Integer.class, "direct.agent.pool.size", "Advance", "500",
+    protected final ConfigKey<Integer> DirectAgentPoolSize = new 
ConfigKey<Integer>(Integer.class, "direct.agent.pool.size", "Advanced", "500",
             "Default size for DirectAgentPool", false);
+    protected final ConfigKey<Float> DirectAgentThreadCap = new 
ConfigKey<Float>(Float.class, "direct.agent.thread.cap", "Advanced", "0.1",
+            "Percentage (as a value between 0 and 1) of direct.agent.pool.size 
to be used as upper thread cap for a single direct agent to process requests", 
false);
 
     @Override
     public boolean configure(final String name, final Map<String, Object> 
params) throws ConfigurationException {
@@ -202,10 +206,10 @@ public class AgentManagerImpl extends ManagerBase 
implements AgentManager, Handl
         _connection = new NioServer("AgentManager", Port.value(), 
Workers.value() + 10, this);
         s_logger.info("Listening on " + Port.value() + " with " + 
Workers.value() + " workers");
 
-        
         _directAgentExecutor = new 
ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new 
NamedThreadFactory("DirectAgent"));
         s_logger.debug("Created DirectAgentAttache pool with size: " + 
DirectAgentPoolSize.value());
-        
+        _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * 
DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0
+
         _monitorExecutor = new ScheduledThreadPoolExecutor(1, new 
NamedThreadFactory("AgentMonitor"));
 
         return true;
@@ -1422,6 +1426,10 @@ public class AgentManagerImpl extends ManagerBase 
implements AgentManager, Handl
         return _directAgentExecutor;
     }
 
+    public int getDirectAgentThreadCap() {
+        return _directAgentThreadCap;
+    }
+
     public Long getAgentPingTime(long agentId) {
         return _pingMap.get(agentId);
     }
@@ -1568,7 +1576,7 @@ public class AgentManagerImpl extends ManagerBase 
implements AgentManager, Handl
 
     @Override
     public ConfigKey<?>[] getConfigKeys() {
-        return new ConfigKey<?>[] {Workers, Port, PingInterval, PingTimeout, 
Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize};
+        return new ConfigKey<?>[] {Workers, Port, PingInterval, PingTimeout, 
Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize, 
DirectAgentThreadCap};
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/269a4ef1/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
----------------------------------------------------------------------
diff --git 
a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java 
b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
index 7d3f765..0b6a011 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
@@ -132,6 +132,11 @@ public class DirectAgentAttache extends AgentAttache {
         @Override
         protected synchronized void runInContext() {
             try {
+                if (_outstandingTaskCount.incrementAndGet() > 
_agentMgr.getDirectAgentThreadCap()) {
+                    s_logger.warn("Task execution for direct attache(" + _id + 
") has reached maximum outstanding limit(" + 
_agentMgr.getDirectAgentThreadCap() + "), bailing out");
+                    return;
+                }
+
                 ServerResource resource = _resource;
 
                 if (resource != null) {
@@ -156,6 +161,8 @@ public class DirectAgentAttache extends AgentAttache {
                 }
             } catch (Exception e) {
                 s_logger.warn("Unable to complete the ping task", e);
+            } finally {
+                _outstandingTaskCount.decrementAndGet();
             }
         }
     }
@@ -168,10 +175,32 @@ public class DirectAgentAttache extends AgentAttache {
             _req = req;
         }
 
+        private void bailout() {
+            long seq = _req.getSequence();
+            try {
+                Command[] cmds = _req.getCommands();
+                ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
+                for (Command cmd : cmds) {
+                    Answer answer = new Answer(cmd, false, "Bailed out as 
maximum oustanding task limit reached");
+                    answers.add(answer);
+                }
+                Response resp = new Response(_req, answers.toArray(new 
Answer[answers.size()]));
+                processAnswers(seq, resp);
+            } catch (Exception e) {
+                s_logger.warn(log(seq, "Exception caught in bailout "), e);
+            }
+        }
+
         @Override
         protected void runInContext() {
             long seq = _req.getSequence();
             try {
+                if (_outstandingTaskCount.incrementAndGet() > 
_agentMgr.getDirectAgentThreadCap()) {
+                    s_logger.warn("Task execution for direct attache(" + _id + 
") has reached maximum outstanding limit(" + 
_agentMgr.getDirectAgentThreadCap() + "), bailing out");
+                    bailout();
+                    return;
+                }
+
                 ServerResource resource = _resource;
                 Command[] cmds = _req.getCommands();
                 boolean stopOnError = _req.stopOnError();
@@ -186,7 +215,7 @@ public class DirectAgentAttache extends AgentAttache {
                         if (resource != null) {
                             answer = resource.executeRequest(cmds[i]);
                             if(answer == null) {
-                               s_logger.warn("Resource returned null answer!");
+                                s_logger.warn("Resource returned null 
answer!");
                                 answer = new Answer(cmds[i], false, "Resource 
returned null answer");
                             }
                         } else {
@@ -213,6 +242,8 @@ public class DirectAgentAttache extends AgentAttache {
                 processAnswers(seq, resp);
             } catch (Exception e) {
                 s_logger.warn(log(seq, "Exception caught "), e);
+            } finally {
+                _outstandingTaskCount.decrementAndGet();
             }
         }
     }

Reply via email to