Repository: cloudstack Updated Branches: refs/heads/master cb4513379 -> a2d85c8ca
CLOUDSTACK-7566:Many jobs getting stuck in pending state and cloud is unusable. Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/a2d85c8c Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/a2d85c8c Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/a2d85c8c Branch: refs/heads/master Commit: a2d85c8cae5f603bbcfcd3659c1207f0bfe461a7 Parents: cb45133 Author: Min Chen <min.c...@citrix.com> Authored: Tue Sep 16 15:14:08 2014 -0700 Committer: Min Chen <min.c...@citrix.com> Committed: Tue Sep 16 15:14:08 2014 -0700 ---------------------------------------------------------------------- .../framework/messagebus/MessageBusBase.java | 37 ++++++++++++++++---- .../jobs/impl/AsyncJobManagerImpl.java | 13 +++++++ 2 files changed, 44 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a2d85c8c/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java index 9432da0..e8f9bce 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.log4j.Logger; + import org.apache.cloudstack.framework.serializer.MessageSerializer; public class MessageBusBase implements MessageBus { @@ -36,6 +38,8 @@ public class MessageBusBase implements MessageBus { private final SubscriptionNode _subscriberRoot; private MessageSerializer _messageSerializer; + private static final Logger s_logger = Logger.getLogger(MessageBusBase.class); + public MessageBusBase() { _gate = new Gate(); _pendingActions = new ArrayList<ActionRecord>(); @@ -58,6 +62,9 @@ public class MessageBusBase implements MessageBus { assert (subject != null); assert (subscriber != null); if (_gate.enter()) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus subscribe"); + } try { SubscriptionNode current = locate(subject, null, true); assert (current != null); @@ -75,6 +82,9 @@ public class MessageBusBase implements MessageBus { @Override public void unsubscribe(String subject, MessageSubscriber subscriber) { if (_gate.enter()) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus unsubscribe"); + } try { if (subject != null) { SubscriptionNode current = locate(subject, null, false); @@ -96,6 +106,9 @@ public class MessageBusBase implements MessageBus { @Override public void clearAll() { if (_gate.enter()) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus clearAll"); + } try { _subscriberRoot.clearAll(); doPrune(); @@ -112,6 +125,9 @@ public class MessageBusBase implements MessageBus { @Override public void prune() { if (_gate.enter()) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus prune"); + } try { doPrune(); } finally { @@ -144,6 +160,9 @@ public class MessageBusBase implements MessageBus { public void publish(String senderAddress, String subject, PublishScope scope, Object args) { if (_gate.enter(true)) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus publish"); + } try { List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>(); SubscriptionNode current = locate(subject, chainFromTop, false); @@ -309,14 +328,20 @@ public class MessageBusBase implements MessageBus { public void leave() { synchronized (this) { if (_reentranceCount > 0) { - assert (_gateOwner == Thread.currentThread()); + try { + assert (_gateOwner == Thread.currentThread()); - onGateOpen(); - _reentranceCount--; - assert (_reentranceCount == 0); - _gateOwner = null; + onGateOpen(); + } finally { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Open gate of message bus"); + } + _reentranceCount--; + assert (_reentranceCount == 0); + _gateOwner = null; - notifyAll(); + notifyAll(); + } } } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a2d85c8c/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index c28e87b..7d374da 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -236,10 +236,20 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setResult(resultObject); } + if (s_logger.isDebugEnabled()) { + s_logger.debug("Publish async job-" + jobId + " complete on message bus"); + } publishOnEventBus(job, "complete"); // publish before the instance type and ID are wiped out + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Wake up jobs related to job- " + jobId); + } List<Long> wakeupList = Transaction.execute(new TransactionCallback<List<Long>>() { @Override public List<Long> doInTransaction(TransactionStatus status) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Update db status for job- " + jobId); + } job.setCompleteMsid(getMsid()); job.setStatus(jobStatus); job.setResultCode(resultCode); @@ -253,6 +263,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setLastUpdated(DateUtil.currentGMTTime()); _jobDao.update(jobId, job); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Wake up jobs joined with job- " + jobId + " and disjoin all subjobs created from job- " + jobId); + } List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId); _joinMapDao.disjoinAllJobs(jobId);