[
https://issues.apache.org/jira/browse/NIFI-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16118897#comment-16118897
]
ASF GitHub Bot commented on NIFI-4224:
--------------------------------------
Github user mcgilman commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2051#discussion_r131998273
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
---
@@ -325,6 +441,859 @@ public Response updateProcessGroup(
);
}
+
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{groupId}/variable-registry/update-requests/{updateId}")
+ @ApiOperation(value = "Gets a process group's variable registry",
response = VariableRegistryUpdateRequestEntity.class, authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response getVariableRegistryUpdateRequest(
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("groupId") final String groupId,
+ @ApiParam(value = "The ID of the Variable Registry Update
Request", required = true) @PathParam("updateId") final String updateId) {
+
+ if (groupId == null || updateId == null) {
+ throw new IllegalArgumentException("Group ID and Update ID
must both be specified.");
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable processGroup =
lookup.getProcessGroup(groupId).getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ,
NiFiUserUtils.getNiFiUser());
+ });
+
+ final VariableRegistryUpdateRequest request =
varRegistryUpdateRequests.get(updateId);
+ if (request == null) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId);
+ }
+
+ if (!groupId.equals(request.getProcessGroupId())) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId + " for Process Group with
identifier " + groupId);
+ }
+
+ final VariableRegistryUpdateRequestEntity entity = new
VariableRegistryUpdateRequestEntity();
+ entity.setId(request.getRequestId());
+
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+ entity.setUri(generateResourceUri("process-groups", groupId,
"variable-registry", updateId));
+ return generateOkResponse(entity).build();
+ }
+
+
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{groupId}/variable-registry/update-requests/{updateId}")
+ @ApiOperation(value = "Deletes an update request for a process group's
variable registry. If the request is not yet complete, it will automatically be
cancelled.",
+ response = VariableRegistryUpdateRequestEntity.class,
authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}", type =
"")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response deleteVariableRegistryUpdateRequest(
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("groupId") final String groupId,
+ @ApiParam(value = "The ID of the Variable Registry Update
Request", required = true) @PathParam("updateId") final String updateId) {
+
+ if (groupId == null || updateId == null) {
+ throw new IllegalArgumentException("Group ID and Update ID
must both be specified.");
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.DELETE);
+ }
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable processGroup =
lookup.getProcessGroup(groupId).getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ,
NiFiUserUtils.getNiFiUser());
+ });
+
+ final VariableRegistryUpdateRequest request =
varRegistryUpdateRequests.remove(updateId);
+ if (request == null) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId);
+ }
+
+ if (!groupId.equals(request.getProcessGroupId())) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId + " for Process Group with
identifier " + groupId);
+ }
+
+ request.cancel();
+
+ final VariableRegistryUpdateRequestEntity entity = new
VariableRegistryUpdateRequestEntity();
+ entity.setId(request.getRequestId());
+
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+ entity.setUri(generateResourceUri("process-groups", groupId,
"variable-registry", updateId));
+ return generateOkResponse(entity).build();
+ }
+
+
+ @PUT
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/variable-registry")
+ @ApiOperation(value = "Updates the contents of a Process Group's
variable Registry", response = VariableRegistryEntity.class, authorizations = {
+ @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response updateVariableRegistry(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("id") final String groupId,
+ @ApiParam(value = "The process group configuration details.",
required = true) final VariableRegistryEntity requestEntity) {
+
+ if (requestEntity == null || requestEntity.getVariableRegistry()
== null) {
+ throw new IllegalArgumentException("Variable Registry details
must be specified.");
+ }
+
+ if (requestEntity.getRevision() == null) {
+ throw new IllegalArgumentException("Revision must be
specified.");
+ }
+
+ // ensure the same id is being used
+ final VariableRegistryDTO registryDto =
requestEntity.getVariableRegistry();
+ if (!groupId.equals(registryDto.getProcessGroupId())) {
+ throw new IllegalArgumentException(String.format("The process
group id (%s) in the request body does "
+ + "not equal the process group id of the requested
resource (%s).", registryDto.getProcessGroupId(), groupId));
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.PUT, requestEntity);
+ }
+
+ // handle expects request (usually from the cluster manager)
+ final Revision requestRevision = getRevision(requestEntity,
groupId);
+ return withWriteLock(
+ serviceFacade,
+ requestEntity,
+ requestRevision,
+ lookup -> {
+ Authorizable authorizable =
lookup.getProcessGroup(groupId).getAuthorizable();
+ authorizable.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ null,
+ (revision, processGroupEntity) -> {
+ // update the process group
+ final VariableRegistryEntity entity =
serviceFacade.updateVariableRegistry(revision, registryDto);
+ populateRemainingVariableRegistryEntityContent(entity);
+
+ return generateOkResponse(entity).build();
+ });
+ }
+
+
+ /**
+ * Updates the variable registry for the specified process group.
+ *
+ * @param httpServletRequest request
+ * @param groupId The id of the process group.
+ * @param requestEntity the Variable Registry Entity
+ * @return A Variable Registry Entry.
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/variable-registry/update-requests")
+ @ApiOperation(value = "Submits a request to update a process group's
variable registry", response = VariableRegistryUpdateRequestEntity.class,
authorizations = {
+ @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response submitUpdateVariableRegistryRequest(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("id") final String groupId,
+ @ApiParam(value = "The process group configuration details.",
required = true) final VariableRegistryEntity requestEntity) {
+
+ if (requestEntity == null || requestEntity.getVariableRegistry()
== null) {
+ throw new IllegalArgumentException("Variable Registry details
must be specified.");
+ }
+
+ if (requestEntity.getRevision() == null) {
+ throw new IllegalArgumentException("Revision must be
specified.");
+ }
+
+ // In order to update variables in a variable registry, we have to
perform the following steps:
+ // 1. Determine Affected Components (this includes any Processors
and Controller Services and any components that reference an affected
Controller Service).
+ // 1a. Determine ID's of components
+ // 1b. Determine Revision's of associated components
+ // 2. Stop All Affected Processors
+ // 3. Disable All Affected Controller Services
+ // 4. Update the Variables
+ // 5. Re-Enable all Affected Controller Services (services only,
not dependent components)
+ // 6. Re-Enable all Processors that Depended on the Controller
Services
+
+ // Determine the affected components (and their associated
revisions)
+ final VariableRegistryEntity computedEntity =
serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
+ final VariableRegistryDTO computedRegistryDto =
computedEntity.getVariableRegistry();
+ if (computedRegistryDto == null) {
+ throw new ResourceNotFoundException(String.format("Unable to
locate group with id '%s'.", groupId));
+ }
+
+ final Set<AffectedComponentDTO> affectedComponents =
serviceFacade.identifyComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+
+ final Map<String, List<AffectedComponentDTO>>
affectedComponentsByType = affectedComponents.stream()
+ .collect(Collectors.groupingBy(comp ->
comp.getComponentType()));
+
+ final List<AffectedComponentDTO> affectedProcessors =
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+ final List<AffectedComponentDTO> affectedServices =
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+
+
+ if (isReplicateRequest()) {
+ // update the variable registry
+ final VariableRegistryUpdateRequest updateRequest =
createVariableRegistryUpdateRequest(groupId);
+
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
+ final URI originalUri = getAbsolutePath();
+
+ // Submit the task to be run in the background
+ final Runnable taskWrapper = () -> {
+ try {
+ updateVariableRegistryReplicated(groupId, originalUri,
affectedProcessors, affectedServices, updateRequest, requestEntity);
+ } catch (final Exception e) {
+ logger.error("Failed to update variable registry", e);
+ updateRequest.setFailureReason("An unexpected error
has occurred: " + e);
+ }
+ };
+
+ variableRegistryThreadPool.submit(taskWrapper);
+
+ final VariableRegistryUpdateRequestEntity responseEntity = new
VariableRegistryUpdateRequestEntity();
+
responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+
responseEntity.getRequestDto().setUri(generateResourceUri("process-groups",
groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+
+ final URI location =
URI.create(responseEntity.getRequestDto().getUri());
+ return
Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
+ }
+
+
+ final Revision requestRevision =
getRevision(requestEntity.getRevision(), requestEntity.getId());
+ return withWriteLock(
+ serviceFacade,
+ requestEntity,
+ requestRevision,
+ lookup -> {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final Authorizable groupAuthorizable =
lookup.getProcessGroup(groupId).getAuthorizable();
+ groupAuthorizable.authorize(authorizer,
RequestAction.WRITE, user);
+
+ // For every component that is affected, the user must
have READ permissions and WRITE permissions
+ // (because this action requires stopping the component).
+ if (affectedProcessors != null) {
+ for (final AffectedComponentDTO affectedComponent :
affectedProcessors) {
+ final Authorizable authorizable =
lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable();
+
+ if (!authorizable.isAuthorized(authorizer,
RequestAction.READ, user)) {
+ throw new AccessDeniedException("User does not
have Read permissions to Processor with ID " +
affectedComponent.getComponentId());
+ }
+ if (!authorizable.isAuthorized(authorizer,
RequestAction.WRITE, user)) {
+ throw new AccessDeniedException("User does not
have Write permissions to Processor with ID " +
affectedComponent.getComponentId());
+ }
+ }
+ }
+
+ if (affectedServices != null) {
+ for (final AffectedComponentDTO affectedComponent :
affectedServices) {
+ final Authorizable authorizable =
lookup.getControllerService(affectedComponent.getComponentId()).getAuthorizable();
+
+ if (!authorizable.isAuthorized(authorizer,
RequestAction.READ, user)) {
+ throw new AccessDeniedException("User does not
have Read permissions to Controller Service with ID " +
affectedComponent.getComponentId());
+ }
+ if (!authorizable.isAuthorized(authorizer,
RequestAction.WRITE, user)) {
+ throw new AccessDeniedException("User does not
have Write permissions to Controller Service with ID " +
affectedComponent.getComponentId());
+ }
+ }
+ }
+ },
+ null,
+ (revision, varRegistryEntity) -> {
+ return updateVariableRegistryLocal(groupId,
affectedProcessors, affectedServices, requestEntity);
+ });
+ }
+
+ private Pause createPause(final VariableRegistryUpdateRequest
updateRequest) {
+ return new Pause() {
+ @Override
+ public boolean pause() {
+ if (updateRequest.isComplete()) {
+ return false;
+ }
+
+ try {
+ Thread.sleep(500);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+
+ return !updateRequest.isComplete();
+ }
+ };
+ }
+
+ private void updateVariableRegistryReplicated(final String groupId,
final URI originalUri, final Collection<AffectedComponentDTO>
affectedProcessors,
+ final Collection<AffectedComponentDTO> affectedServices,
+ final VariableRegistryUpdateRequest updateRequest, final
VariableRegistryEntity requestEntity) {
+
+ final NiFiProperties properties = getProperties();
+ final Client jerseyClient = WebUtils.createClient(new
DefaultClientConfig(), SslContextFactory.createSslContext(properties));
+ final int connectionTimeout = (int)
FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(),
TimeUnit.MILLISECONDS);
+ final int readTimeout = (int)
FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(),
TimeUnit.MILLISECONDS);
+
jerseyClient.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT,
connectionTimeout);
+
jerseyClient.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT,
readTimeout);
+
jerseyClient.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS,
Boolean.TRUE);
+
+ final Pause pause = createPause(updateRequest);
+
+ // stop processors
+ if (affectedProcessors != null) {
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to stop {} affected processors",
groupId, affectedProcessors.size());
+
+ scheduleProcessors(groupId, originalUri, jerseyClient,
updateRequest, pause,
+ affectedProcessors, ScheduledState.STOPPED,
updateRequest.getStopProcessorsStep());
+ }
+
+ // disable controller services
+ if (affectedServices != null) {
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to stop {} affected Controller
Services", groupId, affectedServices.size());
+
+ activateControllerServices(groupId, originalUri, jerseyClient,
updateRequest, pause,
+ affectedServices, ControllerServiceState.DISABLED,
updateRequest.getDisableServicesStep());
+ }
+
+ // apply updates
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to apply updates to variable registry",
groupId);
+ applyVariableRegistryUpdate(groupId, originalUri, jerseyClient,
updateRequest, requestEntity);
+
+ // re-enable controller services
+ if (affectedServices != null) {
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to re-enable {} affected services",
groupId, affectedServices.size());
+
+ activateControllerServices(groupId, originalUri, jerseyClient,
updateRequest, pause,
+ affectedServices, ControllerServiceState.ENABLED,
updateRequest.getEnableServicesStep());
+ }
+
+ // restart processors
+ if (affectedProcessors != null) {
+ logger.info("In order to update Variable Registry for Process
Group with ID {}, "
+ + "replicating request to restart {} affected processors",
groupId, affectedProcessors.size());
+
+ scheduleProcessors(groupId, originalUri, jerseyClient,
updateRequest, pause,
+ affectedProcessors, ScheduledState.RUNNING,
updateRequest.getStartProcessorsStep());
+ }
+
+ updateRequest.setComplete(true);
+ }
+
+ /**
+ * Periodically polls the process group with the given ID, waiting for
all processors whose ID's are given to have the given Scheduled State.
+ *
+ * @param client the Jersey Client to use for making the request
+ * @param groupId the ID of the Process Group to poll
+ * @param processorIds the ID of all Processors whose state should be
equal to the given desired state
+ * @param desiredState the desired state for all processors with the
ID's given
+ * @param pause the Pause that can be used to wait between polling
+ * @return <code>true</code> if successful, <code>false</code> if
unable to wait for processors to reach the desired state
+ */
+ private boolean waitForProcessorStatus(final Client client, final URI
originalUri, final String groupId, final Set<String> processorIds, final
ScheduledState desiredState, final Pause pause) {
+ URI groupUri;
+ try {
+ groupUri = new URI(originalUri.getScheme(),
originalUri.getUserInfo(), originalUri.getHost(),
+ originalUri.getPort(), "/nifi-api/flow/process-groups/" +
groupId + "/status", "recursive=true", originalUri.getFragment());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final ClientResponse response =
client.resource(groupUri).header("Content-Type",
"application/json").get(ClientResponse.class);
+ if (response.getStatus() != Status.OK.getStatusCode()) {
+ return false;
+ }
+
+ final ProcessGroupStatusEntity statusEntity =
response.getEntity(ProcessGroupStatusEntity.class);
+ final ProcessGroupStatusDTO statusDto =
statusEntity.getProcessGroupStatus();
+ final ProcessGroupStatusSnapshotDTO statusSnapshotDto =
statusDto.getAggregateSnapshot();
+
+ if (isProcessorStatusEqual(statusSnapshotDto, processorIds,
desiredState)) {
+ logger.debug("All {} processors of interest now have the
desired state of {}", processorIds.size(), desiredState);
+ return true;
+ }
+
+ // Not all of the processors are in the desired state. Pause
for a bit and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+ /**
+ * Periodically polls the process group with the given ID, waiting for
all processors whose ID's are given to have the given Scheduled State.
+ *
+ * @param groupId the ID of the Process Group to poll
+ * @param processorIds the ID of all Processors whose state should be
equal to the given desired state
+ * @param desiredState the desired state for all processors with the
ID's given
+ * @param pause the Pause that can be used to wait between polling
+ * @return <code>true</code> if successful, <code>false</code> if
unable to wait for processors to reach the desired state
+ */
+ private boolean waitForLocalProcessorStatus(final String groupId,
final Set<String> processorIds, final ScheduledState desiredState, final Pause
pause) {
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final ProcessGroupStatusEntity statusEntity =
serviceFacade.getProcessGroupStatus(groupId, true);
+ final ProcessGroupStatusDTO statusDto =
statusEntity.getProcessGroupStatus();
+ final ProcessGroupStatusSnapshotDTO statusSnapshotDto =
statusDto.getAggregateSnapshot();
+
+ if (isProcessorStatusEqual(statusSnapshotDto, processorIds,
desiredState)) {
+ logger.debug("All {} processors of interest now have the
desired state of {}", processorIds.size(), desiredState);
+ return true;
+ }
+
+ // Not all of the processors are in the desired state. Pause
for a bit and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+ private boolean isProcessorStatusEqual(final
ProcessGroupStatusSnapshotDTO statusSnapshot, final Set<String> processorIds,
final ScheduledState desiredState) {
+ final String desiredStateName = desiredState.name();
+
+ final boolean allProcessorsMatch =
statusSnapshot.getProcessorStatusSnapshots().stream()
+ .map(entity -> entity.getProcessorStatusSnapshot())
+ .filter(status -> processorIds.contains(status.getId()))
+ .allMatch(status -> {
+ final String runStatus = status.getRunStatus();
+ final boolean stateMatches =
desiredStateName.equalsIgnoreCase(runStatus);
+ if (!stateMatches) {
+ return false;
+ }
+
+ if (desiredState == ScheduledState.STOPPED &&
status.getActiveThreadCount() != 0) {
+ return false;
+ }
+
+ return true;
+ });
+
+ if (!allProcessorsMatch) {
+ return false;
+ }
+
+ for (final ProcessGroupStatusSnapshotEntity childGroupEntity :
statusSnapshot.getProcessGroupStatusSnapshots()) {
+ final ProcessGroupStatusSnapshotDTO childGroupStatus =
childGroupEntity.getProcessGroupStatusSnapshot();
+ final boolean allMatchChildLevel =
isProcessorStatusEqual(childGroupStatus, processorIds, desiredState);
+ if (!allMatchChildLevel) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
+
+ /**
+ * Periodically polls the process group with the given ID, waiting for
all controller services whose ID's are given to have the given Controller
Service State.
+ *
+ * @param client the Jersey Client to use for making the HTTP Request
+ * @param groupId the ID of the Process Group to poll
+ * @param serviceIds the ID of all Controller Services whose state
should be equal to the given desired state
+ * @param desiredState the desired state for all services with the
ID's given
+ * @param pause the Pause that can be used to wait between polling
+ * @return <code>true</code> if successful, <code>false</code> if
unable to wait for services to reach the desired state
+ */
+ private boolean waitForControllerServiceStatus(final Client client,
final URI originalUri, final String groupId, final Set<String> serviceIds,
final ControllerServiceState desiredState,
+ final Pause pause) {
+ URI groupUri;
+ try {
+ groupUri = new URI(originalUri.getScheme(),
originalUri.getUserInfo(), originalUri.getHost(),
+ originalUri.getPort(), "/nifi-api/flow/process-groups/" +
groupId + "/controller-services",
"includeAncestorGroups=false,includeDescendantGroups=true",
originalUri.getFragment());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final ClientResponse response =
client.resource(groupUri).header("Content-Type",
"application/json").get(ClientResponse.class);
+ if (response.getStatus() != Status.OK.getStatusCode()) {
+ return false;
+ }
+
+ final ControllerServicesEntity controllerServicesEntity =
response.getEntity(ControllerServicesEntity.class);
+ final Set<ControllerServiceEntity> serviceEntities =
controllerServicesEntity.getControllerServices();
+
+ final String desiredStateName = desiredState.name();
+ final boolean allServicesMatch = serviceEntities.stream()
+ .map(entity -> entity.getComponent())
+ .filter(service -> serviceIds.contains(service.getId()))
+ .map(service -> service.getState())
+ .allMatch(state -> state.equals(desiredStateName));
+
+ if (allServicesMatch) {
+ logger.debug("All {} controller services of interest now
have the desired state of {}", serviceIds.size(), desiredState);
+ return true;
+ }
+
+ // Not all of the processors are in the desired state. Pause
for a bit and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+
+ /**
+ * Periodically polls the process group with the given ID, waiting for
all controller services whose ID's are given to have the given Controller
Service State.
+ *
+ * @param groupId the ID of the Process Group to poll
+ * @param serviceIds the ID of all Controller Services whose state
should be equal to the given desired state
+ * @param desiredState the desired state for all services with the
ID's given
+ * @param pause the Pause that can be used to wait between polling
+ * @param user the user that is retrieving the controller services
+ * @return <code>true</code> if successful, <code>false</code> if
unable to wait for services to reach the desired state
+ */
+ private boolean waitForLocalControllerServiceStatus(final String
groupId, final Set<String> serviceIds, final ControllerServiceState
desiredState, final Pause pause, final NiFiUser user) {
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final Set<ControllerServiceEntity> serviceEntities =
serviceFacade.getControllerServices(groupId, false, true, user);
+
+ final String desiredStateName = desiredState.name();
+ final boolean allServicesMatch = serviceEntities.stream()
+ .map(entity -> entity.getComponent())
+ .filter(service -> serviceIds.contains(service.getId()))
+ .map(service -> service.getState())
+ .allMatch(state -> desiredStateName.equals(state));
+
+ if (allServicesMatch) {
+ logger.debug("All {} controller services of interest now
have the desired state of {}", serviceIds.size(), desiredState);
+ return true;
+ }
+
+ // Not all of the processors are in the desired state. Pause
for a bit and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+ private VariableRegistryUpdateRequest
createVariableRegistryUpdateRequest(final String groupId) {
+ final VariableRegistryUpdateRequest updateRequest = new
VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId);
+
+ // before adding to the request map, purge any old requests. Must
do this by creating a List of ID's
+ // and then removing those ID's one-at-a-time in order to avoid
ConcurrentModificationException.
+ final Date oneMinuteAgo = new Date(System.currentTimeMillis() -
VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION);
+ final List<String> completedRequestIds =
varRegistryUpdateRequests.entrySet().stream()
+ .filter(entry -> entry.getValue().isComplete())
+ .filter(entry ->
entry.getValue().getLastUpdated().before(oneMinuteAgo))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+
+ completedRequestIds.stream().forEach(id ->
varRegistryUpdateRequests.remove(id));
+
+ final int requestCount = varRegistryUpdateRequests.size();
+ if (requestCount > MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS) {
+ throw new IllegalStateException("There are already " +
requestCount + " update requests for variable registries. "
+ + "Cannot issue any more requests until the older ones are
deleted or expire");
+ }
+
+ this.varRegistryUpdateRequests.put(updateRequest.getRequestId(),
updateRequest);
+ return updateRequest;
+ }
+
+ private Response updateVariableRegistryLocal(final String groupId,
final List<AffectedComponentDTO> affectedProcessors, final
List<AffectedComponentDTO> affectedServices,
+ final VariableRegistryEntity requestEntity) {
+
+ final Set<String> affectedProcessorIds = affectedProcessors ==
null ? Collections.emptySet() : affectedProcessors.stream()
+ .map(component -> component.getComponentId())
+ .collect(Collectors.toSet());
+ Map<String, Revision> processorRevisionMap = getRevisions(groupId,
affectedProcessorIds);
+
+ final Set<String> affectedServiceIds = affectedServices == null ?
Collections.emptySet() : affectedServices.stream()
+ .map(component -> component.getComponentId())
+ .collect(Collectors.toSet());
+ Map<String, Revision> serviceRevisionMap = getRevisions(groupId,
affectedServiceIds);
+
+ // update the variable registry
+ final VariableRegistryUpdateRequest updateRequest =
createVariableRegistryUpdateRequest(groupId);
+
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
+ final Pause pause = createPause(updateRequest);
+
+ final Revision requestRevision =
getRevision(requestEntity.getRevision(), requestEntity.getId());
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final Runnable updateTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Stop processors
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getStopProcessorsStep(), "Stopping Processors",
+ () -> stopProcessors(user, updateRequest, groupId,
processorRevisionMap, pause));
+
+ // Update revision map because this will have modified
the revisions of our components.
+ final Map<String, Revision>
updatedProcessorRevisionMap = getRevisions(groupId, affectedProcessorIds);
+
+ // Disable controller services
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getDisableServicesStep(), "Disabling Controller
Services",
+ () -> disableControllerServices(user,
updateRequest, groupId, serviceRevisionMap, pause));
+
+ // Update revision map because this will have modified
the revisions of our components.
+ final Map<String, Revision> updatedServiceRevisionMap
= getRevisions(groupId, affectedServiceIds);
+
+ // Apply the updates
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to
Variable Registry",
+ () -> serviceFacade.updateVariableRegistry(user,
requestRevision, requestEntity.getVariableRegistry()));
+
+ // Re-enable the controller services
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller
Services",
+ () -> enableControllerServices(user, groupId,
updatedServiceRevisionMap, pause));
+
+ // Restart processors
+ performUpdateVariableRegistryStep(groupId,
updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors",
+ () -> startProcessors(user, groupId,
updatedProcessorRevisionMap, pause));
+
+ // Set complete
+ updateRequest.setComplete(true);
+ updateRequest.setLastUpdated(new Date());
+ } catch (final Exception e) {
+ logger.error("Failed to update Variable Registry for
Proces Group with ID " + groupId, e);
+ updateRequest.setFailureReason("An unexpected error
has occurred: " + e);
+ }
+ }
+ };
+
+ // Submit the task to be run in the background
+ variableRegistryThreadPool.submit(updateTask);
+
+ final VariableRegistryUpdateRequestEntity responseEntity = new
VariableRegistryUpdateRequestEntity();
--- End diff --
Can we set the permissions on this entity for consistency with all of the
other endpoints?
> Add Variable Registry at Process Group level
> --------------------------------------------
>
> Key: NIFI-4224
> URL: https://issues.apache.org/jira/browse/NIFI-4224
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
>
> Currently, NiFi exposes a variable registry that is configurable by adding
> the name of a properties file to nifi.properties and then treating the
> referenced properties file as key/value pairs for the variable registry.
> This, however, is very limiting, as it provides a global scope for all
> variables, and it requires a restart of NiFi in order to pick up any updates
> to the file. We should expose a Process Group-level Variable Registry.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)