[
https://issues.apache.org/jira/browse/NIFI-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16118903#comment-16118903
]
ASF GitHub Bot commented on NIFI-4224:
--------------------------------------
Github user mcgilman commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2051#discussion_r132005935
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
---
@@ -325,6 +441,859 @@ public Response updateProcessGroup(
);
}
+
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{groupId}/variable-registry/update-requests/{updateId}")
+ @ApiOperation(value = "Gets a process group's variable registry",
response = VariableRegistryUpdateRequestEntity.class, authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response getVariableRegistryUpdateRequest(
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("groupId") final String groupId,
+ @ApiParam(value = "The ID of the Variable Registry Update
Request", required = true) @PathParam("updateId") final String updateId) {
+
+ if (groupId == null || updateId == null) {
+ throw new IllegalArgumentException("Group ID and Update ID
must both be specified.");
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable processGroup =
lookup.getProcessGroup(groupId).getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ,
NiFiUserUtils.getNiFiUser());
+ });
+
+ final VariableRegistryUpdateRequest request =
varRegistryUpdateRequests.get(updateId);
+ if (request == null) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId);
+ }
+
+ if (!groupId.equals(request.getProcessGroupId())) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId + " for Process Group with
identifier " + groupId);
+ }
+
+ final VariableRegistryUpdateRequestEntity entity = new
VariableRegistryUpdateRequestEntity();
+ entity.setId(request.getRequestId());
+
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+ entity.setUri(generateResourceUri("process-groups", groupId,
"variable-registry", updateId));
+ return generateOkResponse(entity).build();
+ }
+
+
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{groupId}/variable-registry/update-requests/{updateId}")
+ @ApiOperation(value = "Deletes an update request for a process group's
variable registry. If the request is not yet complete, it will automatically be
cancelled.",
+ response = VariableRegistryUpdateRequestEntity.class,
authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}", type =
"")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response deleteVariableRegistryUpdateRequest(
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("groupId") final String groupId,
+ @ApiParam(value = "The ID of the Variable Registry Update
Request", required = true) @PathParam("updateId") final String updateId) {
+
+ if (groupId == null || updateId == null) {
+ throw new IllegalArgumentException("Group ID and Update ID
must both be specified.");
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.DELETE);
+ }
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable processGroup =
lookup.getProcessGroup(groupId).getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ,
NiFiUserUtils.getNiFiUser());
+ });
+
+ final VariableRegistryUpdateRequest request =
varRegistryUpdateRequests.remove(updateId);
+ if (request == null) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId);
+ }
+
+ if (!groupId.equals(request.getProcessGroupId())) {
+ throw new ResourceNotFoundException("Could not find a Variable
Registry Update Request with identifier " + updateId + " for Process Group with
identifier " + groupId);
+ }
+
+ request.cancel();
+
+ final VariableRegistryUpdateRequestEntity entity = new
VariableRegistryUpdateRequestEntity();
+ entity.setId(request.getRequestId());
+
entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+ entity.setUri(generateResourceUri("process-groups", groupId,
"variable-registry", updateId));
+ return generateOkResponse(entity).build();
+ }
+
+
+ @PUT
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/variable-registry")
+ @ApiOperation(value = "Updates the contents of a Process Group's
variable Registry", response = VariableRegistryEntity.class, authorizations = {
+ @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response updateVariableRegistry(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("id") final String groupId,
+ @ApiParam(value = "The process group configuration details.",
required = true) final VariableRegistryEntity requestEntity) {
+
+ if (requestEntity == null || requestEntity.getVariableRegistry()
== null) {
+ throw new IllegalArgumentException("Variable Registry details
must be specified.");
+ }
+
+ if (requestEntity.getRevision() == null) {
+ throw new IllegalArgumentException("Revision must be
specified.");
+ }
+
+ // ensure the same id is being used
+ final VariableRegistryDTO registryDto =
requestEntity.getVariableRegistry();
+ if (!groupId.equals(registryDto.getProcessGroupId())) {
+ throw new IllegalArgumentException(String.format("The process
group id (%s) in the request body does "
+ + "not equal the process group id of the requested
resource (%s).", registryDto.getProcessGroupId(), groupId));
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.PUT, requestEntity);
+ }
+
+ // handle expects request (usually from the cluster manager)
+ final Revision requestRevision = getRevision(requestEntity,
groupId);
+ return withWriteLock(
+ serviceFacade,
+ requestEntity,
+ requestRevision,
+ lookup -> {
+ Authorizable authorizable =
lookup.getProcessGroup(groupId).getAuthorizable();
+ authorizable.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ null,
+ (revision, processGroupEntity) -> {
+ // update the process group
+ final VariableRegistryEntity entity =
serviceFacade.updateVariableRegistry(revision, registryDto);
+ populateRemainingVariableRegistryEntityContent(entity);
+
+ return generateOkResponse(entity).build();
+ });
+ }
+
+
+ /**
+ * Updates the variable registry for the specified process group.
+ *
+ * @param httpServletRequest request
+ * @param groupId The id of the process group.
+ * @param requestEntity the Variable Registry Entity
+ * @return A Variable Registry Entry.
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/variable-registry/update-requests")
+ @ApiOperation(value = "Submits a request to update a process group's
variable registry", response = VariableRegistryUpdateRequestEntity.class,
authorizations = {
+ @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response submitUpdateVariableRegistryRequest(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(value = "The process group id.", required = true)
@PathParam("id") final String groupId,
+ @ApiParam(value = "The process group configuration details.",
required = true) final VariableRegistryEntity requestEntity) {
+
+ if (requestEntity == null || requestEntity.getVariableRegistry()
== null) {
+ throw new IllegalArgumentException("Variable Registry details
must be specified.");
+ }
+
+ if (requestEntity.getRevision() == null) {
+ throw new IllegalArgumentException("Revision must be
specified.");
+ }
+
+ // In order to update variables in a variable registry, we have to
perform the following steps:
+ // 1. Determine Affected Components (this includes any Processors
and Controller Services and any components that reference an affected
Controller Service).
+ // 1a. Determine ID's of components
+ // 1b. Determine Revision's of associated components
+ // 2. Stop All Affected Processors
+ // 3. Disable All Affected Controller Services
+ // 4. Update the Variables
+ // 5. Re-Enable all Affected Controller Services (services only,
not dependent components)
+ // 6. Re-Enable all Processors that Depended on the Controller
Services
+
+ // Determine the affected components (and their associated
revisions)
+ final VariableRegistryEntity computedEntity =
serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
+ final VariableRegistryDTO computedRegistryDto =
computedEntity.getVariableRegistry();
+ if (computedRegistryDto == null) {
+ throw new ResourceNotFoundException(String.format("Unable to
locate group with id '%s'.", groupId));
+ }
+
+ final Set<AffectedComponentDTO> affectedComponents =
serviceFacade.identifyComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+
+ final Map<String, List<AffectedComponentDTO>>
affectedComponentsByType = affectedComponents.stream()
+ .collect(Collectors.groupingBy(comp ->
comp.getComponentType()));
+
+ final List<AffectedComponentDTO> affectedProcessors =
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+ final List<AffectedComponentDTO> affectedServices =
affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+
+
+ if (isReplicateRequest()) {
+ // update the variable registry
+ final VariableRegistryUpdateRequest updateRequest =
createVariableRegistryUpdateRequest(groupId);
+
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
+ final URI originalUri = getAbsolutePath();
+
+ // Submit the task to be run in the background
+ final Runnable taskWrapper = () -> {
+ try {
+ updateVariableRegistryReplicated(groupId, originalUri,
affectedProcessors, affectedServices, updateRequest, requestEntity);
+ } catch (final Exception e) {
+ logger.error("Failed to update variable registry", e);
+ updateRequest.setFailureReason("An unexpected error
has occurred: " + e);
+ }
+ };
+
+ variableRegistryThreadPool.submit(taskWrapper);
+
+ final VariableRegistryUpdateRequestEntity responseEntity = new
VariableRegistryUpdateRequestEntity();
+
responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+
responseEntity.getRequestDto().setUri(generateResourceUri("process-groups",
groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+
+ final URI location =
URI.create(responseEntity.getRequestDto().getUri());
+ return
Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
+ }
+
+
+ final Revision requestRevision =
getRevision(requestEntity.getRevision(), requestEntity.getId());
+ return withWriteLock(
+ serviceFacade,
+ requestEntity,
+ requestRevision,
+ lookup -> {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final Authorizable groupAuthorizable =
lookup.getProcessGroup(groupId).getAuthorizable();
+ groupAuthorizable.authorize(authorizer,
RequestAction.WRITE, user);
+
+ // For every component that is affected, the user must
have READ permissions and WRITE permissions
+ // (because this action requires stopping the component).
+ if (affectedProcessors != null) {
+ for (final AffectedComponentDTO affectedComponent :
affectedProcessors) {
+ final Authorizable authorizable =
lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable();
+
+ if (!authorizable.isAuthorized(authorizer,
RequestAction.READ, user)) {
--- End diff --
Should use `authorize(...)` here and below. `authorize(...)` will indicate
that the user is attempting to access/modify a component. isAuthorize(...) is
meant to simply check if the user is allowed but does not indicate an access
attempt. In this context, the user is actually modifying, albeit indirectly,
the component.
> Add Variable Registry at Process Group level
> --------------------------------------------
>
> Key: NIFI-4224
> URL: https://issues.apache.org/jira/browse/NIFI-4224
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
>
> Currently, NiFi exposes a variable registry that is configurable by adding
> the name of a properties file to nifi.properties and then treating the
> referenced properties file as key/value pairs for the variable registry.
> This, however, is very limiting, as it provides a global scope for all
> variables, and it requires a restart of NiFi in order to pick up any updates
> to the file. We should expose a Process Group-level Variable Registry.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)