Updated Branches: refs/heads/vmsync 2892469cf -> 0f26d5a05
Add skeleton to support out-of-band sync and recovery from left-over transitional VM states Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/0f26d5a0 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/0f26d5a0 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/0f26d5a0 Branch: refs/heads/vmsync Commit: 0f26d5a050e9fd4e14c9d459828dc070da4ea493 Parents: 2892469 Author: Kelven Yang <kelv...@gmail.com> Authored: Wed Jun 12 17:14:54 2013 -0700 Committer: Kelven Yang <kelv...@gmail.com> Committed: Wed Jun 12 17:14:54 2013 -0700 ---------------------------------------------------------------------- .../apache/cloudstack/vm/jobs/VmWorkJobDao.java | 1 - .../com/cloud/vm/VirtualMachineManagerImpl.java | 279 +++++++------------ .../vm/VirtualMachinePowerStateSyncImpl.java | 22 +- .../cloudstack/messagebus/TopicConstants.java | 6 +- 4 files changed, 118 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0f26d5a0/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDao.java ---------------------------------------------------------------------- diff --git a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDao.java b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDao.java index 735a04a..9ea1857 100644 --- a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDao.java +++ b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDao.java @@ -23,7 +23,6 @@ import org.apache.cloudstack.vm.jobs.VmWorkJobVO.Step; import com.cloud.utils.db.GenericDao; import com.cloud.vm.VirtualMachine; -import com.cloud.vm.VirtualMachine.Type; public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, Long> { VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0f26d5a0/server/src/com/cloud/vm/VirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java index 0e320e2..af29307 100755 --- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -41,6 +41,8 @@ import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator; import org.apache.cloudstack.framework.jobs.AsyncJobConstants; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageDispatcher; +import org.apache.cloudstack.framework.messagebus.MessageHandler; import org.apache.cloudstack.messagebus.TopicConstants; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.cloudstack.vm.jobs.VmWorkJobDao; @@ -238,6 +240,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac protected ClusterDetailsDao _clusterDetailsDao; @Inject protected UserVmDetailsDao _uservmDetailsDao; + + @Inject + protected VMInstanceDao _instanceDao; @Inject protected ConfigurationDao _configDao; @@ -427,14 +432,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _vmDao.remove(vm.getId()); } + + @Override public boolean start() { _executor.scheduleAtFixedRate(new CleanupTask(), _cleanupInterval, _cleanupInterval, TimeUnit.SECONDS); cancelWorkItems(_nodeId); + return true; } - + @Override public boolean stop() { return true; @@ -469,6 +477,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac .and(rvsEntity.getInstanceId(), SearchCriteria.Op.EQ, "instance") .and(rvsEntity.getDeviceId(), SearchCriteria.Op.EQ).values(0) .done(); + + + _messageBus.subscribe(TopicConstants.VM_POWER_STATE, MessageDispatcher.getDispatcher(this)); return true; } @@ -1311,176 +1322,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } - /* - @Override - public boolean advanceStop(String vmUuid, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { - State state = vm.getState(); - if (state == State.Stopped) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("VM is already stopped: " + vm); - } - return true; - } - - if (state == State.Destroyed || state == State.Expunging || state == State.Error) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Stopped called on " + vm + " but the state is " + state); - } - return true; - } - // grab outstanding work item if any - VmWorkJobVO work = _workDao.findByOutstandingWork(vm.getId(), vm.getState()); - if (work != null) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Found an outstanding work item for this vm " + vm + " with state:" + vm.getState() + ", work id:" + work.getId()); - } - } - Long hostId = vm.getHostId(); - if (hostId == null) { - if (!forced) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("HostId is null but this is not a forced stop, cannot stop vm " + vm + " with state:" + vm.getState()); - } - return false; - } - try { - stateTransitTo(vm, Event.AgentReportStopped, null, null); - } catch (NoTransitionException e) { - s_logger.warn(e.getMessage()); - } - // mark outstanding work item if any as done - if (work != null) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Updating work item to Done, id:" + work.getId()); - } - work.setStep(Step.Done); - _workDao.update(work.getId(), work); - } - return true; - } - - VirtualMachineGuru<T> vmGuru = getVmGuru(vm); - VirtualMachineProfile profile = new VirtualMachineProfileImpl(vm); - - try { - if (!stateTransitTo(vm, Event.StopRequested, vm.getHostId())) { - throw new ConcurrentOperationException("VM is being operated on."); - } - } catch (NoTransitionException e1) { - if (!forced) { - throw new CloudRuntimeException("We cannot stop " + vm + " when it is in state " + vm.getState()); - } - boolean doCleanup = false; - if (s_logger.isDebugEnabled()) { - s_logger.debug("Unable to transition the state but we're moving on because it's forced stop"); - } - if (state == State.Starting || state == State.Migrating) { - if (work != null) { - doCleanup = true; - } else { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Unable to cleanup VM: " + vm + " ,since outstanding work item is not found"); - } - throw new CloudRuntimeException("Work item not found, We cannot stop " + vm + " when it is in state " + vm.getState()); - } - } else if (state == State.Stopping) { - doCleanup = true; - } - - if (doCleanup) { - if (cleanup(vmGuru, new VirtualMachineProfileImpl(vm), work, Event.StopRequested, forced, user, account)) { - try { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Updating work item to Done, id:" + work.getId()); - } - return changeState(vm, Event.AgentReportStopped, null, work, Step.Done); - } catch (NoTransitionException e) { - s_logger.warn("Unable to cleanup " + vm); - return false; - } - } else { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Failed to cleanup VM: " + vm); - } - throw new CloudRuntimeException("Failed to cleanup " + vm + " , current state " + vm.getState()); - } - } - } - - if (vm.getState() != State.Stopping) { - throw new CloudRuntimeException("We cannot proceed with stop VM " + vm + " since it is not in 'Stopping' state, current state: " + vm.getState()); - } - - vmGuru.prepareStop(profile); - - StopCommand stop = new StopCommand(vm); - boolean stopped = false; - StopAnswer answer = null; - try { - answer = (StopAnswer) _agentMgr.send(vm.getHostId(), stop); - stopped = answer.getResult(); - if (!stopped) { - throw new CloudRuntimeException("Unable to stop the virtual machine due to " + answer.getDetails()); - } - vmGuru.finalizeStop(profile, answer); - - } catch (AgentUnavailableException e) { - s_logger.warn("Unable to stop vm, agent unavailable: " + e.toString()); - } catch (OperationTimedoutException e) { - s_logger.warn("Unable to stop vm, operation timed out: " + e.toString()); - } finally { - if (!stopped) { - if (!forced) { - s_logger.warn("Unable to stop vm " + vm); - try { - stateTransitTo(vm, Event.OperationFailed, vm.getHostId()); - } catch (NoTransitionException e) { - s_logger.warn("Unable to transition the state " + vm); - } - return false; - } else { - s_logger.warn("Unable to actually stop " + vm + " but continue with release because it's a force stop"); - vmGuru.finalizeStop(profile, answer); - } - } - } - - if (s_logger.isDebugEnabled()) { - s_logger.debug(vm + " is stopped on the host. Proceeding to release resource held."); - } - - try { - _networkMgr.release(profile, forced); - s_logger.debug("Successfully released network resources for the vm " + vm); - } catch (Exception e) { - s_logger.warn("Unable to release some network resources.", e); - } - - try { - if (vm.getHypervisorType() != HypervisorType.BareMetal) { - this.volumeMgr.release(profile); - s_logger.debug("Successfully released storage resources for the vm " + vm); - } - } catch (Exception e) { - s_logger.warn("Unable to release storage resources.", e); - } - - try { - if (work != null) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Updating the outstanding work item to Done, id:" + work.getId()); - } - work.setStep(Step.Done); - _workDao.update(work.getId(), work); - } - - return stateTransitTo(vm, Event.OperationSucceeded, null); - } catch (NoTransitionException e) { - s_logger.warn(e.getMessage()); - return false; - } - } - */ private void setStateMachine() { _stateMachine = VirtualMachine.State.getStateMachine(); } @@ -3569,5 +3410,101 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return false; } + + @MessageHandler(topic=TopicConstants.VM_POWER_STATE) + private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) { + assert(args != null); + Long vmId = (Long)args; + + List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vmId); + if(pendingWorkJobs.size() == 0) { + // there is no pending operation job + VMInstanceVO vm = _instanceDao.findById(vmId); + if(vm != null) { + switch(vm.getPowerState()) { + case PowerOn : + HandlePowerOnReportWithNoPendingJobsOnVM(vm); + break; + + case PowerOff : + HandlePowerOffReportWithNoPendingJobsOnVM(vm); + break; + + // PowerUnknown shouldn't be reported, it is a derived + // VM power state from host state (host un-reachable + case PowerUnknown : + default : + assert(false); + break; + } + } else { + s_logger.warn("VM " + vmId + " no longer exists when processing VM state report"); + } + } else { + // TODO, wake-up signalling + } + } + + private void HandlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) { + // TODO : + // 1) handle left-over transitional VM states + // 2) handle out of band VM live migration + // 3) handle out of sync stationary states, marking VM from Stopped to Running with + // alert messages + + switch(vm.getState()) { + case Starting: + break; + + case Running : + break; + + case Stopping : + case Stopped : + break; + + case Destroyed : + case Expunging : + break; + + case Migrating : + break; + + case Error : + default : + break; + } + } + + private void HandlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) { + + // TODO : + // 1) handle left-over transitional VM states + // 2) handle out of sync stationary states, schedule force-stop to release resources + // + switch(vm.getState()) { + case Starting: + break; + + case Running : + break; + + case Stopping : + case Stopped : + break; + + case Destroyed : + case Expunging : + break; + + case Migrating : + break; + + case Error : + default : + break; + } + } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0f26d5a0/server/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java b/server/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java index 96ecd7d..9273ed0 100644 --- a/server/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java +++ b/server/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java @@ -66,6 +66,8 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat } private void processReport(long hostId, Map<Long, VirtualMachine.PowerState> translatedInfo) { + + for(Map.Entry<Long, VirtualMachine.PowerState> entry : translatedInfo.entrySet()) { if(s_logger.isDebugEnabled()) @@ -79,6 +81,11 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat _messageBus.publish(null, TopicConstants.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey()); } } + + // + // TODO + // 1) publish missing report (if VM is missing from host report) for KVM/XenServer + // } private Map<Long, VirtualMachine.PowerState> convertHostPingInfos(Map<String, PowerState> states) { @@ -121,20 +128,5 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat private VMInstanceVO findVM(String vmName) { return _instanceDao.findVMByInstanceName(vmName); -// Collection<VirtualMachineGuru> vmGurus = _vmMgr.getRegisteredGurus(); -// -// for (VirtualMachineGuru vmGuru : vmGurus) { -// VMInstanceVO vm = vmGuru.findByName(vmName); -// if (vm != null) -// return vm; -// -// Long id = vmGuru.convertToId(vmName); -// if (id != null) { -// vm = vmGuru.findById(id); -// if(vm != null) -// return vm; -// } -// } -// return null; } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0f26d5a0/server/src/org/apache/cloudstack/messagebus/TopicConstants.java ---------------------------------------------------------------------- diff --git a/server/src/org/apache/cloudstack/messagebus/TopicConstants.java b/server/src/org/apache/cloudstack/messagebus/TopicConstants.java index d90f602..6f465f4 100644 --- a/server/src/org/apache/cloudstack/messagebus/TopicConstants.java +++ b/server/src/org/apache/cloudstack/messagebus/TopicConstants.java @@ -18,9 +18,9 @@ package org.apache.cloudstack.messagebus; public interface TopicConstants { // VM power state messages on message bus - public static final String VM_POWER_STATE = "vm.powerstate"; + public static final String VM_POWER_STATE = "vm.powerstate"; // args <Long> vmid // job messages on message bus - public static final String JOB_HEARTBEAT = "job.heartbeat"; - public static final String JOB_STATE = "job.state"; + public static final String JOB_HEARTBEAT = "job.heartbeat"; // args <Long> jobid + public static final String JOB_STATE = "job.state"; // args <Long> jobid }