pvillard31 commented on code in PR #11111:
URL: https://github.com/apache/nifi/pull/11111#discussion_r3045115246


##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -1195,19 +1195,37 @@ private void synchronizeProcessors(final ProcessGroup 
group, final VersionedProc
                                        final ProcessGroup topLevelGroup)
                 throws ProcessorInstantiationException {
 
-        for (final VersionedProcessor proposedProcessor : 
proposed.getProcessors()) {
-            final ProcessorNode processor = 
processorsByVersionedId.get(proposedProcessor.getIdentifier());
-            if (processor == null) {
-                final ProcessorNode added = addProcessor(group, 
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
-                LOG.info("Added {} to {}", added, group);
-            } else if 
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
-                updateProcessor(processor, proposedProcessor, topLevelGroup);
-                // Any existing component that is modified during 
synchronization may have its properties reverted to a pre-migration state,
-                // so we then add it to the set to allow migrateProperties to 
be called again to get it back to the migrated state
-                createdAndModifiedExtensions.add(new 
CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
-                LOG.info("Updated {}", processor);
-            } else {
-                processor.setPosition(new 
Position(proposedProcessor.getPosition().getX(), 
proposedProcessor.getPosition().getY()));
+        final Set<ProcessorNode> stoppedProcessors = new HashSet<>();
+
+        try {
+            for (final VersionedProcessor proposedProcessor : 
proposed.getProcessors()) {
+                final ProcessorNode processor = 
processorsByVersionedId.get(proposedProcessor.getIdentifier());
+                if (processor == null) {
+                    final ProcessorNode added = addProcessor(group, 
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
+                    LOG.info("Added {} to {}", added, group);
+                } else if 
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
+                    final long processorStopDeadline = 
System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
+                    try {
+                        final boolean stopped = stopOrTerminate(processor, 
processorStopDeadline, syncOptions);
+                        if (stopped) {
+                            stoppedProcessors.add(processor);
+                        }
+                    } catch (final TimeoutException | 
FlowSynchronizationException e) {
+                        throw new 
ProcessorInstantiationException(processor.getIdentifier(), e);
+                    }
+                    updateProcessor(processor, proposedProcessor, 
topLevelGroup);
+                    // Any existing component that is modified during 
synchronization may have its properties reverted to a pre-migration state,
+                    // so we then add it to the set to allow migrateProperties 
to be called again to get it back to the migrated state
+                    createdAndModifiedExtensions.add(new 
CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
+                    LOG.info("Updated {}", processor);
+                } else {
+                    processor.setPosition(new 
Position(proposedProcessor.getPosition().getX(), 
proposedProcessor.getPosition().getY()));
+                }
+            }
+        } finally {
+            for (final ProcessorNode processor : stoppedProcessors) {
+                processor.getProcessGroup().startProcessor(processor, false);
+                notifyScheduledStateChange((ComponentNode) processor, 
syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
             }

Review Comment:
   Shouldn't we do it only if `proposedProcessor.getScheduledState() == 
RUNNING`?
   
   If a processor was running but the proposed state is DISABLED, the sequence 
becomes:
   1. stopOrTerminate stops it (added to stoppedProcessors)
   2. updateProcessor transitions it to DISABLED via transitionComponentState
   3. finally block calls startProcessor, restarting a processor that should be 
disabled
   
   I believe the single-processor synchronize(ProcessorNode, ...) method 
handles this correctly by only restarting when the proposed state is RUNNING.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to