C0urante commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r420905737



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1182,28 +1251,47 @@ public Void call() throws Exception {
 
     // Helper for starting a connector with the given name, which will extract & parse the config, generate connector
     // context and add to the worker. This needs to be called from within the main worker thread for this herder.
-    private boolean startConnector(String connectorName) {
+    // The callback is invoked after the connector has finished startup and generated task configs, or failed in the process.
+    private void startConnector(String connectorName, Callback<Void> callback) {
         log.info("Starting connector {}", connectorName);
         final Map<String, String> configProps = configState.connectorConfig(connectorName);
-        final ConnectorContext ctx = new HerderConnectorContext(this, connectorName);
+        final CloseableConnectorContext ctx = new HerderConnectorContext(this, connectorName);
         final TargetState initialState = configState.targetState(connectorName);
-        boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState);
-
-        // Immediately request configuration since this could be a brand new connector. However, also only update those
-        // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
-        // just restoring an existing connector.
-        if (started && initialState == TargetState.STARTED)
-            reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName);
+        final Callback<TargetState> onInitialStateChange = (error, newState) -> {
+            if (error != null) {
+                callback.onCompletion(new ConnectException("Failed to start connector: " + connectorName), null);
+                return;
+            }
 
-        return started;
+            // newState should be equal to initialState, but use it just in case
+            if (newState == TargetState.STARTED) {
+                addRequest(
+                    new Callable<Void>() {
+                        @Override
+                        public Void call() {
+                            // Request configuration since this could be a brand new connector. However, also only update those
+                            // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
+                            // just restoring an existing connector.
+                            reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName);
+                            callback.onCompletion(null, null);
+                            return null;
+                        }
+                    },
+                    forwardErrorCallback(callback)
+                );
+            } else {
+                callback.onCompletion(null, null);
+            }
+        };
+        worker.startConnector(connectorName, configProps, ctx, this, initialState, onInitialStateChange);
     }
 
     private Callable<Void> getConnectorStartingCallable(final String connectorName) {
         return new Callable<Void>() {
             @Override
             public Void call() throws Exception {
                 try {
-                    startConnector(connectorName);
+                    startConnector(connectorName, (error, result) -> { });
                 } catch (Throwable t) {
                     log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +

Review comment:
       Since we're catching `Throwable` here, yes, I do think it's still
technically possible to trigger this log statement. But you're definitely right
that the actual content of that message should be moved into the callback. I'm
thinking the log message in the `catch` block can then be made even scarier,
since anything that goes wrong here would be really unexpected.
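
To make the suggestion concrete, here is a rough, self-contained sketch of the
split I have in mind. It is not the actual herder code: `StartConnectorSketch`,
its simplified `Callback` interface, the `failStartup` / `blowUp` flags, and the
message wording are all made up for illustration, and log lines are collected
in a list rather than sent to a real logger. The idea is that expected startup
failures are reported through the callback, while the `catch` block only fires
for something truly unexpected and says so loudly.

```java
import java.util.ArrayList;
import java.util.List;

public class StartConnectorSketch {
    // Hypothetical, simplified stand-in for Connect's Callback interface.
    public interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    // Hypothetical stand-in for startConnector(): expected startup failures
    // are delivered through the callback; it throws only when something
    // unexpected happens (simulated here with the blowUp flag).
    public static void startConnector(String name, boolean failStartup, boolean blowUp,
                                      Callback<Void> callback) {
        if (blowUp) {
            throw new IllegalStateException("simulated unexpected failure");
        }
        if (failStartup) {
            callback.onCompletion(new RuntimeException("invalid connector configuration"), null);
        } else {
            callback.onCompletion(null, null);
        }
    }

    // Mirrors the shape of the Callable in the diff and returns the log lines
    // that would have been written.
    public static List<String> startAndLog(String name, boolean failStartup, boolean blowUp) {
        List<String> log = new ArrayList<>();
        try {
            startConnector(name, failStartup, blowUp, (error, result) -> {
                // The "couldn't start connector" message lives in the callback now.
                if (error != null) {
                    log.add("ERROR Failed to start connector " + name + ": " + error.getMessage());
                }
            });
        } catch (Throwable t) {
            // Reaching this point means a bug or a broken environment, so the
            // message can be much more alarming than the one in the callback.
            log.add("ERROR Unexpected error while starting connector " + name
                    + "; this is likely a bug: " + t);
        }
        return log;
    }
}
```

With this shape, a bad connector config produces only the callback message, and
the `catch` block stays quiet unless `startConnector` itself throws.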




