[ https://issues.apache.org/jira/browse/KAFKA-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16572268#comment-16572268 ]
ASF GitHub Bot commented on KAFKA-7225: --------------------------------------- ewencp closed pull request #5445: KAFKA-7225 - Pretransform validated props URL: https://github.com/apache/kafka/pull/5445 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index b5e0ec2c07b..cadb4e05d9a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -246,6 +246,9 @@ public ConnectorStateInfo connectorStatus(String connName) { @Override public ConfigInfos validateConnectorConfig(Map<String, String> connectorProps) { + if (worker.configTransformer() != null) { + connectorProps = worker.configTransformer().transform(connectorProps); + } String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); if (connType == null) throw new BadRequestException("Connector config " + connectorProps + " contains no connector type"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index 7efb481ac75..1b715c70c76 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -38,10 +38,16 @@ public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> config this.configTransformer = new ConfigTransformer(configProviders); } + public Map<String, String> 
transform(Map<String, String> configs) { + return transform(null, configs); + } + public Map<String, String> transform(String connectorName, Map<String, String> configs) { if (configs == null) return null; ConfigTransformerResult result = configTransformer.transform(configs); - scheduleReload(connectorName, result.ttls()); + if (connectorName != null) { + scheduleReload(connectorName, result.ttls()); + } return result.data(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 5728465095a..db3cf273fe7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -67,6 +67,7 @@ private final String connector = "connector"; @MockStrict private Worker worker; + @MockStrict private WorkerConfigTransformer transformer; @MockStrict private Plugins plugins; @MockStrict private ClassLoader classLoader; @MockStrict private ConfigBackingStore configStore; @@ -261,6 +262,9 @@ private AbstractHerder createConfigValidationHerder(Class<? 
extends Connector> c EasyMock.expect(herder.generation()).andStubReturn(generation); // Call to validateConnectorConfig + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); final Connector connector; try { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 911afe7ec2f..a0de8cf14ac 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.distributed.DistributedHerder.HerderMetrics; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; @@ -158,6 +159,7 @@ private DistributedHerder herder; private MockConnectMetrics metrics; @Mock private Worker worker; + @Mock private WorkerConfigTransformer transformer; @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback; @Mock private Plugins plugins; @@ -356,6 +358,9 @@ public void testCreateConnector() throws Exception { // config validation Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -399,6 +404,9 @@ public void testCreateConnectorFailedBasicValidation() throws Exception { // config validation Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -444,6 +452,9 @@ public void testCreateConnectorFailedCustomValidation() throws Exception { // config validation Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -495,6 +506,9 @@ public void testConnectorNameConflictsWithWorkerGroupId() throws Exception { // config validation Connector connectorMock = PowerMock.createMock(SinkConnector.class); + 
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -530,6 +544,9 @@ public void testConnectorNameConflictsWithWorkerGroupId() throws Exception { @Test public void testCreateConnectorAlreadyExists() throws Exception { EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null); expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); @@ -1339,6 +1356,9 @@ public void testPutConnectorConfig() throws Exception { // config validation Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 5372a3a27a5..b98c15e7014 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.WorkerConnector; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; @@ -101,6 +102,7 @@ private Connector connector; @Mock protected Worker worker; + @Mock protected WorkerConfigTransformer transformer; @Mock private Plugins plugins; @Mock private PluginClassLoader pluginLoader; @@ -146,6 +148,9 @@ public void testCreateConnectorFailedBasicValidation() throws Exception { config.remove(ConnectorConfig.NAME_CONFIG); Connector connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -171,6 +176,9 @@ public void testCreateConnectorFailedCustomValidation() throws Exception { connector = PowerMock.createMock(BogusSourceConnector.class); Connector 
connectorMock = PowerMock.createMock(SourceConnector.class); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -205,6 +213,9 @@ public void testCreateConnectorAlreadyExists() throws Exception { Connector connectorMock = PowerMock.createMock(SourceConnector.class); expectConfigValidation(connectorMock, true, config, config); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); // No new connector is created @@ -565,6 +576,9 @@ public void testCorruptConfig() { ); ConfigDef configDef = new ConfigDef(); configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ""); + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); @@ -672,6 +686,9 @@ private void expectConfigValidation( Map<String, String>... 
configs ) { // config validation + EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); + final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); + EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); if (shouldCreateConnector) { diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 37538763337..9d34c480870 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -27,6 +27,7 @@ import hashlib import json +import os.path class ConnectStandaloneFileTest(Test): @@ -44,7 +45,8 @@ class ConnectStandaloneFileTest(Test): OFFSETS_FILE = "/mnt/connect.offsets" - TOPIC = "test" + TOPIC = "${file:/mnt/connect/connect-file-external.properties:topic.external}" + TOPIC_TEST = "test" FIRST_INPUT_LIST = ["foo", "bar", "baz"] FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n" @@ -90,13 +92,18 @@ def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.Jso self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE]) self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE]) - self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC, + self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC_TEST, consumer_timeout_ms=10000) self.zk.start() self.kafka.start() + source_external_props = os.path.join(self.source.PERSISTENT_ROOT, "connect-file-external.properties") + self.source.node.account.create_file(source_external_props, self.render('connect-file-external.properties')) self.source.set_configs(lambda node: self.render("connect-standalone.properties", 
node=node), [self.render("connect-file-source.properties")]) + + sink_external_props = os.path.join(self.sink.PERSISTENT_ROOT, "connect-file-external.properties") + self.sink.node.account.create_file(sink_external_props, self.render('connect-file-external.properties')) self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")]) self.source.start() diff --git a/tests/kafkatest/tests/connect/templates/connect-file-external.properties b/tests/kafkatest/tests/connect/templates/connect-file-external.properties new file mode 100644 index 00000000000..8dccd2571f5 --- /dev/null +++ b/tests/kafkatest/tests/connect/templates/connect-file-external.properties @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +topic.external={{ TOPIC_TEST }} diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties index a8eaa44832e..cbfe4910682 100644 --- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties +++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties @@ -31,3 +31,6 @@ offset.storage.file.filename={{ OFFSETS_FILE }} # Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client request.timeout.ms=30000 + +config.providers=file +config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Connect ConfigProvider not invoked before validation > ---------------------------------------------------------- > > Key: KAFKA-7225 > URL: https://issues.apache.org/jira/browse/KAFKA-7225 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 2.0.0 > Reporter: Nacho Munoz > Assignee: Robert Yokota > Priority: Minor > Fix For: 2.0.1, 2.1.0 > > > When trying to register a JDBC connector with externalised secrets (e.g. > connection.password) the validation fails and the endpoint returns a 500. I > think that the problem is that the config transformer is not being invoked > before the validation so trying to exercise the credentials against the > database fails. 
I have checked that publishing the connector configuration > directly to the connect-config topic (to skip the validation) and restarting > the server is enough to get the connector working, which confirms that we > are simply missing a call to the config transformer before validating the connector. > Please let me know if you need further information. > I'm happy to open a PR to address this issue, since I think it is > easy enough for a new contributor to the project to fix. Please feel free > to assign the resolution of the bug to me. -- This message was sent by Atlassian JIRA (v7.6.3#76005)