kevdoran commented on code in PR #11111:
URL: https://github.com/apache/nifi/pull/11111#discussion_r3045236271
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -1195,19 +1195,37 @@ private void synchronizeProcessors(final ProcessGroup
group, final VersionedProc
final ProcessGroup topLevelGroup)
throws ProcessorInstantiationException {
- for (final VersionedProcessor proposedProcessor :
proposed.getProcessors()) {
- final ProcessorNode processor =
processorsByVersionedId.get(proposedProcessor.getIdentifier());
- if (processor == null) {
- final ProcessorNode added = addProcessor(group,
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
- LOG.info("Added {} to {}", added, group);
- } else if
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
- updateProcessor(processor, proposedProcessor, topLevelGroup);
- // Any existing component that is modified during
synchronization may have its properties reverted to a pre-migration state,
- // so we then add it to the set to allow migrateProperties to
be called again to get it back to the migrated state
- createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
- LOG.info("Updated {}", processor);
- } else {
- processor.setPosition(new
Position(proposedProcessor.getPosition().getX(),
proposedProcessor.getPosition().getY()));
+ final Set<ProcessorNode> stoppedProcessors = new HashSet<>();
+
+ try {
+ for (final VersionedProcessor proposedProcessor :
proposed.getProcessors()) {
+ final ProcessorNode processor =
processorsByVersionedId.get(proposedProcessor.getIdentifier());
+ if (processor == null) {
+ final ProcessorNode added = addProcessor(group,
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
+ LOG.info("Added {} to {}", added, group);
+ } else if
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
+ final long processorStopDeadline =
System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
+ try {
+ final boolean stopped = stopOrTerminate(processor,
processorStopDeadline, syncOptions);
+ if (stopped) {
+ stoppedProcessors.add(processor);
+ }
+ } catch (final TimeoutException |
FlowSynchronizationException e) {
+ throw new
ProcessorInstantiationException(processor.getIdentifier(), e);
+ }
+ updateProcessor(processor, proposedProcessor,
topLevelGroup);
+ // Any existing component that is modified during
synchronization may have its properties reverted to a pre-migration state,
+ // so we then add it to the set to allow migrateProperties
to be called again to get it back to the migrated state
+ createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
+ LOG.info("Updated {}", processor);
+ } else {
+ processor.setPosition(new
Position(proposedProcessor.getPosition().getX(),
proposedProcessor.getPosition().getY()));
+ }
+ }
+ } finally {
+ for (final ProcessorNode processor : stoppedProcessors) {
+ processor.getProcessGroup().startProcessor(processor, false);
+ notifyScheduledStateChange((ComponentNode) processor,
syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
Review Comment:
Good catch, this is fixed now
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]