Updated Branches: refs/heads/vmsync e2edae171 -> 0bfc817bc
Handle transitional states across management server restart Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/0bfc817b Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/0bfc817b Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/0bfc817b Branch: refs/heads/vmsync Commit: 0bfc817bc6b8c2887e9ff83f22cd75263cffbb54 Parents: e2edae1 Author: Kelven Yang <kelv...@gmail.com> Authored: Wed Jun 19 17:47:14 2013 -0700 Committer: Kelven Yang <kelv...@gmail.com> Committed: Wed Jun 19 17:47:14 2013 -0700 ---------------------------------------------------------------------- api/src/com/cloud/vm/VirtualMachine.java | 6 +- .../com/cloud/vm/VirtualMachineManagerImpl.java | 201 ++++++++++--------- .../cloudstack/vm/jobs/VmWorkJobDaoImpl.java | 12 +- .../framework/jobs/dao/AsyncJobDaoImpl.java | 2 +- 4 files changed, 123 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/api/src/com/cloud/vm/VirtualMachine.java ---------------------------------------------------------------------- diff --git a/api/src/com/cloud/vm/VirtualMachine.java b/api/src/com/cloud/vm/VirtualMachine.java index 3ac9aed..fe0ea76 100755 --- a/api/src/com/cloud/vm/VirtualMachine.java +++ b/api/src/com/cloud/vm/VirtualMachine.java @@ -108,8 +108,12 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running); s_fsm.addTransition(State.Stopped, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running); s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running); - s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running); + + s_fsm.addTransition(State.Starting, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped); + s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped); + s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped); + s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped); } public static boolean isVmStarted(State oldState, Event e, State newState) { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 8989235..141fcb4 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -457,12 +457,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _vmDao.remove(vm.getId()); } - - @Override public boolean start() { _executor.scheduleAtFixedRate(new TransitionTask(), _pingInterval.value(), _pingInterval.value(), TimeUnit.SECONDS); _executor.scheduleAtFixedRate(new CleanupTask(), _pingInterval.value()*2, _pingInterval.value()*2, TimeUnit.SECONDS); + + // cancel jobs left-over from last run cancelWorkItems(_nodeId); return true; @@ -1801,42 +1801,42 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac protected void cancelWorkItems(long nodeId) { /* - GlobalLock scanLock = GlobalLock.getInternLock("vmmgr.cancel.workitem"); + GlobalLock scanLock = GlobalLock.getInternLock("vmmgr.cancel.workitem"); - try { - if (scanLock.lock(3)) { - try { - List<VmWorkJobVO> works = _workDao.listWorkInProgressFor(nodeId); - for (VmWorkJobVO work : works) { - s_logger.info("Handling unfinished work item: " + work); - try { - VMInstanceVO vm = _vmDao.findById(work.getInstanceId()); - if (vm != null) { - if (work.getType() == State.Starting) { - _haMgr.scheduleRestart(vm, true); - work.setManagementServerId(_nodeId); - _workDao.update(work.getId(), work); - } else if (work.getType() == State.Stopping) { - _haMgr.scheduleStop(vm, vm.getHostId(), WorkType.CheckStop); - work.setManagementServerId(_nodeId); - _workDao.update(work.getId(), work); - } else if (work.getType() == State.Migrating) { - _haMgr.scheduleMigration(vm); - work.setStep(Step.Done); - _workDao.update(work.getId(), work); - } + try { + if (scanLock.lock(3)) { + try { + List<VmWorkJobVO> works = _workDao.listWorkInProgressFor(nodeId); + for (VmWorkJobVO work : works) { + s_logger.info("Handling unfinished work item: " + work); + try { + VMInstanceVO vm = _vmDao.findById(work.getInstanceId()); + if (vm != null) { + if (work.getType() == State.Starting) { + _haMgr.scheduleRestart(vm, true); + work.setManagementServerId(_nodeId); + _workDao.update(work.getId(), work); + } else if (work.getType() == State.Stopping) { + _haMgr.scheduleStop(vm, vm.getHostId(), WorkType.CheckStop); + work.setManagementServerId(_nodeId); + _workDao.update(work.getId(), work); + } else if (work.getType() == State.Migrating) { + _haMgr.scheduleMigration(vm); + work.setStep(Step.Done); + _workDao.update(work.getId(), work); } - } catch (Exception e) { - s_logger.error("Error while handling " + work, e); } + } catch (Exception e) { + s_logger.error("Error while handling " + work, e); } - } finally { - scanLock.unlock(); } + } finally { + scanLock.unlock(); } - } finally { - scanLock.releaseRef(); } + } finally { + scanLock.releaseRef(); + } */ } @@ -3475,7 +3475,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset"); - break; + break; case Destroyed : case Expunging : @@ -3501,30 +3501,29 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) { - // TODO : // 1) handle left-over transitional VM states // 2) handle out of sync stationary states, schedule force-stop to release resources // switch(vm.getState()) { case Starting : - break; - - case Running : - break; - case Stopping : - break; - case Stopped : - break; + case Migrating : + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId()); + } catch(NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition."); + // TODO: we need to forcely release all resource allocation + break; + case Running : case Destroyed : case Expunging : break; - case Migrating : - break; - case Error : default : break; @@ -3582,79 +3581,99 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // VMs that in transitional state without recent power state report - @DB private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) { String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + "AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + - "AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)"; + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; List<Long> l = new ArrayList<Long>(); - Transaction txn = Transaction.currentTxn();; - PreparedStatement pstmt = null; - try { - pstmt = txn.prepareAutoCloseStatement(sql); - - pstmt.setLong(1, hostId); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); - ResultSet rs = pstmt.executeQuery(); - while(rs.next()) { - l.add(rs.getLong(1)); - } - } catch (SQLException e) { - } catch (Throwable e) { - } + Transaction txn = null; + try { + txn = Transaction.open(Transaction.CLOUD_DB); + + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + + pstmt.setLong(1, hostId); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); + pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); + ResultSet rs = pstmt.executeQuery(); + while(rs.next()) { + l.add(rs.getLong(1)); + } + } catch (SQLException e) { + } catch (Throwable e) { + } + + } finally { + if(txn != null) + txn.close(); + } return l; } // VMs that in transitional state and recently have power state update - @DB private List<Long> listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) { String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + "AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + - "AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)"; + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; List<Long> l = new ArrayList<Long>(); - Transaction txn = Transaction.currentTxn();; - PreparedStatement pstmt = null; - try { - pstmt = txn.prepareAutoCloseStatement(sql); - - pstmt.setLong(1, hostId); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); - ResultSet rs = pstmt.executeQuery(); - while(rs.next()) { - l.add(rs.getLong(1)); - } - } catch (SQLException e) { - } catch (Throwable e) { - } - return l; + Transaction txn = null; + try { + txn = Transaction.open(Transaction.CLOUD_DB); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + + pstmt.setLong(1, hostId); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); + pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); + ResultSet rs = pstmt.executeQuery(); + while(rs.next()) { + l.add(rs.getLong(1)); + } + } catch (SQLException e) { + } catch (Throwable e) { + } + return l; + } finally { + if(txn != null) + txn.close(); + } } - @DB private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) { String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " + "AND i.power_state_update_time < ? AND i.host_id = h.id " + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + - "AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)"; + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; List<Long> l = new ArrayList<Long>(); - Transaction txn = Transaction.currentTxn();; - PreparedStatement pstmt = null; + Transaction txn = null; try { - pstmt = txn.prepareAutoCloseStatement(sql); - - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); - ResultSet rs = pstmt.executeQuery(); - while(rs.next()) { - l.add(rs.getLong(1)); - } - } catch (SQLException e) { - } catch (Throwable e) { + txn = Transaction.open(Transaction.CLOUD_DB); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); + pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal()); + ResultSet rs = pstmt.executeQuery(); + while(rs.next()) { + l.add(rs.getLong(1)); + } + } catch (SQLException e) { + } catch (Throwable e) { + } + return l; + } finally { + if(txn != null) + txn.close(); } - return l; } @Override http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java ---------------------------------------------------------------------- diff --git a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java index 6361a23..f353357 100644 --- a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java +++ b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java @@ -44,12 +44,14 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen @PostConstruct public void init() { PendingWorkJobSearch = createSearchBuilder(); + PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ); PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ); PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ); PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ); PendingWorkJobSearch.done(); PendingWorkJobByCommandSearch = createSearchBuilder(); + PendingWorkJobByCommandSearch.and("jobStatus", PendingWorkJobByCommandSearch.entity().getStatus(), Op.EQ); PendingWorkJobByCommandSearch.and("vmType", PendingWorkJobByCommandSearch.entity().getVmType(), Op.EQ); PendingWorkJobByCommandSearch.and("vmInstanceId", PendingWorkJobByCommandSearch.entity().getVmInstanceId(), Op.EQ); PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ); @@ -58,7 +60,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen ExpungeWorkJobSearch = createSearchBuilder(); ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT); - ExpungeWorkJobSearch.and("status", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ); + ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ); ExpungeWorkJobSearch.done(); } @@ -66,9 +68,9 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId) { SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create(); + sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); sc.setParameters("vmType", type); sc.setParameters("vmInstanceId", instanceId); - sc.setParameters("step", Step.Done); Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null); List<VmWorkJobVO> result = this.listBy(sc, filter); @@ -82,9 +84,9 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId) { SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create(); + sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); sc.setParameters("vmType", type); sc.setParameters("vmInstanceId", instanceId); - sc.setParameters("step", Step.Done); Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null); return this.listBy(sc, filter); @@ -94,9 +96,9 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd) { SearchCriteria<VmWorkJobVO> sc = PendingWorkJobByCommandSearch.create(); + sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); sc.setParameters("vmType", type); sc.setParameters("vmInstanceId", instanceId); - sc.setParameters("step", Step.Done); sc.setParameters("cmd", jobCmd); Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null); @@ -115,7 +117,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen public void expungeCompletedWorkJobs(Date cutDate) { SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create(); sc.setParameters("lastUpdated",cutDate); - sc.setParameters("status", JobInfo.Status.IN_PROGRESS); + sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); expunge(sc); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java index 9b1eda6..fb3845c 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java @@ -180,7 +180,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) { String sql = "UPDATE async_job SET job_status=" + JobInfo.Status.FAILED.ordinal() + ", job_result_code=" + jobResultCode + ", job_result='" + jobResultMessage + "' where job_status=" + JobInfo.Status.IN_PROGRESS.ordinal() - + " AND (job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?))"; + + " AND (job_executing_msid=? OR (job_executing_msid IS NULL AND job_init_msid=?))"; Transaction txn = Transaction.currentTxn(); PreparedStatement pstmt = null;