[
https://issues.apache.org/jira/browse/KAFKA-4930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347164#comment-16347164
]
ASF GitHub Bot commented on KAFKA-4930:
---------------------------------------
ewencp closed pull request #2755: KAFKA-4930: Added connector name validator …
URL: https://github.com/apache/kafka/pull/2755
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 30802984cac..bb199dde202 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -982,6 +982,40 @@ public String toString() {
}
}
+ public static class NonEmptyStringWithoutControlChars implements Validator
{
+
+ public static NonEmptyStringWithoutControlChars
nonEmptyStringWithoutControlChars() {
+ return new NonEmptyStringWithoutControlChars();
+ }
+
+ @Override
+ public void ensureValid(String name, Object value) {
+ String s = (String) value;
+
+ if (s == null) {
+ // This can happen during creation of the config object due to
no default value being defined for the
+ // name configuration - a missing name parameter is caught
when checking for mandatory parameters,
+ // thus we can ok a null value here
+ return;
+ } else if (s.isEmpty()) {
+ throw new ConfigException(name, value, "String may not be
empty");
+ }
+
+ // Check name string for illegal characters
+ ArrayList<Integer> foundIllegalCharacters = new ArrayList<>();
+
+ for (int i = 0; i < s.length(); i++) {
+ if (Character.isISOControl(s.codePointAt(i))) {
+ foundIllegalCharacters.add(s.codePointAt(i));
+ }
+ }
+
+ if (!foundIllegalCharacters.isEmpty()) {
+ throw new ConfigException(name, value, "String may not contain
control sequences but had the following ASCII chars: " +
Utils.join(foundIllegalCharacters, ", "));
+ }
+ }
+ }
+
public static class ConfigKey {
public final String name;
public final Type type;
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 602147b3114..339c51aa4b1 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -160,6 +160,9 @@ public void testValidators() {
testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1",
new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new
Object[]{"abb"}, new Object[] {null});
testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new
ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a",
"b"}, new Object[] {null, -1, "c"});
+ testValidators(Type.STRING, new
ConfigDef.NonEmptyStringWithoutControlChars(), "defaultname",
+ new Object[]{"test", "name", "test/test", "test\u1234",
"\u1324name\\", "/+%>&):??<&()?-", "+1", "\uD83D\uDE01", "\uF3B1", " test
\n\r", "\n hello \t"},
+ new Object[]{"nontrailing\nnotallowed", "as\u0001cii control
char", "tes\rt", "test\btest", "1\t2", ""});
}
@Test
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index e63d1002a92..aad12c3a4a5 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -37,6 +37,7 @@
import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
/**
* <p>
@@ -96,7 +97,7 @@ public Object get(String key) {
public static ConfigDef configDef() {
return new ConfigDef()
- .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC,
COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
+ .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP,
1, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH,
CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT,
atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3,
Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null,
Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT,
KEY_CONVERTER_CLASS_DISPLAY)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 2c031245c06..50d67ce9ce7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -87,10 +87,11 @@ public ConnectorsResource(Herder herder) {
@Path("/")
public Response createConnector(final @QueryParam("forward") Boolean
forward,
final CreateConnectorRequest
createRequest) throws Throwable {
- String name = createRequest.name();
- if (name.contains("/")) {
- throw new BadRequestException("connector name should not contain
'/'");
- }
+ // Trim leading and trailing whitespaces from the connector name,
replace null with empty string
+ // if no name element present to keep validation within validator
(NonEmptyStringWithoutControlChars
+ // allows null values)
+ String name = createRequest.name() == null ? "" :
createRequest.name().trim();
+
Map<String, String> configs = createRequest.config();
checkAndPutConnectorConfigName(name, configs);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index f8c4fd690dc..fe1bf264654 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -90,6 +90,14 @@ public void danglingTransformAlias() {
new ConnectorConfig(MOCK_PLUGINS, props);
}
+ @Test(expected = ConfigException.class)
+ public void emptyConnectorName() {
+ Map<String, String> props = new HashMap<>();
+ props.put("name", "");
+ props.put("connector.class", TestConnector.class.getName());
+ new ConnectorConfig(MOCK_PLUGINS, props);
+ }
+
@Test(expected = ConfigException.class)
public void wrongTransformationType() {
Map<String, String> props = new HashMap<>();
@@ -168,5 +176,5 @@ public void multipleTransforms() {
assertEquals(42, ((SimpleTransformation)
transformations.get(0)).magicNumber);
assertEquals(84, ((SimpleTransformation)
transformations.get(1)).magicNumber);
}
-
+
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 89a221835ef..ab4fb0a0f9e 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -67,8 +67,11 @@
// URL construction properly, avoiding //, which will mess up routing in
the REST server
private static final String LEADER_URL = "http://leader:8083/";
private static final String CONNECTOR_NAME = "test";
- private static final String CONNECTOR_NAME_SPECIAL_CHARS =
"t?a=b&c=d\rx=1.1\n>< \t`'\" x%y+z!#$&'()*+,:;=?@[]";
+ private static final String CONNECTOR_NAME_SPECIAL_CHARS =
"ta/b&c=d//\\rx=1þ.1>< `'\" x%y+z!ሴ#$&'(æ)*+,:;=?ñ@[]ÿ";
+ private static final String CONNECTOR_NAME_CONTROL_SEQUENCES1 =
"ta/b&c=drx=1\n.1>< `'\" x%y+z!#$&'()*+,:;=?@[]";
private static final String CONNECTOR2_NAME = "test2";
+ private static final String CONNECTOR_NAME_ALL_WHITESPACES = " \t\n \b";
+ private static final String CONNECTOR_NAME_PADDING_WHITESPACES = " " +
CONNECTOR_NAME + " \n ";
private static final Boolean FORWARD = true;
private static final Map<String, String> CONNECTOR_CONFIG_SPECIAL_CHARS =
new HashMap<>();
static {
@@ -81,6 +84,24 @@
CONNECTOR_CONFIG.put("name", CONNECTOR_NAME);
CONNECTOR_CONFIG.put("sample_config", "test_config");
}
+
+ private static final Map<String, String>
CONNECTOR_CONFIG_CONTROL_SEQUENCES = new HashMap<>();
+ static {
+ CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name",
CONNECTOR_NAME_CONTROL_SEQUENCES1);
+ CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("sample_config", "test_config");
+ }
+
+ private static final Map<String, String> CONNECTOR_CONFIG_WITHOUT_NAME =
new HashMap<>();
+ static {
+ CONNECTOR_CONFIG_WITHOUT_NAME.put("sample_config", "test_config");
+ }
+
+ private static final Map<String, String> CONNECTOR_CONFIG_WITH_EMPTY_NAME
= new HashMap<>();
+
+ static {
+ CONNECTOR_CONFIG_WITH_EMPTY_NAME.put(ConnectorConfig.NAME_CONFIG, "");
+ CONNECTOR_CONFIG_WITH_EMPTY_NAME.put("sample_config", "test_config");
+ }
private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES =
Arrays.asList(
new ConnectorTaskId(CONNECTOR_NAME, 0),
new ConnectorTaskId(CONNECTOR_NAME, 1)
@@ -107,6 +128,12 @@ public void setUp() throws NoSuchMethodException {
connectorsResource = new ConnectorsResource(herder);
}
+ private static final Map<String, String> getConnectorConfig(Map<String,
String> mapToClone) {
+ Map<String, String> result = new HashMap<>();
+ result.putAll(mapToClone);
+ return result;
+ }
+
@Test
public void testListConnectors() throws Throwable {
final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
@@ -205,20 +232,59 @@ public void testCreateConnectorExists() throws Throwable {
PowerMock.verifyAll();
}
- @Test(expected = BadRequestException.class)
- public void testCreateConnectorWithASlashInItsName() throws Throwable {
- String badConnectorName = CONNECTOR_NAME + "/" + "test";
+ @Test
+ public void testCreateConnectorNameTrimWhitespaces() throws Throwable {
+ // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes
it (puts the name in it) and this
+ // will affect later tests
+ Map<String, String> inputConfig =
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
+ final CreateConnectorRequest bodyIn = new
CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig);
+ final CreateConnectorRequest bodyOut = new
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG);
+
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(bodyOut.name()),
EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
+
+ PowerMock.replayAll();
+
+ connectorsResource.createConnector(FORWARD, bodyIn);
- CreateConnectorRequest body = new
CreateConnectorRequest(badConnectorName,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, badConnectorName));
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCreateConnectorNameAllWhitespaces() throws Throwable {
+ // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes
it (puts the name in it) and this
+ // will affect later tests
+ Map<String, String> inputConfig =
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
+ final CreateConnectorRequest bodyIn = new
CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig);
+ final CreateConnectorRequest bodyOut = new CreateConnectorRequest("",
CONNECTOR_CONFIG_WITH_EMPTY_NAME);
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
- herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME),
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
- expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
- ConnectorType.SOURCE)));
+ herder.putConnectorConfig(EasyMock.eq(bodyOut.name()),
EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
PowerMock.replayAll();
- connectorsResource.createConnector(FORWARD, body);
+ connectorsResource.createConnector(FORWARD, bodyIn);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCreateConnectorNoName() throws Throwable {
+ // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes
it (puts the name in it) and this
+ // will affect later tests
+ Map<String, String> inputConfig =
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
+ final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null,
inputConfig);
+ final CreateConnectorRequest bodyOut = new CreateConnectorRequest("",
CONNECTOR_CONFIG_WITH_EMPTY_NAME);
+
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(bodyOut.name()),
EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
+
+ PowerMock.replayAll();
+
+ connectorsResource.createConnector(FORWARD, bodyIn);
PowerMock.verifyAll();
}
@@ -341,6 +407,24 @@ public void testCreateConnectorWithSpecialCharsInName()
throws Throwable {
PowerMock.verifyAll();
}
+ @Test
+ public void testCreateConnectorWithControlSequenceInName() throws
Throwable {
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME_CONTROL_SEQUENCES1));
+
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1),
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG,
+ CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+ PowerMock.replayAll();
+
+ String rspLocation = connectorsResource.createConnector(FORWARD,
body).getLocation().toString();
+ String decoded = new URI(rspLocation).getPath();
+ Assert.assertEquals("/connectors/" +
CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testPutConnectorConfigWithSpecialCharsInName() throws
Throwable {
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
@@ -358,6 +442,23 @@ public void testPutConnectorConfigWithSpecialCharsInName()
throws Throwable {
PowerMock.verifyAll();
}
+ @Test
+ public void testPutConnectorConfigWithControlSequenceInName() throws
Throwable {
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+
+
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1),
EasyMock.eq(CONNECTOR_CONFIG_CONTROL_SEQUENCES), EasyMock.eq(true),
EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1,
CONNECTOR_CONFIG_CONTROL_SEQUENCES, CONNECTOR_TASK_NAMES,
+ ConnectorType.SINK)));
+
+ PowerMock.replayAll();
+
+ String rspLocation =
connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1,
FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
+ String decoded = new URI(rspLocation).getPath();
+ Assert.assertEquals("/connectors/" +
CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
+
+ PowerMock.verifyAll();
+ }
+
@Test(expected = BadRequestException.class)
public void testPutConnectorConfigNameMismatch() throws Throwable {
Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Connect Rest API allows creating connectors with an empty name - KIP-212
> ------------------------------------------------------------------------
>
> Key: KAFKA-4930
> URL: https://issues.apache.org/jira/browse/KAFKA-4930
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.10.2.0
> Reporter: Sönke Liebau
> Assignee: Sönke Liebau
> Priority: Minor
> Fix For: 1.1.0
>
>
> The Connect Rest API allows to deploy connectors with an empty name field,
> which then cannot be removed through the api.
> Sending the following request:
> {code}
> {
> "name": "",
> "config": {
> "connector.class":
> "org.apache.kafka.connect.tools.MockSourceConnector",
> "tasks.max": "1",
> "topics": "test-topic"
>
> }
> }
> {code}
> Results in a connector being deployed which can be seen in the list of
> connectors:
> {code}
> [
> "",
> "testconnector"
> ]{code}
> But cannot be removed via a DELETE call, as the api thinks we are trying to
> delete the /connectors endpoint and declines the request.
> I don't think there is a valid case for the connector name to be empty so
> perhaps we should add a check for this. I am happy to work on this.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)