markap14 commented on code in PR #10654:
URL: https://github.com/apache/nifi/pull/10654#discussion_r2627479672


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -102,60 +103,91 @@ public Future<Void> stopConnector(final ConnectorNode connector) {
     }
 
     @Override
-    public void applyUpdate(final ConnectorNode connector) throws FlowUpdateException {
+    public void applyUpdate(final ConnectorNode connector, final ConnectorUpdateContext context) throws FlowUpdateException {
         final ConnectorState initialDesiredState = connector.getDesiredState();
 
-        // Perform whatever preparation is necessary for the update. Default implementation is to stop the connector.
-        connector.prepareForUpdate();
+        // Transition the connector's state to PREPARING_FOR_UPDATE before starting the background process.
+        // This allows us to ensure that if we poll and see the state in the same state it was in before that
+        // we know the update has already completed (successfully or otherwise).
+        connector.transitionStateForUpdating();
 
-        try {
-            // Wait for Connector State to become UPDATING
-            waitForState(connector, ConnectorState.UPDATING, Set.of(ConnectorState.PREPARING_FOR_UPDATE));
+        // Update connector in a background thread. This will handle transitioning the Connector state appropriately
+        // so that it's clear when the update has completed.
+        lifecycleExecutor.submit(() -> updateConnector(connector, initialDesiredState, context));
+    }
 
-            // Apply the update to the connector.
-            connector.applyUpdate();
+    private void updateConnector(final ConnectorNode connector, final ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
+        try {
+            // Perform whatever preparation is necessary for the update. Default implementation is to stop the connector.
+            connector.prepareForUpdate();
+
+            try {
+                // Wait for Connector State to become UPDATING
+                waitForState(connector, Set.of(ConnectorState.UPDATING), Set.of(ConnectorState.PREPARING_FOR_UPDATE));
+
+                // Apply the update to the connector.
+                connector.applyUpdate();
+
+                // Now that the update has been applied, save the flow so that the updated configuration is persisted.
+                context.saveFlow();
+
+                // Wait for Connector State to become UPDATED, or to revert to the initial desired state because, depending upon timing,
+                // other nodes may have already seen the transition to UPDATED and moved the connector back to the initial desired state.
+                final Set<ConnectorState> desirableStates = new HashSet<>();
+                desirableStates.add(initialDesiredState);
+                desirableStates.add(ConnectorState.UPDATED);
+                if (initialDesiredState == ConnectorState.RUNNING) {
+                    desirableStates.add(ConnectorState.STARTING);
+                } else if (initialDesiredState == ConnectorState.STOPPED) {
+                    desirableStates.add(ConnectorState.STOPPING);
+                }
+                waitForState(connector, desirableStates, Set.of(ConnectorState.UPDATING));
 
-            // Wait for Connector State to become UPDATED
-            waitForState(connector, ConnectorState.UPDATED, Set.of(ConnectorState.UPDATING));
+                // If the initial desired state was RUNNING, start the connector again. Otherwise, stop it.
+                // We don't simply leave it be as the prepareForUpdate / update may have changed the state of some components.
+                if (initialDesiredState == ConnectorState.RUNNING) {
+                    connector.start(lifecycleExecutor);
+                } else {
+                    connector.stop(lifecycleExecutor);
+                }
 
-            // If the initial desired state was RUNNING, start the connector again. Otherwise, stop it.
-            // We don't simply leave it be as the prepareForUpdate / update may have changed the state of some components.
-            if (initialDesiredState == ConnectorState.RUNNING) {
-                connector.start(lifecycleExecutor);
-            } else {
-                connector.stop(lifecycleExecutor);
+                // We've updated the state of the connector so save flow again
+                context.saveFlow();
+            } catch (final Exception e) {
+                logger.error("Failed to apply connector update for {}", connector, e);
+                connector.abortUpdate(e);
             }
         } catch (final Exception e) {
-            connector.abortUpdate(e);
+            logger.error("Failed to apply connector update for {}", connector, e);

Review Comment:
   No, you're right. We should abort in this case also. Will fix.
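
A rough sketch of what aborting in the outer catch could look like, reduced to a self-contained example. This is an illustration only, not the actual patch: the `Connector` interface and `FailingConnector` class are hypothetical stand-ins for `ConnectorNode`, `System.err` stands in for the logger, and the inner try/catch from the diff is elided.

```java
import java.util.ArrayList;
import java.util.List;

public class OuterCatchAbortSketch {

    // Hypothetical stand-in for the two ConnectorNode methods this sketch uses.
    interface Connector {
        void prepareForUpdate() throws Exception;
        void abortUpdate(Throwable cause);
    }

    // Simplified shape of updateConnector: the outer catch logs and also aborts.
    static void updateConnector(final Connector connector) {
        try {
            connector.prepareForUpdate();
            // Inner try/catch (applyUpdate, saveFlow, waitForState) elided.
        } catch (final Exception e) {
            System.err.println("Failed to apply connector update: " + e.getMessage());
            connector.abortUpdate(e);
        }
    }

    // Test double that fails during preparation and records every abort cause.
    static final class FailingConnector implements Connector {
        final List<Throwable> abortCauses = new ArrayList<>();

        @Override
        public void prepareForUpdate() throws Exception {
            throw new IllegalStateException("could not stop connector");
        }

        @Override
        public void abortUpdate(final Throwable cause) {
            abortCauses.add(cause);
        }
    }

    public static void main(final String[] args) {
        final FailingConnector connector = new FailingConnector();
        updateConnector(connector);
        System.out.println("abortUpdate calls: " + connector.abortCauses.size());
    }
}
```

With this shape, a failure in `prepareForUpdate()` reaches `abortUpdate` instead of only being logged, so the connector should not be left sitting in PREPARING_FOR_UPDATE.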
