[ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108867#comment-17108867
 ] 

Chris Egerton commented on KAFKA-9981:
--------------------------------------

[~ryannedolan] I think these configuration updates come from the connector 
requesting task reconfiguration from the framework: 
[https://github.com/apache/kafka/blob/62fa8fc9a95d738780d1f73d2d758d7329828feb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L232]

 

In distributed mode, this causes the framework to generate new task configs 
from the connector and then, if they've changed, try to write them to the 
config topic. However, only the leader is allowed to write directly to the 
config topic, so if the connector is hosted on a follower node, then the node 
has to forward those configs to the leader via the REST API: 
[https://github.com/apache/kafka/blob/62fa8fc9a95d738780d1f73d2d758d7329828feb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1316-L1340]

 

The endpoint for receiving these task configs was the subject of KIP-507, which 
sought to close a security loophole that it presented at the time. You can see 
the code for that internal endpoint here: 
[https://github.com/apache/kafka/blob/62fa8fc9a95d738780d1f73d2d758d7329828feb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L268-L278]

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> ----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9981
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9981
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.4.0, 2.5.0, 2.4.1
>            Reporter: victor
>            Priority: Major
>
> DistributedHerder.reconfigureConnector induction config update as follows:
> {code:java}
> if (changed) {
>     List<Map<String, String>> rawTaskProps = reverseTransform(connName, 
> configState, taskProps);
>     if (isLeader()) {
>         configBackingStore.putTaskConfigs(connName, rawTaskProps);
>         cb.onCompletion(null, null);
>     } else {
>         // We cannot forward the request on the same thread because this 
> reconfiguration can happen as a result of connector
>         // addition or removal. If we blocked waiting for the response from 
> leader, we may be kicked out of the worker group.
>         forwardRequestExecutor.submit(new Runnable() {
>             @Override
>             public void run() {
>                 try {
>                     String leaderUrl = leaderUrl();
>                     if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
>                         cb.onCompletion(new ConnectException("Request to 
> leader to " +
>                                 "reconfigure connector tasks failed " +
>                                 "because the URL of the leader's REST 
> interface is empty!"), null);
>                         return;
>                     }
>                     String reconfigUrl = RestServer.urlJoin(leaderUrl, 
> "/connectors/" + connName + "/tasks");
>                     log.trace("Forwarding task configurations for connector 
> {} to leader", connName);
>                     RestClient.httpRequest(reconfigUrl, "POST", null, 
> rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
>                     cb.onCompletion(null, null);
>                 } catch (ConnectException e) {
>                     log.error("Request to leader to reconfigure connector 
> tasks failed", e);
>                     cb.onCompletion(e, null);
>                 }
>             }
>         });
>     }
> }
> {code}
> KafkaConfigBackingStore task checks for configuration updates,such as topic 
> whitelist update.If KafkaConfigBackingStore task is not running on leader 
> node,an HTTP request will be send to notify the leader of the configuration 
> update.However,dedicated mm2 cluster does not have the HTTP server turned 
> on,so the request will fail to be sent,causing the update operation to be 
> lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to