[ 
https://issues.apache.org/jira/browse/KAFKA-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16572268#comment-16572268
 ] 

ASF GitHub Bot commented on KAFKA-7225:
---------------------------------------

ewencp closed pull request #5445: KAFKA-7225 - Pretransform validated props
URL: https://github.com/apache/kafka/pull/5445
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index b5e0ec2c07b..cadb4e05d9a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -246,6 +246,9 @@ public ConnectorStateInfo connectorStatus(String connName) {
 
     @Override
     public ConfigInfos validateConnectorConfig(Map<String, String> connectorProps) {
+        if (worker.configTransformer() != null) {
+            connectorProps = worker.configTransformer().transform(connectorProps);
+        }
         String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
         if (connType == null)
             throw new BadRequestException("Connector config " + connectorProps + " contains no connector type");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 7efb481ac75..1b715c70c76 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -38,10 +38,16 @@ public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> config
         this.configTransformer = new ConfigTransformer(configProviders);
     }
 
+    public Map<String, String> transform(Map<String, String> configs) {
+        return transform(null, configs);
+    }
+
     public Map<String, String> transform(String connectorName, Map<String, String> configs) {
         if (configs == null) return null;
         ConfigTransformerResult result = configTransformer.transform(configs);
-        scheduleReload(connectorName, result.ttls());
+        if (connectorName != null) {
+            scheduleReload(connectorName, result.ttls());
+        }
         return result.data();
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 5728465095a..db3cf273fe7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -67,6 +67,7 @@
     private final String connector = "connector";
 
     @MockStrict private Worker worker;
+    @MockStrict private WorkerConfigTransformer transformer;
     @MockStrict private Plugins plugins;
     @MockStrict private ClassLoader classLoader;
     @MockStrict private ConfigBackingStore configStore;
@@ -261,6 +262,9 @@ private AbstractHerder createConfigValidationHerder(Class<? 
extends Connector> c
         EasyMock.expect(herder.generation()).andStubReturn(generation);
 
         // Call to validateConnectorConfig
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
         final Connector connector;
         try {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 911afe7ec2f..a0de8cf14ac 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -34,6 +34,7 @@
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.HerderMetrics;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
@@ -158,6 +159,7 @@
     private DistributedHerder herder;
     private MockConnectMetrics metrics;
     @Mock private Worker worker;
+    @Mock private WorkerConfigTransformer transformer;
     @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
     @Mock
     private Plugins plugins;
@@ -356,6 +358,9 @@ public void testCreateConnector() throws Exception {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -399,6 +404,9 @@ public void testCreateConnectorFailedBasicValidation() 
throws Exception {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -444,6 +452,9 @@ public void testCreateConnectorFailedCustomValidation() 
throws Exception {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -495,6 +506,9 @@ public void testConnectorNameConflictsWithWorkerGroupId() 
throws Exception {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -530,6 +544,9 @@ public void testConnectorNameConflictsWithWorkerGroupId() 
throws Exception {
     @Test
     public void testCreateConnectorAlreadyExists() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null);
         expectRebalance(1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
@@ -1339,6 +1356,9 @@ public void testPutConnectorConfig() throws Exception {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 5372a3a27a5..b98c15e7014 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -36,6 +36,7 @@
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.WorkerConnector;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
@@ -101,6 +102,7 @@
 
     private Connector connector;
     @Mock protected Worker worker;
+    @Mock protected WorkerConfigTransformer transformer;
     @Mock private Plugins plugins;
     @Mock
     private PluginClassLoader pluginLoader;
@@ -146,6 +148,9 @@ public void testCreateConnectorFailedBasicValidation() 
throws Exception {
         config.remove(ConnectorConfig.NAME_CONFIG);
 
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -171,6 +176,9 @@ public void testCreateConnectorFailedCustomValidation() 
throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
 
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -205,6 +213,9 @@ public void testCreateConnectorAlreadyExists() throws 
Exception {
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         // No new connector is created
@@ -565,6 +576,9 @@ public void testCorruptConfig() {
         );
         ConfigDef configDef = new ConfigDef();
         configDef.define(key, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, "");
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
@@ -672,6 +686,9 @@ private void expectConfigValidation(
             Map<String, String>... configs
     ) {
         // config validation
+        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
+        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         if (shouldCreateConnector) {
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 37538763337..9d34c480870 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -27,6 +27,7 @@
 
 import hashlib
 import json
+import os.path
 
 
 class ConnectStandaloneFileTest(Test):
@@ -44,7 +45,8 @@ class ConnectStandaloneFileTest(Test):
 
     OFFSETS_FILE = "/mnt/connect.offsets"
 
-    TOPIC = "test"
+    TOPIC = "${file:/mnt/connect/connect-file-external.properties:topic.external}"
+    TOPIC_TEST = "test"
 
     FIRST_INPUT_LIST = ["foo", "bar", "baz"]
     FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
@@ -90,13 +92,18 @@ def test_file_source_and_sink(self, 
converter="org.apache.kafka.connect.json.Jso
 
         self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
-        self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
+        self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC_TEST,
                                                   consumer_timeout_ms=10000)
 
         self.zk.start()
         self.kafka.start()
 
+        source_external_props = os.path.join(self.source.PERSISTENT_ROOT, "connect-file-external.properties")
+        self.source.node.account.create_file(source_external_props, self.render('connect-file-external.properties'))
         self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
+
+        sink_external_props = os.path.join(self.sink.PERSISTENT_ROOT, "connect-file-external.properties")
+        self.sink.node.account.create_file(sink_external_props, self.render('connect-file-external.properties'))
         self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])
 
         self.source.start()
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-external.properties b/tests/kafkatest/tests/connect/templates/connect-file-external.properties
new file mode 100644
index 00000000000..8dccd2571f5
--- /dev/null
+++ b/tests/kafkatest/tests/connect/templates/connect-file-external.properties
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+topic.external={{ TOPIC_TEST }}
diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index a8eaa44832e..cbfe4910682 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -31,3 +31,6 @@ offset.storage.file.filename={{ OFFSETS_FILE }}
 
 # Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client
 request.timeout.ms=30000
+
+config.providers=file
+config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect ConfigProvider not invoked before validation
> ----------------------------------------------------------
>
>                 Key: KAFKA-7225
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7225
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.0.0
>            Reporter: Nacho Munoz
>            Assignee: Robert Yokota
>            Priority: Minor
>             Fix For: 2.0.1, 2.1.0
>
>
> When trying to register a JDBC connector with externalised secrets (e.g. 
> connection.password), the validation fails and the endpoint returns a 500. I 
> think the problem is that the config transformer is not invoked before 
> validation, so the attempt to exercise the credentials against the 
> database fails. I have checked that publishing the connector configuration 
> directly to the connect-config topic (to skip the validation) and restarting 
> the server is enough to get the connector working, which confirms that we 
> are simply not calling the config transformer before validating the connector. 
> Please let me know if you need further information.
> I'm happy to open a PR to address this issue, since I think it is 
> easy enough to fix for a new contributor to the project. Please feel free 
> to assign the resolution of the bug to me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to