C0urante commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r421091305
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -536,20 +561,37 @@ private void processConnectorConfigUpdates(Set<String>
connectorConfigUpdates) {
// If we only have connector config updates, we can just bounce the
updated connectors that are
// currently assigned to this worker.
Set<String> localConnectors = assignment == null ?
Collections.<String>emptySet() : new HashSet<>(assignment.connectors());
+ log.trace(
+ "Processing connector config updates; "
+ + "currently-owned connectors are {}, and to-be-updated
connectors are {}",
+ localConnectors,
+ connectorConfigUpdates
+ );
for (String connectorName : connectorConfigUpdates) {
- if (!localConnectors.contains(connectorName))
+ if (!localConnectors.contains(connectorName)) {
+ log.trace(
+ "Skipping config update for connector {} as it is not
owned by this worker",
+ connectorName
+ );
continue;
+ }
boolean remains = configState.contains(connectorName);
log.info("Handling connector-only config update by {} connector
{}",
remains ? "restarting" : "stopping", connectorName);
- worker.stopConnector(connectorName);
+ worker.stopAndAwaitConnector(connectorName);
// The update may be a deletion, so verify we actually need to
restart the connector
if (remains)
- startConnector(connectorName);
+ startConnector(connectorName, (error, result) -> { });
Review comment:
Agh, you're right. Thought I'd committed that but apparently not. Will
add.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]