pvillard31 commented on code in PR #11111:
URL: https://github.com/apache/nifi/pull/11111#discussion_r3045115246
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -1195,19 +1195,37 @@ private void synchronizeProcessors(final ProcessGroup
group, final VersionedProc
final ProcessGroup topLevelGroup)
throws ProcessorInstantiationException {
- for (final VersionedProcessor proposedProcessor :
proposed.getProcessors()) {
- final ProcessorNode processor =
processorsByVersionedId.get(proposedProcessor.getIdentifier());
- if (processor == null) {
- final ProcessorNode added = addProcessor(group,
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
- LOG.info("Added {} to {}", added, group);
- } else if
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
- updateProcessor(processor, proposedProcessor, topLevelGroup);
- // Any existing component that is modified during
synchronization may have its properties reverted to a pre-migration state,
- // so we then add it to the set to allow migrateProperties to
be called again to get it back to the migrated state
- createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
- LOG.info("Updated {}", processor);
- } else {
- processor.setPosition(new
Position(proposedProcessor.getPosition().getX(),
proposedProcessor.getPosition().getY()));
+ final Set<ProcessorNode> stoppedProcessors = new HashSet<>();
+
+ try {
+ for (final VersionedProcessor proposedProcessor :
proposed.getProcessors()) {
+ final ProcessorNode processor =
processorsByVersionedId.get(proposedProcessor.getIdentifier());
+ if (processor == null) {
+ final ProcessorNode added = addProcessor(group,
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
+ LOG.info("Added {} to {}", added, group);
+ } else if
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
+ final long processorStopDeadline =
System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
+ try {
+ final boolean stopped = stopOrTerminate(processor,
processorStopDeadline, syncOptions);
+ if (stopped) {
+ stoppedProcessors.add(processor);
+ }
+ } catch (final TimeoutException |
FlowSynchronizationException e) {
+ throw new
ProcessorInstantiationException(processor.getIdentifier(), e);
+ }
+ updateProcessor(processor, proposedProcessor,
topLevelGroup);
+ // Any existing component that is modified during
synchronization may have its properties reverted to a pre-migration state,
+ // so we then add it to the set to allow migrateProperties
to be called again to get it back to the migrated state
+ createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
+ LOG.info("Updated {}", processor);
+ } else {
+ processor.setPosition(new
Position(proposedProcessor.getPosition().getX(),
proposedProcessor.getPosition().getY()));
+ }
+ }
+ } finally {
+ for (final ProcessorNode processor : stoppedProcessors) {
+ processor.getProcessGroup().startProcessor(processor, false);
+ notifyScheduledStateChange((ComponentNode) processor,
syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
Review Comment:
Shouldn't we do it only if `proposedProcessor.getScheduledState() ==
RUNNING`?
If a processor was running but the proposed state is DISABLED, the sequence
becomes:
1. stopOrTerminate stops it (added to stoppedProcessors)
2. updateProcessor transitions it to DISABLED via transitionComponentState
3. finally block calls startProcessor, restarting a processor that should be
disabled
I believe the single-processor synchronize(ProcessorNode, ...) method
handles this correctly by only restarting when the proposed state is RUNNING.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]