Updated Branches: refs/heads/4.2 be17ea8e9 -> e7c8a35c3
CLOUDSTACK-3190: Async jobs actions now trigger event bus messages to be published -AsyncJobManagerImpl to publish async job events when async jobs are created, updated and completed Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/e7c8a35c Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/e7c8a35c Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/e7c8a35c Branch: refs/heads/4.2 Commit: e7c8a35c3aca4722666c6f6ff6bc9caac46e17c3 Parents: be17ea8 Author: Ryan Dietrich <r...@betterservers.com> Authored: Fri Jul 5 17:43:34 2013 +0530 Committer: Murali Reddy <muralimmre...@gmail.com> Committed: Fri Jul 5 17:50:03 2013 +0530 ---------------------------------------------------------------------- api/src/com/cloud/event/EventCategory.java | 1 + server/src/com/cloud/api/ApiServer.java | 1 + .../com/cloud/async/AsyncJobManagerImpl.java | 76 ++++++++++++++++++++ 3 files changed, 78 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/e7c8a35c/api/src/com/cloud/event/EventCategory.java ---------------------------------------------------------------------- diff --git a/api/src/com/cloud/event/EventCategory.java b/api/src/com/cloud/event/EventCategory.java index cee6529..47a1cf1 100644 --- a/api/src/com/cloud/event/EventCategory.java +++ b/api/src/com/cloud/event/EventCategory.java @@ -52,4 +52,5 @@ public class EventCategory { public static final EventCategory ALERT_EVENT = new EventCategory("AlertEvent"); public static final EventCategory USAGE_EVENT = new EventCategory("UsageEvent"); public static final EventCategory RESOURCE_STATE_CHANGE_EVENT = new EventCategory("ResourceStateEvent"); + public static final EventCategory ASYNC_JOB_CHANGE_EVENT = new EventCategory("AsyncJobEvent"); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/e7c8a35c/server/src/com/cloud/api/ApiServer.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index 0cd1d61..86b4cdd 100755 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -502,6 +502,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer } params.put("ctxStartEventId", String.valueOf(startEventId)); + params.put("cmdEventType", asyncCmd.getEventType().toString()); ctx.setAccountId(asyncCmd.getEntityOwnerId()); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/e7c8a35c/server/src/com/cloud/async/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 0101a8a..602bd85 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileInputStream; import java.lang.reflect.Type; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -40,14 +41,22 @@ import org.apache.cloudstack.api.BaseAsyncCmd; import org.apache.cloudstack.api.ServerApiException; import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd; import org.apache.cloudstack.api.response.ExceptionResponse; +import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventBusException; import org.apache.log4j.Logger; import org.apache.log4j.NDC; import org.springframework.stereotype.Component; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; import com.cloud.api.ApiDispatcher; +import com.cloud.api.ApiDBUtils; import com.cloud.api.ApiGsonHelper; import com.cloud.api.ApiSerializerHelper; import com.cloud.async.dao.AsyncJobDao; +import com.cloud.domain.dao.DomainDao; +import com.cloud.domain.Domain; +import com.cloud.domain.DomainVO; + import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManagerListener; import com.cloud.cluster.ManagementServerHostVO; @@ -55,6 +64,9 @@ import com.cloud.configuration.Config; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.exception.InvalidParameterValueException; import com.cloud.exception.PermissionDeniedException; +import com.cloud.event.EventCategory; +import com.cloud.event.EventTypes; +import com.cloud.server.ManagementServer; import com.cloud.user.Account; import com.cloud.user.AccountManager; import com.cloud.user.User; @@ -93,6 +105,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private AccountDao _accountDao; @Inject private AsyncJobDao _jobDao; @Inject private ConfigurationDao _configDao; + @Inject private DomainDao _domainDao; private long _jobExpireSeconds = 86400; // 1 day private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs) @@ -122,6 +135,65 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId); } + private void publishOnEventBus(AsyncJobVO job, String jobEvent) { + EventBus eventBus = null; + try { + eventBus = ComponentContext.getComponent(EventBus.class); + } catch(NoSuchBeanDefinitionException nbe) { + return; // no provider is configured to provide events bus, so just return + } + + // Get the event type from the cmdInfo json string + String info = job.getCmdInfo(); + String cmdEventType; + if ( info == null ) { + cmdEventType = "unknown"; + } else { + String marker = "\"cmdEventType\""; + int begin = info.indexOf(marker); + cmdEventType = info.substring(begin + marker.length() + 2, info.indexOf(",", begin) - 1); + } + + // For some reason, the instanceType / instanceId are not abstract, which means we may get null values. + org.apache.cloudstack.framework.events.Event event = new org.apache.cloudstack.framework.events.Event( + ManagementServer.Name, + EventCategory.ASYNC_JOB_CHANGE_EVENT.getName(), + jobEvent, + ( job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown" ), null); + + User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId()); + Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId()); + + Map<String, String> eventDescription = new HashMap<String, String>(); + eventDescription.put("command", job.getCmd()); + eventDescription.put("user", userJobOwner.getUuid()); + eventDescription.put("account", jobOwner.getUuid()); + eventDescription.put("processStatus", "" + job.getProcessStatus()); + eventDescription.put("resultCode", "" + job.getResultCode()); + eventDescription.put("instanceUuid", ApiDBUtils.findJobInstanceUuid(job)); + eventDescription.put("instanceType", ( job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown" ) ); + eventDescription.put("commandEventType", cmdEventType); + eventDescription.put("jobId", job.getUuid()); + + // If the event.accountinfo boolean value is set, get the human readable value for the username / domainname + Map<String, String> configs = _configDao.getConfiguration("management-server", new HashMap<String, String>()); + if ( Boolean.valueOf(configs.get("event.accountinfo")) ) { + DomainVO domain = _domainDao.findById(jobOwner.getDomainId()); + eventDescription.put("username", userJobOwner.getUsername()); + eventDescription.put("domainname", domain.getName()); + } + + event.setDescription(eventDescription); + + try { + eventBus.publish(event); + } catch (EventBusException evx) { + String errMsg = "Failed to publish async job event on the the event bus."; + s_logger.warn(errMsg, evx); + throw new CloudRuntimeException(errMsg); + } + } + @Override public long submitAsyncJob(AsyncJobVO job) { return submitAsyncJob(job, false); @@ -142,6 +214,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, if(s_logger.isDebugEnabled()) { s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString()); } + publishOnEventBus(job, "submit"); return job.getId(); } catch(Exception e) { txt.rollback(); @@ -176,6 +249,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setStatus(jobStatus); job.setResultCode(resultCode); + publishOnEventBus(job, "complete"); // publish before the instance type and ID are wiped out + // reset attached object job.setInstanceType(null); job.setInstanceId(null); @@ -219,6 +294,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } job.setLastUpdated(DateUtil.currentGMTTime()); _jobDao.update(jobId, job); + publishOnEventBus(job, "update"); txt.commit(); } catch(Exception e) { s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);