Github user mcgilman commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2051#discussion_r131998273
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
---
@@ -325,6 +441,859 @@ public Response updateProcessGroup(
);
}
+
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{groupId}/variable-registry/update-requests/{updateId}")
+ @ApiOperation(value = "Gets a process group's variable registry",
response = VariableRegistryUpdateRequestEntity.class, authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response getVariableRegistryUpdateRequest(
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("groupId") final String groupId,
+ @ApiParam(value = "The ID of the Variable Registry Update
Request", required = true) @PathParam("updateId") final String updateId) {
+
+ if (groupId == null || updateId == null) {
+ throw new IllegalArgumentException("Group ID and Update ID
must both be specified.");
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable processGroup =
lookup.getProcessGroup(groupId).getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ,
NiFiUserUtils.getNiFiUser());
+ });
+
+ final VariableRegistryUpdateRequest request =
varRegistryUpdateRequests.get(updateId);
+ if (request == null) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId);
+ }
+
+ if (!groupId.equals(request.getProcessGroupId())) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId + " for Process Group with
identifier " + groupId);
+ }
+
+ final VariableRegistryUpdateRequestEntity entity = new
VariableRegistryUpdateRequestEntity();
+ entity.setId(request.getRequestId());
+
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+ entity.setUri(generateResourceUri("process-groups", groupId,
"variable-registry", updateId));
+ return generateOkResponse(entity).build();
+ }
+
+
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{groupId}/variable-registry/update-requests/{updateId}")
+ @ApiOperation(value = "Deletes an update request for a process group's
variable registry. If the request is not yet complete, it will automatically be
cancelled.",
+ response = VariableRegistryUpdateRequestEntity.class,
authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}", type =
"")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response deleteVariableRegistryUpdateRequest(
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("groupId") final String groupId,
+ @ApiParam(value = "The ID of the Variable Registry Update
Request", required = true) @PathParam("updateId") final String updateId) {
+
+ if (groupId == null || updateId == null) {
+ throw new IllegalArgumentException("Group ID and Update ID
must both be specified.");
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.DELETE);
+ }
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable processGroup =
lookup.getProcessGroup(groupId).getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ,
NiFiUserUtils.getNiFiUser());
+ });
+
+ final VariableRegistryUpdateRequest request =
varRegistryUpdateRequests.remove(updateId);
+ if (request == null) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId);
+ }
+
+ if (!groupId.equals(request.getProcessGroupId())) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId + " for Process Group with
identifier " + groupId);
+ }
+
+ request.cancel();
+
+ final VariableRegistryUpdateRequestEntity entity = new
VariableRegistryUpdateRequestEntity();
+ entity.setId(request.getRequestId());
+
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+ entity.setUri(generateResourceUri("process-groups", groupId,
"variable-registry", updateId));
+ return generateOkResponse(entity).build();
+ }
+
+
+ @PUT
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/variable-registry")
+ @ApiOperation(value = "Updates the contents of a Process Group's
variable Registry", response = VariableRegistryEntity.class, authorizations = {
+ @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response updateVariableRegistry(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("id") final String groupId,
+ @ApiParam(value = "The process group configuration details.",
required = true) final VariableRegistryEntity requestEntity) {
+
+ if (requestEntity == null || requestEntity.getVariableRegistry()
== null) {
+ throw new IllegalArgumentException("Variable Registry details
must be specified.");
+ }
+
+ if (requestEntity.getRevision() == null) {
+ throw new IllegalArgumentException("Revision must be
specified.");
+ }
+
+ // ensure the same id is being used
+ final VariableRegistryDTO registryDto =
requestEntity.getVariableRegistry();
+ if (!groupId.equals(registryDto.getProcessGroupId())) {
+ throw new IllegalArgumentException(String.format("The process
group id (%s) in the request body does "
+ + "not equal the process group id of the requested
resource (%s).", registryDto.getProcessGroupId(), groupId));
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.PUT, requestEntity);
+ }
+
+ // handle expects request (usually from the cluster manager)
+ final Revision requestRevision = getRevision(requestEntity,
groupId);
+ return withWriteLock(
+ serviceFacade,
+ requestEntity,
+ requestRevision,
+ lookup -> {
+ Authorizable authorizable =
lookup.getProcessGroup(groupId).getAuthorizable();
+ authorizable.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ null,
+ (revision, processGroupEntity) -> {
+ // update the process group
+ final VariableRegistryEntity entity =
serviceFacade.updateVariableRegistry(revision, registryDto);
+ populateRemainingVariableRegistryEntityContent(entity);
+
+ return generateOkResponse(entity).build();
+ });
+ }
+
+
+ /**
+ * Updates the variable registry for the specified process group.
+ *
+ * @param httpServletRequest request
+ * @param groupId The id of the process group.
+ * @param requestEntity the Variable Registry Entity
+ * @return A Variable Registry Entry.
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/variable-registry/update-requests")
+ @ApiOperation(value = "Submits a request to update a process group's
variable registry", response = VariableRegistryUpdateRequestEntity.class,
authorizations = {
+ @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response submitUpdateVariableRegistryRequest(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("id") final String groupId,
+ @ApiParam(value = "The process group configuration details.",
required = true) final VariableRegistryEntity requestEntity) {
+
+ if (requestEntity == null || requestEntity.getVariableRegistry()
== null) {
+ throw new IllegalArgumentException("Variable Registry details
must be specified.");
+ }
+
+ if (requestEntity.getRevision() == null) {
+ throw new IllegalArgumentException("Revision must be
specified.");
+ }
+
+ // In order to update variables in a variable registry, we have to
perform the following steps:
+ // 1. Determine Affected Components (this includes any Processors
and Controller Services and any components that reference an affected
Controller Service).
+ // 1a. Determine ID's of components
+ // 1b. Determine Revision's of associated components
+ // 2. Stop All Affected Processors
+ // 3. Disable All Affected Controller Services
+ // 4. Update the Variables
+ // 5. Re-Enable all Affected Controller Services (services only,
not dependent components)
+ // 6. Re-Enable all Processors that Depended on the Controller
Services
+
+ // Determine the affected components (and their associated
revisions)
+ final VariableRegistryEntity computedEntity =
serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
+ final VariableRegistryDTO computedRegistryDto =
computedEntity.getVariableRegistry();
+ if (computedRegistryDto == null) {
+ throw new ResourceNotFoundException(String.format("Unable to
locate group with id '%s'.", groupId));
+ }
+
+ final Set<AffectedComponentDTO> affectedComponents =
serviceFacade.identifyComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+
+ final Map<String, List<AffectedComponentDTO>>
affectedComponentsByType = affectedComponents.stream()
+ .collect(Collectors.groupingBy(comp ->
comp.getComponentType()));
+
+ final List<AffectedComponentDTO> affectedProcessors =
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+ final List<AffectedComponentDTO> affectedServices =
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+
+
+ if (isReplicateRequest()) {
+ // update the variable registry
+ final VariableRegistryUpdateRequest updateRequest =
createVariableRegistryUpdateRequest(groupId);
+
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
+ final URI originalUri = getAbsolutePath();
+
+ // Submit the task to be run in the background
+ final Runnable taskWrapper = () -> {
+ try {
+ updateVariableRegistryReplicated(groupId, originalUri,
affectedProcessors, affectedServices, updateRequest, requestEntity);
+ } catch (final Exception e) {
+ logger.error("Failed to update variable registry", e);
+ updateRequest.setFailureReason("An unexpected error
has occurred: " + e);
+ }
+ };
+
+ variableRegistryThreadPool.submit(taskWrapper);
+
+ final VariableRegistryUpdateRequestEntity responseEntity = new
VariableRegistryUpdateRequestEntity();
+
responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+
responseEntity.getRequestDto().setUri(generateResourceUri("process-groups",
groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+
+ final URI location =
URI.create(responseEntity.getRequestDto().getUri());
+ return
Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
+ }
+
+
+ final Revision requestRevision =
getRevision(requestEntity.getRevision(), requestEntity.getId());
+ return withWriteLock(
+ serviceFacade,
+ requestEntity,
+ requestRevision,
+ lookup -> {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final Authorizable groupAuthorizable =
lookup.getProcessGroup(groupId).getAuthorizable();
+ groupAuthorizable.authorize(authorizer,
RequestAction.WRITE, user);
+
+ // For every component that is affected, the user must
have READ permissions and WRITE permissions
+ // (because this action requires stopping the component).
+ if (affectedProcessors != null) {
+ for (final AffectedComponentDTO affectedComponent :
affectedProcessors) {
+ final Authorizable authorizable =
lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable();
+
+ if (!authorizable.isAuthorized(authorizer,
RequestAction.READ, user)) {
+ throw new AccessDeniedException("User does not
have Read permissions to Processor with ID " +
affectedComponent.getComponentId());
+ }
+ if (!authorizable.isAuthorized(authorizer,
RequestAction.WRITE, user)) {
+ throw new AccessDeniedException("User does not
have Write permissions to Processor with ID " +
affectedComponent.getComponentId());
+ }
+ }
+ }
+
+ if (affectedServices != null) {
+ for (final AffectedComponentDTO affectedComponent :
affectedServices) {
+ final Authorizable authorizable =
lookup.getControllerService(affectedComponent.getComponentId()).getAuthorizable();
+
+ if (!authorizable.isAuthorized(authorizer,
RequestAction.READ, user)) {
+ throw new AccessDeniedException("User does not
have Read permissions to Controller Service with ID " +
affectedComponent.getComponentId());
+ }
+ if (!authorizable.isAuthorized(authorizer,
RequestAction.WRITE, user)) {
+ throw new AccessDeniedException("User does not
have Write permissions to Controller Service with ID " +
affectedComponent.getComponentId());
+ }
+ }
+ }
+ },
+ null,
+ (revision, varRegistryEntity) -> {
+ return updateVariableRegistryLocal(groupId,
affectedProcessors, affectedServices, requestEntity);
+ });
+ }
+
+ private Pause createPause(final VariableRegistryUpdateRequest
updateRequest) {
+ return new Pause() {
+ @Override
+ public boolean pause() {
+ if (updateRequest.isComplete()) {
+ return false;
+ }
+
+ try {
+ Thread.sleep(500);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+
+ return !updateRequest.isComplete();
+ }
+ };
+ }
+
+ private void updateVariableRegistryReplicated(final String groupId,
final URI originalUri, final Collection<AffectedComponentDTO>
affectedProcessors,
+ final Collection<AffectedComponentDTO> affectedServices,
+ final VariableRegistryUpdateRequest updateRequest, final
VariableRegistryEntity requestEntity) {
+
+ final NiFiProperties properties = getProperties();
+ final Client jerseyClient = WebUtils.createClient(new
DefaultClientConfig(), SslContextFactory.createSslContext(properties));
+ final int connectionTimeout = (int)
FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(),
TimeUnit.MILLISECONDS);
+ final int readTimeout = (int)
FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(),
TimeUnit.MILLISECONDS);
+
jerseyClient.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT,
connectionTimeout);
+
jerseyClient.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT,
readTimeout);
+
jerseyClient.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS,
Boolean.TRUE);
+
+ final Pause pause = createPause(updateRequest);
+
+ // stop processors
+ if (affectedProcessors != null) {
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to stop {} affected processors",
groupId, affectedProcessors.size());
+
+ scheduleProcessors(groupId, originalUri, jerseyClient,
updateRequest, pause,
+ affectedProcessors, ScheduledState.STOPPED,
updateRequest.getStopProcessorsStep());
+ }
+
+ // disable controller services
+ if (affectedServices != null) {
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to stop {} affected Controller
Services", groupId, affectedServices.size());
+
+ activateControllerServices(groupId, originalUri, jerseyClient,
updateRequest, pause,
+ affectedServices, ControllerServiceState.DISABLED,
updateRequest.getDisableServicesStep());
+ }
+
+ // apply updates
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to apply updates to variable registry",
groupId);
+ applyVariableRegistryUpdate(groupId, originalUri, jerseyClient,
updateRequest, requestEntity);
+
+ // re-enable controller services
+ if (affectedServices != null) {
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to re-enable {} affected services",
groupId, affectedServices.size());
+
+ activateControllerServices(groupId, originalUri, jerseyClient,
updateRequest, pause,
+ affectedServices, ControllerServiceState.ENABLED,
updateRequest.getEnableServicesStep());
+ }
+
+ // restart processors
+ if (affectedProcessors != null) {
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to restart {} affected processors",
groupId, affectedProcessors.size());
+
+ scheduleProcessors(groupId, originalUri, jerseyClient,
updateRequest, pause,
+ affectedProcessors, ScheduledState.RUNNING,
updateRequest.getStartProcessorsStep());
+ }
+
+ updateRequest.setComplete(true);
+ }
+
+ /**
+ * Periodically polls the process group with the given ID, waiting for
all processors whose ID's are given to have the given Scheduled State.
+ *
+ * @param client the Jersey Client to use for making the request
+ * @param groupId the ID of the Process Group to poll
+ * @param processorIds the ID of all Processors whose state should be
equal to the given desired state
+ * @param desiredState the desired state for all processors with the
ID's given
+ * @param pause the Pause that can be used to wait between polling
+ * @return <code>true</code> if successful, <code>false</code> if
unable to wait for processors to reach the desired state
+ */
+ private boolean waitForProcessorStatus(final Client client, final URI
originalUri, final String groupId, final Set<String> processorIds, final
ScheduledState desiredState, final Pause pause) {
+ URI groupUri;
+ try {
+ groupUri = new URI(originalUri.getScheme(),
originalUri.getUserInfo(), originalUri.getHost(),
+ originalUri.getPort(), "/nifi-api/flow/process-groups/" +
groupId + "/status", "recursive=true", originalUri.getFragment());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final ClientResponse response =
client.resource(groupUri).header("Content-Type",
"application/json").get(ClientResponse.class);
+ if (response.getStatus() != Status.OK.getStatusCode()) {
+ return false;
+ }
+
+ final ProcessGroupStatusEntity statusEntity =
response.getEntity(ProcessGroupStatusEntity.class);
+ final ProcessGroupStatusDTO statusDto =
statusEntity.getProcessGroupStatus();
+ final ProcessGroupStatusSnapshotDTO statusSnapshotDto =
statusDto.getAggregateSnapshot();
+
+ if (isProcessorStatusEqual(statusSnapshotDto, processorIds,
desiredState)) {
+ logger.debug("All {} processors of interest now have the
desired state of {}", processorIds.size(), desiredState);
+ return true;
+ }
+
+ // Not all of the processors are in the desired state. Pause
for a bit and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+ /**
+ * Periodically polls the process group with the given ID, waiting for
all processors whose ID's are given to have the given Scheduled State.
+ *
+ * @param groupId the ID of the Process Group to poll
+ * @param processorIds the ID of all Processors whose state should be
equal to the given desired state
+ * @param desiredState the desired state for all processors with the
ID's given
+ * @param pause the Pause that can be used to wait between polling
+ * @return <code>true</code> if successful, <code>false</code> if
unable to wait for processors to reach the desired state
+ */
+ private boolean waitForLocalProcessorStatus(final String groupId,
final Set<String> processorIds, final ScheduledState desiredState, final Pause
pause) {
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final ProcessGroupStatusEntity statusEntity =
serviceFacade.getProcessGroupStatus(groupId, true);
+ final ProcessGroupStatusDTO statusDto =
statusEntity.getProcessGroupStatus();
+ final ProcessGroupStatusSnapshotDTO statusSnapshotDto =
statusDto.getAggregateSnapshot();
+
+ if (isProcessorStatusEqual(statusSnapshotDto, processorIds,
desiredState)) {
+ logger.debug("All {} processors of interest now have the
desired state of {}", processorIds.size(), desiredState);
+ return true;
+ }
+
+ // Not all of the processors are in the desired state. Pause
for a bit and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+ private boolean isProcessorStatusEqual(final
ProcessGroupStatusSnapshotDTO statusSnapshot, final Set<String> processorIds,
final ScheduledState desiredState) {
+ final String desiredStateName = desiredState.name();
+
+ final boolean allProcessorsMatch =
statusSnapshot.getProcessorStatusSnapshots().stream()
+ .map(entity -> entity.getProcessorStatusSnapshot())
+ .filter(status -> processorIds.contains(status.getId()))
+ .allMatch(status -> {
+ final String runStatus = status.getRunStatus();
+ final boolean stateMatches =
desiredStateName.equalsIgnoreCase(runStatus);
+ if (!stateMatches) {
+ return false;
+ }
+
+ if (desiredState == ScheduledState.STOPPED &&
status.getActiveThreadCount() != 0) {
+ return false;
+ }
+
+ return true;
+ });
+
+ if (!allProcessorsMatch) {
+ return false;
+ }
+
+ for (final ProcessGroupStatusSnapshotEntity childGroupEntity :
statusSnapshot.getProcessGroupStatusSnapshots()) {
+ final ProcessGroupStatusSnapshotDTO childGroupStatus =
childGroupEntity.getProcessGroupStatusSnapshot();
+ final boolean allMatchChildLevel =
isProcessorStatusEqual(childGroupStatus, processorIds, desiredState);
+ if (!allMatchChildLevel) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
+
+ /**
+ * Periodically polls the process group with the given ID, waiting for
all controller services whose ID's are given to have the given Controller
Service State.
+ *
+ * @param client the Jersey Client to use for making the HTTP Request
+ * @param groupId the ID of the Process Group to poll
+ * @param serviceIds the ID of all Controller Services whose state
should be equal to the given desired state
+ * @param desiredState the desired state for all services with the
ID's given
+ * @param pause the Pause that can be used to wait between polling
+ * @return <code>true</code> if successful, <code>false</code> if
unable to wait for services to reach the desired state
+ */
+ private boolean waitForControllerServiceStatus(final Client client,
final URI originalUri, final String groupId, final Set<String> serviceIds,
final ControllerServiceState desiredState,
+ final Pause pause) {
+ URI groupUri;
+ try {
+ groupUri = new URI(originalUri.getScheme(),
originalUri.getUserInfo(), originalUri.getHost(),
+ originalUri.getPort(), "/nifi-api/flow/process-groups/" +
groupId + "/controller-services",
"includeAncestorGroups=false,includeDescendantGroups=true",
originalUri.getFragment());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final ClientResponse response =
client.resource(groupUri).header("Content-Type",
"application/json").get(ClientResponse.class);
+ if (response.getStatus() != Status.OK.getStatusCode()) {
+ return false;
+ }
+
+ final ControllerServicesEntity controllerServicesEntity =
response.getEntity(ControllerServicesEntity.class);
+ final Set<ControllerServiceEntity> serviceEntities =
controllerServicesEntity.getControllerServices();
+
+ final String desiredStateName = desiredState.name();
+ final boolean allServicesMatch = serviceEntities.stream()
+ .map(entity -> entity.getComponent())
+ .filter(service -> serviceIds.contains(service.getId()))
+ .map(service -> service.getState())
+ .allMatch(state -> state.equals(desiredStateName));
+
+ if (allServicesMatch) {
+ logger.debug("All {} controller services of interest now
have the desired state of {}", serviceIds.size(), desiredState);
+ return true;
+ }
+
+ // Not all of the processors are in the desired state. Pause
for a bit and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+
+ /**
+ * Periodically polls the process group with the given ID, waiting for
all controller services whose ID's are given to have the given Controller
Service State.
+ *
+ * @param groupId the ID of the Process Group to poll
+ * @param serviceIds the ID of all Controller Services whose state
should be equal to the given desired state
+ * @param desiredState the desired state for all services with the
ID's given
+ * @param pause the Pause that can be used to wait between polling
+ * @param user the user that is retrieving the controller services
+ * @return <code>true</code> if successful, <code>false</code> if
unable to wait for services to reach the desired state
+ */
+ private boolean waitForLocalControllerServiceStatus(final String
groupId, final Set<String> serviceIds, final ControllerServiceState
desiredState, final Pause pause, final NiFiUser user) {
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final Set<ControllerServiceEntity> serviceEntities =
serviceFacade.getControllerServices(groupId, false, true, user);
+
+ final String desiredStateName = desiredState.name();
+ final boolean allServicesMatch = serviceEntities.stream()
+ .map(entity -> entity.getComponent())
+ .filter(service -> serviceIds.contains(service.getId()))
+ .map(service -> service.getState())
+ .allMatch(state -> desiredStateName.equals(state));
+
+ if (allServicesMatch) {
+ logger.debug("All {} controller services of interest now
have the desired state of {}", serviceIds.size(), desiredState);
+ return true;
+ }
+
+ // Not all of the processors are in the desired state. Pause
for a bit and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+ private VariableRegistryUpdateRequest
createVariableRegistryUpdateRequest(final String groupId) {
+ final VariableRegistryUpdateRequest updateRequest = new
VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId);
+
+ // before adding to the request map, purge any old requests. Must
do this by creating a List of ID's
+ // and then removing those ID's one-at-a-time in order to avoid
ConcurrentModificationException.
+ final Date oneMinuteAgo = new Date(System.currentTimeMillis() -
VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION);
+ final List<String> completedRequestIds =
varRegistryUpdateRequests.entrySet().stream()
+ .filter(entry -> entry.getValue().isComplete())
+ .filter(entry ->
entry.getValue().getLastUpdated().before(oneMinuteAgo))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+
+ completedRequestIds.stream().forEach(id ->
varRegistryUpdateRequests.remove(id));
+
+ final int requestCount = varRegistryUpdateRequests.size();
+ if (requestCount > MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS) {
+ throw new IllegalStateException("There are already " +
requestCount + " update requests for variable registries. "
+ + "Cannot issue any more requests until the older ones are
deleted or expire");
+ }
+
+ this.varRegistryUpdateRequests.put(updateRequest.getRequestId(),
updateRequest);
+ return updateRequest;
+ }
+
+ private Response updateVariableRegistryLocal(final String groupId,
final List<AffectedComponentDTO> affectedProcessors, final
List<AffectedComponentDTO> affectedServices,
+ final VariableRegistryEntity requestEntity) {
+
+ final Set<String> affectedProcessorIds = affectedProcessors ==
null ? Collections.emptySet() : affectedProcessors.stream()
+ .map(component -> component.getComponentId())
+ .collect(Collectors.toSet());
+ Map<String, Revision> processorRevisionMap = getRevisions(groupId,
affectedProcessorIds);
+
+ final Set<String> affectedServiceIds = affectedServices == null ?
Collections.emptySet() : affectedServices.stream()
+ .map(component -> component.getComponentId())
+ .collect(Collectors.toSet());
+ Map<String, Revision> serviceRevisionMap = getRevisions(groupId,
affectedServiceIds);
+
+ // update the variable registry
+ final VariableRegistryUpdateRequest updateRequest =
createVariableRegistryUpdateRequest(groupId);
+
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
+ final Pause pause = createPause(updateRequest);
+
+ final Revision requestRevision =
getRevision(requestEntity.getRevision(), requestEntity.getId());
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final Runnable updateTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Stop processors
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getStopProcessorsStep(), "Stopping Processors",
+ () -> stopProcessors(user, updateRequest, groupId,
processorRevisionMap, pause));
+
+ // Update revision map because this will have modified
the revisions of our components.
+ final Map<String, Revision>
updatedProcessorRevisionMap = getRevisions(groupId, affectedProcessorIds);
+
+ // Disable controller services
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getDisableServicesStep(), "Disabling Controller
Services",
+ () -> disableControllerServices(user,
updateRequest, groupId, serviceRevisionMap, pause));
+
+ // Update revision map because this will have modified
the revisions of our components.
+ final Map<String, Revision> updatedServiceRevisionMap
= getRevisions(groupId, affectedServiceIds);
+
+ // Apply the updates
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to
Variable Registry",
+ () -> serviceFacade.updateVariableRegistry(user,
requestRevision, requestEntity.getVariableRegistry()));
+
+ // Re-enable the controller services
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller
Services",
+ () -> enableControllerServices(user, groupId,
updatedServiceRevisionMap, pause));
+
+ // Restart processors
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors",
+ () -> startProcessors(user, groupId,
updatedProcessorRevisionMap, pause));
+
+ // Set complete
+ updateRequest.setComplete(true);
+ updateRequest.setLastUpdated(new Date());
+ } catch (final Exception e) {
+ logger.error("Failed to update Variable Registry for
Proces Group with ID " + groupId, e);
+ updateRequest.setFailureReason("An unexpected error
has occurred: " + e);
+ }
+ }
+ };
+
+ // Submit the task to be run in the background
+ variableRegistryThreadPool.submit(updateTask);
+
+ final VariableRegistryUpdateRequestEntity responseEntity = new
VariableRegistryUpdateRequestEntity();
--- End diff --
Can we set the permissions on this entity for consistency with all of the
other endpoints?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---