Updated Branches: refs/heads/4.3 f1b157114 -> be0d68851
CLOUDSTACK-5731: Use general instance type to categorize VM work jobs to correctly serialize VM operations Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/be0d6885 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/be0d6885 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/be0d6885 Branch: refs/heads/4.3 Commit: be0d688511577cdb227de8bca9314f71fe1ab51e Parents: f1b1571 Author: Kelven Yang <kelv...@gmail.com> Authored: Fri Jan 17 10:41:42 2014 -0800 Committer: Kelven Yang <kelv...@gmail.com> Committed: Fri Jan 17 10:42:04 2014 -0800 ---------------------------------------------------------------------- .../src/com/cloud/vm/VmWorkConstants.java | 1 + .../com/cloud/vm/VirtualMachineManagerImpl.java | 171 ++++++++++++++++--- .../src/com/cloud/vm/VmWorkJobDispatcher.java | 49 +++--- .../jobs/AsyncJobExecutionContext.java | 48 ++++-- .../framework/jobs/dao/VmWorkJobDao.java | 4 +- .../framework/jobs/dao/VmWorkJobDaoImpl.java | 121 ++++++++++--- .../com/cloud/storage/VolumeApiServiceImpl.java | 99 +++++++++-- .../vm/snapshot/VMSnapshotManagerImpl.java | 80 ++++++++- 8 files changed, 468 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/engine/components-api/src/com/cloud/vm/VmWorkConstants.java ---------------------------------------------------------------------- diff --git a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java index 20e40b7..4627cfe 100644 --- a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java +++ b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java @@ -20,4 +20,5 @@ public interface VmWorkConstants { public static final String VM_WORK_QUEUE = "VmWorkJobQueue"; public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher"; public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher"; + public static final String VM_WORK_JOB_PLACEHOLDER = "VmWorkJobPlaceHolder"; } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index e90ff1f..0393e39 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -558,6 +558,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _executor.scheduleAtFixedRate(new TransitionTask(), 5000, VmJobStateReportInterval.value(), TimeUnit.SECONDS); _executor.scheduleAtFixedRate(new CleanupTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS); cancelWorkItems(_nodeId); + + // cleanup left over place holder works + _workJobDao.expungeLeftoverWorkJobs(ManagementServerNode.getManagementServerId()); return true; } @@ -749,7 +752,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateStart(vmUuid, params, planToDeploy, planner); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateStart(vmUuid, params, planToDeploy, planner); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy); @@ -1275,7 +1288,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop); @@ -1567,7 +1592,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateStorageMigration(vmUuid, destPool); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateStorageMigration(vmUuid, destPool); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool); @@ -1649,7 +1684,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateMigrate(vmUuid, srcHostId, dest); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateMigrate(vmUuid, srcHostId, dest); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest); @@ -1920,7 +1965,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool); @@ -2163,6 +2220,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac s_logger.trace("VM Operation Thread Running"); try { _workDao.cleanup(VmOpCleanupWait.value()); + + // TODO. hard-coded to one hour after job has been completed + Date cutDate = new Date(new Date().getTime() - 3600000); + _workJobDao.expungeCompletedWorkJobs(cutDate); } catch (Exception e) { s_logger.error("VM Operations failed due to ", e); } @@ -2199,7 +2260,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateReboot(vmUuid, params); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateReboot(vmUuid, params); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params); @@ -3123,7 +3194,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateAddVmToNetwork(vm, network,requested); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateAddVmToNetwork(vm, network, requested); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome<VirtualMachine> outcome = addVmToNetworkThroughJobQueue(vm, network, requested); @@ -3226,7 +3306,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateRemoveNicFromVm(vm, nic); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateRemoveNicFromVm(vm, nic); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<VirtualMachine> outcome = removeNicFromVmThroughJobQueue(vm, nic); @@ -3462,7 +3552,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId); @@ -3711,7 +3811,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); @@ -4182,7 +4292,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4235,7 +4345,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setStep(VmWorkJobVO.Step.Prepare); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4288,7 +4398,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setStep(VmWorkJobVO.Step.Prepare); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4340,7 +4450,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4394,7 +4504,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4448,7 +4558,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4502,7 +4612,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4554,7 +4664,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4605,7 +4715,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4656,7 +4766,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4709,7 +4819,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4866,4 +4976,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception { return _jobHandlerProxy.handleVmWorkJob(work); } + + private VmWorkJobVO createPlaceHolderWork(long instanceId) { + VmWorkJobVO workJob = new VmWorkJobVO(""); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER); + workJob.setCmd(""); + workJob.setCmdInfo(""); + + workJob.setAccountId(0); + workJob.setUserId(0); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(instanceId); + workJob.setInitMsid(ManagementServerNode.getManagementServerId()); + + _workJobDao.persist(workJob); + + return workJob; + } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java index bea09b1..40b7784 100644 --- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java +++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -59,9 +59,6 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch String cmd = job.getCmd(); assert(cmd != null); - if(s_logger.isDebugEnabled()) - s_logger.debug("Run VM work job: " + cmd + ", job origin: " + job.getRelated()); - Class<?> workClz = null; try { workClz = Class.forName(job.getCmd()); @@ -79,27 +76,33 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch return; } - if (_handlers == null || _handlers.isEmpty()) { - s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo() - + ", job origin: " + job.getRelated()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found"); - return; - } - - VmWorkJobHandler handler = _handlers.get(work.getHandlerName()); - - if (handler == null) { - s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd() - + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler"); - return; + if (s_logger.isDebugEnabled()) + s_logger.debug("Run VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated()); + try { + if (_handlers == null || _handlers.isEmpty()) { + s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo() + + ", job origin: " + job.getRelated()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found"); + return; + } + + VmWorkJobHandler handler = _handlers.get(work.getHandlerName()); + + if (handler == null) { + s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd() + + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler"); + return; + } + + CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated()); + + Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work); + _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second()); + } finally { + if (s_logger.isDebugEnabled()) + s_logger.debug("Done with run of VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated()); } - - CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated()); - - Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work); - _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second()); - } catch(Throwable e) { s_logger.error("Unable to complete " + job + ", job origin:" + job.getRelated(), e); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java index e9bdae2..966e638 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java @@ -31,6 +31,8 @@ import com.cloud.exception.InsufficientCapacityException; import com.cloud.exception.ResourceUnavailableException; public class AsyncJobExecutionContext { + private static final Logger s_logger = Logger.getLogger(AsyncJobExecutionContext.class); + private AsyncJob _job; static private AsyncJobManager _jobMgr; @@ -112,7 +114,8 @@ public class AsyncJobExecutionContext { } // - // check failure exception before we disjoin the worker job + // check failure exception before we disjoin the worker job, work job usually fails with exception + // this will help propogate exception between jobs // TODO : it is ugly and this will become unnecessary after we switch to full-async mode // public void disjoinJob(long joinedJobId) throws InsufficientCapacityException, @@ -120,21 +123,34 @@ public class AsyncJobExecutionContext { assert(_job != null); AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId); - if(record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) { - Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult()); - if(exception != null && exception instanceof Exception) { - if(exception instanceof InsufficientCapacityException) - throw (InsufficientCapacityException)exception; - else if(exception instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)exception; - else if(exception instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)exception; - else - throw new RuntimeException((Exception)exception); - } + _jobMgr.disjoinJob(_job.getId(), joinedJobId); + + if (record.getJoinStatus() == JobInfo.Status.FAILED) { + if (record.getJoinResult() != null) { + Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult()); + if (exception != null && exception instanceof Exception) { + if (exception instanceof InsufficientCapacityException) { + s_logger.error("Job " + joinedJobId + " failed with InsufficientCapacityException"); + throw (InsufficientCapacityException)exception; + } + else if (exception instanceof ConcurrentOperationException) { + s_logger.error("Job " + joinedJobId + " failed with ConcurrentOperationException"); + throw (ConcurrentOperationException)exception; + } + else if (exception instanceof ResourceUnavailableException) { + s_logger.error("Job " + joinedJobId + " failed with ResourceUnavailableException"); + throw (ResourceUnavailableException)exception; + } + else { + s_logger.error("Job " + joinedJobId + " failed with exception"); + throw new RuntimeException((Exception)exception); + } + } + } else { + s_logger.error("Job " + joinedJobId + " failed without providing an error object"); + throw new RuntimeException("Job " + joinedJobId + " failed without providing an error object"); + } } - - _jobMgr.disjoinJob(_job.getId(), joinedJobId); } public void completeJoin(JobInfo.Status joinStatus, String joinResult) { @@ -151,6 +167,8 @@ public class AsyncJobExecutionContext { public static AsyncJobExecutionContext getCurrentExecutionContext() { AsyncJobExecutionContext context = s_currentExectionContext.get(); if (context == null) { + // TODO, this has security implicitions + s_logger.warn("Job is executed without a context, setup psudo job for the executing thread"); context = registerPseudoExecutionContext(CallContext.current().getCallingAccountId(), CallContext.current().getCallingUserId()); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java index dfb063f..bd6289e 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java @@ -29,7 +29,9 @@ public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, Long> { VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId); List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId); List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd); - + void updateStep(long workJobId, Step step); void expungeCompletedWorkJobs(Date cutDate); + + void expungeLeftoverWorkJobs(long msid); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java index 77515a7..1ee7b25 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java @@ -16,8 +16,11 @@ // under the License. package org.apache.cloudstack.framework.jobs.dao; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.Date; import java.util.List; +import java.util.TimeZone; import javax.annotation.PostConstruct; @@ -31,24 +34,26 @@ import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.SearchCriteria.Op; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.db.TransactionCallbackNoReturn; +import com.cloud.utils.db.TransactionLegacy; +import com.cloud.utils.db.TransactionStatus; import com.cloud.vm.VirtualMachine; public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implements VmWorkJobDao { protected SearchBuilder<VmWorkJobVO> PendingWorkJobSearch; protected SearchBuilder<VmWorkJobVO> PendingWorkJobByCommandSearch; - protected SearchBuilder<VmWorkJobVO> ExpungeWorkJobSearch; - + public VmWorkJobDaoImpl() { } - + @PostConstruct public void init() { PendingWorkJobSearch = createSearchBuilder(); PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ); PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ); PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ); - PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ); PendingWorkJobSearch.done(); PendingWorkJobByCommandSearch = createSearchBuilder(); @@ -58,54 +63,49 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ); PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ); PendingWorkJobByCommandSearch.done(); - - ExpungeWorkJobSearch = createSearchBuilder(); - ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT); - ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ); - ExpungeWorkJobSearch.done(); } - + @Override public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId) { - + SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create(); sc.setParameters("jobStatus", JobInfo. Status.IN_PROGRESS); sc.setParameters("vmType", type); sc.setParameters("vmInstanceId", instanceId); - + Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null); List<VmWorkJobVO> result = this.listBy(sc, filter); if(result != null && result.size() > 0) return result.get(0); - + return null; } - + @Override public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId) { - + SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create(); sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); sc.setParameters("vmType", type); sc.setParameters("vmInstanceId", instanceId); - + Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null); return this.listBy(sc, filter); } @Override public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd) { - + SearchCriteria<VmWorkJobVO> sc = PendingWorkJobByCommandSearch.create(); sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); sc.setParameters("vmType", type); sc.setParameters("vmInstanceId", instanceId); sc.setParameters("cmd", jobCmd); - + Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null); return this.listBy(sc, filter); } - + @Override public void updateStep(long workJobId, Step step) { VmWorkJobVO jobVo = findById(workJobId); @@ -113,13 +113,82 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen jobVo.setLastUpdated(DateUtil.currentGMTTime()); update(workJobId, jobVo); } - + @Override - public void expungeCompletedWorkJobs(Date cutDate) { - SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create(); - sc.setParameters("lastUpdated",cutDate); - sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); - - expunge(sc); + public void expungeCompletedWorkJobs(final Date cutDate) { + // current DAO machenism does not support following usage + /* + SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create(); + sc.setParameters("lastUpdated",cutDate); + sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); + + expunge(sc); + */ + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + TransactionLegacy txn = TransactionLegacy.currentTxn(); + + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement( + "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?)"); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + + pstmt.execute(); + } catch (SQLException e) { + } catch (Throwable e) { + } + + try { + pstmt = txn.prepareAutoCloseStatement( + "DELETE FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?"); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + + pstmt.execute(); + } catch (SQLException e) { + } catch (Throwable e) { + } + } + }); } + + @Override + public void expungeLeftoverWorkJobs(final long msid) { + // current DAO machenism does not support following usage + /* + SearchCriteria<VmWorkJobVO> sc = ExpungePlaceHolderWorkJobSearch.create(); + sc.setParameters("dispatcher", "VmWorkJobPlaceHolder"); + sc.setParameters("msid", msid); + + expunge(sc); + */ + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + TransactionLegacy txn = TransactionLegacy.currentTxn(); + + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement( + "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?)"); + pstmt.setLong(1, msid); + + pstmt.execute(); + } catch (SQLException e) { + } catch (Throwable e) { + } + + try { + pstmt = txn.prepareAutoCloseStatement( + "DELETE FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?"); + pstmt.setLong(1, msid); + + pstmt.execute(); + } catch (SQLException e) { + } catch (Throwable e) { + } + } + }); + } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/server/src/com/cloud/storage/VolumeApiServiceImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java index 0c89541..337aea1 100644 --- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java +++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java @@ -59,6 +59,7 @@ import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.framework.jobs.Outcome; +import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; @@ -73,6 +74,7 @@ import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao; import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao; import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO; import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity; +import org.apache.cloudstack.utils.identity.ManagementServerNode; import com.cloud.agent.AgentManager; import com.cloud.agent.api.Answer; @@ -324,6 +326,9 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic @Inject protected AsyncJobManager _jobMgr; + @Inject + protected VmWorkJobDao _workJobDao; + VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this); // TODO @@ -910,8 +915,19 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateResizeVolume(volume.getId(), currentSize, newSize, - newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(userVm.getId()); + } + try { + return orchestrateResizeVolume(volume.getId(), currentSize, newSize, + newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<Volume> outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize, newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk); @@ -1101,7 +1117,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(command.getVirtualMachineId()); + } + try { + return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<Volume> outcome = attachVolumeToVmThroughJobQueue(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); @@ -1412,7 +1439,16 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateDetachVolumeFromVM(vmId, volumeId); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmId); + } + try { + return orchestrateDetachVolumeFromVM(vmId, volumeId); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome<Volume> outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId); @@ -1577,7 +1613,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<Volume> outcome = migrateVolumeThroughJobQueue(vm.getId(), vol.getId(), destPool.getId(), liveMigrateVolume); @@ -1668,7 +1715,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<Snapshot> outcome = takeVolumeSnapshotThroughJobQueue(vm.getId(), volumeId, policyId, snapshotId, account.getId(), quiescevm); @@ -2197,7 +2255,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2244,7 +2302,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2289,7 +2347,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2334,7 +2392,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2379,7 +2437,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2436,4 +2494,23 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception { return _jobHandlerProxy.handleVmWorkJob(work); } + + private VmWorkJobVO createPlaceHolderWork(long instanceId) { + VmWorkJobVO workJob = new VmWorkJobVO(""); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER); + workJob.setCmd(""); + workJob.setCmdInfo(""); + + workJob.setAccountId(0); + workJob.setUserId(0); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(instanceId); + workJob.setInitMsid(ManagementServerNode.getManagementServerId()); + + _workJobDao.persist(workJob); + + return workJob; + } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java index 5bf248d..e273e1a 100644 --- a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java +++ b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java @@ -41,10 +41,12 @@ import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.framework.jobs.Outcome; +import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; import org.apache.cloudstack.jobs.JobInfo; +import org.apache.cloudstack.utils.identity.ManagementServerNode; import com.cloud.event.ActionEvent; import com.cloud.event.EventTypes; @@ -124,6 +126,9 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana @Inject AsyncJobManager _jobMgr; + @Inject + VmWorkJobDao _workJobDao; + VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this); int _vmSnapshotMax; @@ -367,7 +372,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmId); + } + try { + return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<VMSnapshot> outcome = createVMSnapshotThroughJobQueue(vmId, vmSnapshotId, quiescevm); @@ -455,7 +470,16 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateDeleteVMSnapshot(vmSnapshotId); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmSnapshot.getVmId()); + } + try { + return orchestrateDeleteVMSnapshot(vmSnapshotId); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome<VMSnapshot> outcome = deleteVMSnapshotThroughJobQueue(vmSnapshot.getVmId(), vmSnapshotId); @@ -564,7 +588,18 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateRevertToVMSnapshot(vmSnapshotId); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmSnapshotVo.getVmId()); + } + try { + return orchestrateRevertToVMSnapshot(vmSnapshotId); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<VMSnapshot> outcome = revertToVMSnapshotThroughJobQueue(vmSnapshotVo.getVmId(), vmSnapshotId); @@ -694,7 +729,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateDeleteAllVMSnapshots(vmId, type); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmId); + } + try { + return orchestrateDeleteAllVMSnapshots(vmId, type); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome<VirtualMachine> outcome = deleteAllVMSnapshotsThroughJobQueue(vmId, type); @@ -838,7 +883,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -882,7 +927,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -926,7 +971,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -970,7 +1015,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -1019,4 +1064,23 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception { return _jobHandlerProxy.handleVmWorkJob(work); } + + private VmWorkJobVO createPlaceHolderWork(long instanceId) { + VmWorkJobVO workJob = new VmWorkJobVO(""); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER); + workJob.setCmd(""); + workJob.setCmdInfo(""); + + workJob.setAccountId(0); + workJob.setUserId(0); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(instanceId); + workJob.setInitMsid(ManagementServerNode.getManagementServerId()); + + _workJobDao.persist(workJob); + + return workJob; + } }