mcgilman commented on code in PR #10654:
URL: https://github.com/apache/nifi/pull/10654#discussion_r2627263268
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java:
##########
@@ -488,21 +502,45 @@ private StandardConnectorNode createConnectorNode() {
private StandardConnectorNode createConnectorNode(final Connector
connector) {
final ConnectorStateTransition stateTransition = new
StandardConnectorStateTransition("TestConnectorNode");
+ final ConnectorValidationTrigger validationTrigger = new
SynchronousConnectorValidationTrigger();
final StandardConnectorNode node = new StandardConnectorNode(
"test-connector-id",
mock(FlowManager.class),
- extensionManager, null,
+ extensionManager,
+ null,
createConnectorDetails(connector),
"TestConnector",
+ connector.getClass().getCanonicalName(),
new StandardConnectorConfigurationContext(assetManager,
secretsManager),
stateTransition,
- flowContextFactory);
+ flowContextFactory,
+ validationTrigger);
-
node.initializeConnector(mock(FrameworkConnectorInitializationContext.class));
+ // mock secrets manager
+ final SecretsManager secretsManager = mock(SecretsManager.class);
+ when(secretsManager.getAllSecrets()).thenReturn(List.of());
+
when(secretsManager.getSecrets(anySet())).thenReturn(Collections.emptyMap());
+
+ final FrameworkConnectorInitializationContext initializationContext =
mock(FrameworkConnectorInitializationContext.class);
+
when(initializationContext.getSecretsManager()).thenReturn(secretsManager);
+
+ node.initializeConnector(initializationContext);
assertDoesNotThrow(node::loadInitialFlow);
Review Comment:
Not a change in this PR, but apparently the use of `assertDoesNotThrow` is
against the rules [1].
[1] https://github.com/apache/nifi/blob/NIFI-15258/.cursorrules#L69
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java:
##########
@@ -184,6 +189,7 @@ public void
testGetAllSecretsRetrievesSecretsFromAllProviders() {
assertTrue(foundSecret3);
}
+
Review Comment:
Unneeded extra blank line.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -102,60 +103,91 @@ public Future<Void> stopConnector(final ConnectorNode
connector) {
}
@Override
- public void applyUpdate(final ConnectorNode connector) throws
FlowUpdateException {
+ public void applyUpdate(final ConnectorNode connector, final
ConnectorUpdateContext context) throws FlowUpdateException {
final ConnectorState initialDesiredState = connector.getDesiredState();
- // Perform whatever preparation is necessary for the update. Default
implementation is to stop the connector.
- connector.prepareForUpdate();
+ // Transition the connector's state to PREPARING_FOR_UPDATE before
starting the background process.
+ // This allows us to ensure that if we poll and see the state in the
same state it was in before that
+ // we know the update has already completed (successfully or
otherwise).
+ connector.transitionStateForUpdating();
- try {
- // Wait for Connector State to become UPDATING
- waitForState(connector, ConnectorState.UPDATING,
Set.of(ConnectorState.PREPARING_FOR_UPDATE));
+ // Update connector in a background thread. This will handle
transitioning the Connector state appropriately
+ // so that it's clear when the update has completed.
+ lifecycleExecutor.submit(() -> updateConnector(connector,
initialDesiredState, context));
+ }
- // Apply the update to the connector.
- connector.applyUpdate();
+ private void updateConnector(final ConnectorNode connector, final
ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
+ try {
+ // Perform whatever preparation is necessary for the update.
Default implementation is to stop the connector.
+ connector.prepareForUpdate();
+
+ try {
+ // Wait for Connector State to become UPDATING
+ waitForState(connector, Set.of(ConnectorState.UPDATING),
Set.of(ConnectorState.PREPARING_FOR_UPDATE));
+
+ // Apply the update to the connector.
+ connector.applyUpdate();
+
+ // Now that the update has been applied, save the flow so that
the updated configuration is persisted.
+ context.saveFlow();
+
+ // Wait for Connector State to become UPDATED, or to revert to
the initial desired state because, depending upon timing,
+ // other nodes may have already seen the transition to UPDATED
and moved the connector back to the initial desired state.
+ final Set<ConnectorState> desirableStates = new HashSet<>();
+ desirableStates.add(initialDesiredState);
+ desirableStates.add(ConnectorState.UPDATED);
+ if (initialDesiredState == ConnectorState.RUNNING) {
+ desirableStates.add(ConnectorState.STARTING);
+ } else if (initialDesiredState == ConnectorState.STOPPED) {
+ desirableStates.add(ConnectorState.STOPPING);
+ }
+ waitForState(connector, desirableStates,
Set.of(ConnectorState.UPDATING));
- // Wait for Connector State to become UPDATED
- waitForState(connector, ConnectorState.UPDATED,
Set.of(ConnectorState.UPDATING));
+ // If the initial desired state was RUNNING, start the
connector again. Otherwise, stop it.
+ // We don't simply leave it be as the prepareForUpdate /
update may have changed the state of some components.
+ if (initialDesiredState == ConnectorState.RUNNING) {
+ connector.start(lifecycleExecutor);
+ } else {
+ connector.stop(lifecycleExecutor);
+ }
- // If the initial desired state was RUNNING, start the connector
again. Otherwise, stop it.
- // We don't simply leave it be as the prepareForUpdate / update
may have changed the state of some components.
- if (initialDesiredState == ConnectorState.RUNNING) {
- connector.start(lifecycleExecutor);
- } else {
- connector.stop(lifecycleExecutor);
+ // We've updated the state of the connector so save flow again
+ context.saveFlow();
+ } catch (final Exception e) {
+ logger.error("Failed to apply connector update for {}",
connector, e);
+ connector.abortUpdate(e);
}
} catch (final Exception e) {
- connector.abortUpdate(e);
+ logger.error("Failed to apply connector update for {}", connector,
e);
Review Comment:
If `prepareForUpdate` fails, could the connector state be left in
`PREPARING_FOR_UPDATE`?
##########
nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf:
##########
@@ -30,7 +30,7 @@ java.arg.3=-Xmx512m
java.arg.14=-Djava.awt.headless=true
-#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
+java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
Review Comment:
Guessing this was left over from some debugging.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -102,60 +103,91 @@ public Future<Void> stopConnector(final ConnectorNode
connector) {
}
@Override
- public void applyUpdate(final ConnectorNode connector) throws
FlowUpdateException {
+ public void applyUpdate(final ConnectorNode connector, final
ConnectorUpdateContext context) throws FlowUpdateException {
final ConnectorState initialDesiredState = connector.getDesiredState();
- // Perform whatever preparation is necessary for the update. Default
implementation is to stop the connector.
- connector.prepareForUpdate();
+ // Transition the connector's state to PREPARING_FOR_UPDATE before
starting the background process.
+ // This allows us to ensure that if we poll and see the state in the
same state it was in before that
+ // we know the update has already completed (successfully or
otherwise).
+ connector.transitionStateForUpdating();
- try {
- // Wait for Connector State to become UPDATING
- waitForState(connector, ConnectorState.UPDATING,
Set.of(ConnectorState.PREPARING_FOR_UPDATE));
+ // Update connector in a background thread. This will handle
transitioning the Connector state appropriately
+ // so that it's clear when the update has completed.
+ lifecycleExecutor.submit(() -> updateConnector(connector,
initialDesiredState, context));
+ }
- // Apply the update to the connector.
- connector.applyUpdate();
+ private void updateConnector(final ConnectorNode connector, final
ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
+ try {
+ // Perform whatever preparation is necessary for the update.
Default implementation is to stop the connector.
+ connector.prepareForUpdate();
+
+ try {
+ // Wait for Connector State to become UPDATING
+ waitForState(connector, Set.of(ConnectorState.UPDATING),
Set.of(ConnectorState.PREPARING_FOR_UPDATE));
+
+ // Apply the update to the connector.
+ connector.applyUpdate();
+
+ // Now that the update has been applied, save the flow so that
the updated configuration is persisted.
+ context.saveFlow();
+
+ // Wait for Connector State to become UPDATED, or to revert to
the initial desired state because, depending upon timing,
+ // other nodes may have already seen the transition to UPDATED
and moved the connector back to the initial desired state.
+ final Set<ConnectorState> desirableStates = new HashSet<>();
+ desirableStates.add(initialDesiredState);
+ desirableStates.add(ConnectorState.UPDATED);
+ if (initialDesiredState == ConnectorState.RUNNING) {
+ desirableStates.add(ConnectorState.STARTING);
+ } else if (initialDesiredState == ConnectorState.STOPPED) {
+ desirableStates.add(ConnectorState.STOPPING);
+ }
+ waitForState(connector, desirableStates,
Set.of(ConnectorState.UPDATING));
- // Wait for Connector State to become UPDATED
- waitForState(connector, ConnectorState.UPDATED,
Set.of(ConnectorState.UPDATING));
+ // If the initial desired state was RUNNING, start the
connector again. Otherwise, stop it.
+ // We don't simply leave it be as the prepareForUpdate /
update may have changed the state of some components.
+ if (initialDesiredState == ConnectorState.RUNNING) {
+ connector.start(lifecycleExecutor);
+ } else {
+ connector.stop(lifecycleExecutor);
+ }
- // If the initial desired state was RUNNING, start the connector
again. Otherwise, stop it.
- // We don't simply leave it be as the prepareForUpdate / update
may have changed the state of some components.
- if (initialDesiredState == ConnectorState.RUNNING) {
- connector.start(lifecycleExecutor);
- } else {
- connector.stop(lifecycleExecutor);
+ // We've updated the state of the connector so save flow again
+ context.saveFlow();
+ } catch (final Exception e) {
+ logger.error("Failed to apply connector update for {}",
connector, e);
+ connector.abortUpdate(e);
}
} catch (final Exception e) {
- connector.abortUpdate(e);
+ logger.error("Failed to apply connector update for {}", connector,
e);
Review Comment:
If an exception is thrown, do we only want to log it here? Is there no need
to rethrow or somehow report that the update was unsuccessful?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]