Updated Branches:
  refs/heads/vmsync e2edae171 -> 0bfc817bc

Handle VM transitional states across management server restarts


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/0bfc817b
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/0bfc817b
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/0bfc817b

Branch: refs/heads/vmsync
Commit: 0bfc817bc6b8c2887e9ff83f22cd75263cffbb54
Parents: e2edae1
Author: Kelven Yang <kelv...@gmail.com>
Authored: Wed Jun 19 17:47:14 2013 -0700
Committer: Kelven Yang <kelv...@gmail.com>
Committed: Wed Jun 19 17:47:14 2013 -0700

----------------------------------------------------------------------
 api/src/com/cloud/vm/VirtualMachine.java        |   6 +-
 .../com/cloud/vm/VirtualMachineManagerImpl.java | 201 ++++++++++---------
 .../cloudstack/vm/jobs/VmWorkJobDaoImpl.java    |  12 +-
 .../framework/jobs/dao/AsyncJobDaoImpl.java     |   2 +-
 4 files changed, 123 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/api/src/com/cloud/vm/VirtualMachine.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/vm/VirtualMachine.java b/api/src/com/cloud/vm/VirtualMachine.java
index 3ac9aed..fe0ea76 100755
--- a/api/src/com/cloud/vm/VirtualMachine.java
+++ b/api/src/com/cloud/vm/VirtualMachine.java
@@ -108,8 +108,12 @@ public interface VirtualMachine extends RunningOn, 
ControlledEntity, Identity, I
             s_fsm.addTransition(State.Stopping, 
VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
             s_fsm.addTransition(State.Stopped, 
VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
             s_fsm.addTransition(State.Running, 
VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
-
             s_fsm.addTransition(State.Migrating, 
VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
+
+            s_fsm.addTransition(State.Starting, 
VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
+            s_fsm.addTransition(State.Stopping, 
VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
+            s_fsm.addTransition(State.Running, 
VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
+            s_fsm.addTransition(State.Migrating, 
VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
         }
         
         public static boolean isVmStarted(State oldState, Event e, State 
newState) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 8989235..141fcb4 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -457,12 +457,12 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
         _vmDao.remove(vm.getId());
     }
     
-    
-
     @Override
     public boolean start() {
         _executor.scheduleAtFixedRate(new TransitionTask(), 
_pingInterval.value(), _pingInterval.value(), TimeUnit.SECONDS);
         _executor.scheduleAtFixedRate(new CleanupTask(), 
_pingInterval.value()*2, _pingInterval.value()*2, TimeUnit.SECONDS);
+
+        // cancel jobs left-over from last run
         cancelWorkItems(_nodeId);
         
         return true;
@@ -1801,42 +1801,42 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
 
     protected void cancelWorkItems(long nodeId) {
         /*
-                GlobalLock scanLock = 
GlobalLock.getInternLock("vmmgr.cancel.workitem");
+            GlobalLock scanLock = 
GlobalLock.getInternLock("vmmgr.cancel.workitem");
 
-                try {
-                    if (scanLock.lock(3)) {
-                        try {
-                            List<VmWorkJobVO> works = 
_workDao.listWorkInProgressFor(nodeId);
-                            for (VmWorkJobVO work : works) {
-                                s_logger.info("Handling unfinished work item: 
" + work);
-                                try {
-                                    VMInstanceVO vm = 
_vmDao.findById(work.getInstanceId());
-                                    if (vm != null) {
-                                        if (work.getType() == State.Starting) {
-                                            _haMgr.scheduleRestart(vm, true);
-                                            
work.setManagementServerId(_nodeId);
-                                            _workDao.update(work.getId(), 
work);
-                                        } else if (work.getType() == 
State.Stopping) {
-                                            _haMgr.scheduleStop(vm, 
vm.getHostId(), WorkType.CheckStop);
-                                            
work.setManagementServerId(_nodeId);
-                                            _workDao.update(work.getId(), 
work);
-                                        } else if (work.getType() == 
State.Migrating) {
-                                            _haMgr.scheduleMigration(vm);
-                                            work.setStep(Step.Done);
-                                            _workDao.update(work.getId(), 
work);
-                                        }
+            try {
+                if (scanLock.lock(3)) {
+                    try {
+                        List<VmWorkJobVO> works = 
_workDao.listWorkInProgressFor(nodeId);
+                        for (VmWorkJobVO work : works) {
+                            s_logger.info("Handling unfinished work item: " + 
work);
+                            try {
+                                VMInstanceVO vm = 
_vmDao.findById(work.getInstanceId());
+                                if (vm != null) {
+                                    if (work.getType() == State.Starting) {
+                                        _haMgr.scheduleRestart(vm, true);
+                                        work.setManagementServerId(_nodeId);
+                                        _workDao.update(work.getId(), work);
+                                    } else if (work.getType() == 
State.Stopping) {
+                                        _haMgr.scheduleStop(vm, 
vm.getHostId(), WorkType.CheckStop);
+                                        work.setManagementServerId(_nodeId);
+                                        _workDao.update(work.getId(), work);
+                                    } else if (work.getType() == 
State.Migrating) {
+                                        _haMgr.scheduleMigration(vm);
+                                        work.setStep(Step.Done);
+                                        _workDao.update(work.getId(), work);
                                     }
-                                } catch (Exception e) {
-                                    s_logger.error("Error while handling " + 
work, e);
                                 }
+                            } catch (Exception e) {
+                                s_logger.error("Error while handling " + work, 
e);
                             }
-                        } finally {
-                            scanLock.unlock();
                         }
+                    } finally {
+                        scanLock.unlock();
                     }
-                } finally {
-                    scanLock.releaseRef();
                 }
+            } finally {
+                scanLock.releaseRef();
+            }
         */
     }
 
@@ -3475,7 +3475,7 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
                }
                _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, 
vm.getDataCenterId(), vm.getPodIdToDeployIn(),
                                VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() 
+ "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> 
Running) from out-of-context transition. VM network environment may need to be 
reset");
-                       break;
+               break;
                
        case Destroyed :
        case Expunging :
@@ -3501,30 +3501,29 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
     
     private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
 
-       // TODO :
        //      1) handle left-over transitional VM states
        //      2) handle out of sync stationary states, schedule force-stop to 
release resources
        //
        switch(vm.getState()) {
        case Starting :
-               break;
-               
-       case Running :
-               break;
-               
        case Stopping :
-               break;
-               
        case Stopped :
-               break;
+       case Migrating :
+               try {
+                       stateTransitTo(vm, 
VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId());
+               } catch(NoTransitionException e) {
+                       s_logger.warn("Unexpected VM state transition 
exception, race-condition?", e);
+               }
+               _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, 
vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+                               VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() 
+ "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> 
Stopped) from out-of-context transition.");
+               // TODO: we need to forcely release all resource allocation
+               break;
                
+       case Running :
        case Destroyed :
        case Expunging :
                break;
                
-       case Migrating :
-               break;
-               
        case Error :
        default :
                break;
@@ -3582,79 +3581,99 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
     
     
     // VMs that in transitional state without recent power state report
-    @DB
     private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, 
Date cutTime) {
        String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE 
h.status = 'UP' " +
                      "AND h.id = ? AND i.power_state_update_time < ? AND 
i.host_id = h.id " +
                             "AND (i.state ='Starting' OR i.state='Stopping' OR 
i.state='Migrating') " +
-                            "AND i.id NOT IN (SELECT vm_instance_id FROM 
vm_work_job)";
+                            "AND i.id NOT IN (SELECT w.vm_instance_id FROM 
vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
        
        List<Long> l = new ArrayList<Long>();
-        Transaction txn = Transaction.currentTxn();;
-        PreparedStatement pstmt = null;
-        try {
-            pstmt = txn.prepareAutoCloseStatement(sql);
-            
-            pstmt.setLong(1, hostId);
-               pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
-            ResultSet rs = pstmt.executeQuery();
-            while(rs.next()) {
-               l.add(rs.getLong(1));
-            }
-        } catch (SQLException e) {
-        } catch (Throwable e) {
-        }
+       Transaction txn = null;
+       try {
+               txn = Transaction.open(Transaction.CLOUD_DB);
+       
+               PreparedStatement pstmt = null;
+               try {
+                   pstmt = txn.prepareAutoCloseStatement(sql);
+                   
+                   pstmt.setLong(1, hostId);
+                       pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+                       pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+                   ResultSet rs = pstmt.executeQuery();
+                   while(rs.next()) {
+                       l.add(rs.getLong(1));
+                   }
+               } catch (SQLException e) {
+               } catch (Throwable e) {
+               }
+        
+       } finally {
+               if(txn != null)
+                       txn.close();
+       }
         return l;
     }
     
     // VMs that in transitional state and recently have power state update
-    @DB
     private List<Long> listVMInTransitionStateWithRecentReportOnUpHost(long 
hostId, Date cutTime) {
        String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE 
h.status = 'UP' " +
                      "AND h.id = ? AND i.power_state_update_time > ? AND 
i.host_id = h.id " +
                             "AND (i.state ='Starting' OR i.state='Stopping' OR 
i.state='Migrating') " +
-                            "AND i.id NOT IN (SELECT vm_instance_id FROM 
vm_work_job)";
+                            "AND i.id NOT IN (SELECT w.vm_instance_id FROM 
vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
        
        List<Long> l = new ArrayList<Long>();
-        Transaction txn = Transaction.currentTxn();;
-        PreparedStatement pstmt = null;
-        try {
-            pstmt = txn.prepareAutoCloseStatement(sql);
-            
-            pstmt.setLong(1, hostId);
-               pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
-            ResultSet rs = pstmt.executeQuery();
-            while(rs.next()) {
-               l.add(rs.getLong(1));
-            }
-        } catch (SQLException e) {
-        } catch (Throwable e) {
-        }
-        return l;
+       Transaction txn = null;
+       try {
+               txn = Transaction.open(Transaction.CLOUD_DB);
+               PreparedStatement pstmt = null;
+               try {
+                   pstmt = txn.prepareAutoCloseStatement(sql);
+                   
+                   pstmt.setLong(1, hostId);
+                       pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+                       pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+                   ResultSet rs = pstmt.executeQuery();
+                   while(rs.next()) {
+                       l.add(rs.getLong(1));
+                   }
+               } catch (SQLException e) {
+               } catch (Throwable e) {
+               }
+               return l;
+       } finally {
+               if(txn != null)
+                       txn.close();
+       }
     }
     
-    @DB
     private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(Date 
cutTime) {
        String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE 
h.status != 'UP' " +
                  "AND i.power_state_update_time < ? AND i.host_id = h.id " +
                             "AND (i.state ='Starting' OR i.state='Stopping' OR 
i.state='Migrating') " +
-                            "AND i.id NOT IN (SELECT vm_instance_id FROM 
vm_work_job)";
+                            "AND i.id NOT IN (SELECT w.vm_instance_id FROM 
vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
        
        List<Long> l = new ArrayList<Long>();
-       Transaction txn = Transaction.currentTxn();;
-       PreparedStatement pstmt = null;
+       Transaction txn = null;
        try {
-              pstmt = txn.prepareAutoCloseStatement(sql);
-              
-              pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
-              ResultSet rs = pstmt.executeQuery();
-              while(rs.next()) {
-               l.add(rs.getLong(1));
-              }
-       } catch (SQLException e) {
-       } catch (Throwable e) {
+               txn = Transaction.open(Transaction.CLOUD_DB);
+               PreparedStatement pstmt = null;
+               try {
+                      pstmt = txn.prepareAutoCloseStatement(sql);
+                      
+                      pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+                      pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal());
+                      ResultSet rs = pstmt.executeQuery();
+                      while(rs.next()) {
+                       l.add(rs.getLong(1));
+                      }
+               } catch (SQLException e) {
+               } catch (Throwable e) {
+               }
+               return l;
+       } finally {
+               if(txn != null)
+                       txn.close();
        }
-       return l;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
index 6361a23..f353357 100644
--- a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
+++ b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
@@ -44,12 +44,14 @@ public class VmWorkJobDaoImpl extends 
GenericDaoBase<VmWorkJobVO, Long> implemen
        @PostConstruct
        public void init() {
                PendingWorkJobSearch = createSearchBuilder();
+               PendingWorkJobSearch.and("jobStatus", 
PendingWorkJobSearch.entity().getStatus(), Op.EQ);
                PendingWorkJobSearch.and("vmType", 
PendingWorkJobSearch.entity().getVmType(), Op.EQ);
                PendingWorkJobSearch.and("vmInstanceId", 
PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ);
                PendingWorkJobSearch.and("step", 
PendingWorkJobSearch.entity().getStep(), Op.NEQ);
                PendingWorkJobSearch.done();
 
                PendingWorkJobByCommandSearch = createSearchBuilder();
+               PendingWorkJobByCommandSearch.and("jobStatus", 
PendingWorkJobByCommandSearch.entity().getStatus(), Op.EQ);
                PendingWorkJobByCommandSearch.and("vmType", 
PendingWorkJobByCommandSearch.entity().getVmType(), Op.EQ);
                PendingWorkJobByCommandSearch.and("vmInstanceId", 
PendingWorkJobByCommandSearch.entity().getVmInstanceId(), Op.EQ);
                PendingWorkJobByCommandSearch.and("step", 
PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
@@ -58,7 +60,7 @@ public class VmWorkJobDaoImpl extends 
GenericDaoBase<VmWorkJobVO, Long> implemen
                
                ExpungeWorkJobSearch = createSearchBuilder();
                ExpungeWorkJobSearch.and("lastUpdated", 
ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT);
-               ExpungeWorkJobSearch.and("status", 
ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
+               ExpungeWorkJobSearch.and("jobStatus", 
ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
                ExpungeWorkJobSearch.done();
        }
        
@@ -66,9 +68,9 @@ public class VmWorkJobDaoImpl extends 
GenericDaoBase<VmWorkJobVO, Long> implemen
     public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long 
instanceId) {
                
                SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
+               sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
                sc.setParameters("vmType", type);
                sc.setParameters("vmInstanceId", instanceId);
-               sc.setParameters("step", Step.Done);
                
                Filter filter = new Filter(VmWorkJobVO.class, "created", true, 
null, null);
                List<VmWorkJobVO> result = this.listBy(sc, filter);
@@ -82,9 +84,9 @@ public class VmWorkJobDaoImpl extends 
GenericDaoBase<VmWorkJobVO, Long> implemen
     public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, 
long instanceId) {
                
                SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
+               sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
                sc.setParameters("vmType", type);
                sc.setParameters("vmInstanceId", instanceId);
-               sc.setParameters("step", Step.Done);
                
                Filter filter = new Filter(VmWorkJobVO.class, "created", true, 
null, null);
                return this.listBy(sc, filter);
@@ -94,9 +96,9 @@ public class VmWorkJobDaoImpl extends 
GenericDaoBase<VmWorkJobVO, Long> implemen
     public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, 
long instanceId, String jobCmd) {
                
                SearchCriteria<VmWorkJobVO> sc = 
PendingWorkJobByCommandSearch.create();
+               sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
                sc.setParameters("vmType", type);
                sc.setParameters("vmInstanceId", instanceId);
-               sc.setParameters("step", Step.Done);
                sc.setParameters("cmd", jobCmd);
                
                Filter filter = new Filter(VmWorkJobVO.class, "created", true, 
null, null);
@@ -115,7 +117,7 @@ public class VmWorkJobDaoImpl extends 
GenericDaoBase<VmWorkJobVO, Long> implemen
     public void expungeCompletedWorkJobs(Date cutDate) {
                SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
                sc.setParameters("lastUpdated",cutDate);
-        sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
+        sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
                
                expunge(sc);
        }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index 9b1eda6..fb3845c 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -180,7 +180,7 @@ public class AsyncJobDaoImpl extends 
GenericDaoBase<AsyncJobVO, Long> implements
        public void resetJobProcess(long msid, int jobResultCode, String 
jobResultMessage) {
         String sql = "UPDATE async_job SET job_status=" + 
JobInfo.Status.FAILED.ordinal() + ", job_result_code=" + jobResultCode
                 + ", job_result='" + jobResultMessage + "' where job_status=" 
+ JobInfo.Status.IN_PROGRESS.ordinal()
-                + " AND (job_complete_msid=? OR (job_complete_msid IS NULL AND 
job_init_msid=?))";
+                + " AND (job_executing_msid=? OR (job_executing_msid IS NULL 
AND job_init_msid=?))";
                
         Transaction txn = Transaction.currentTxn();
         PreparedStatement pstmt = null;

Reply via email to