bdesert commented on a change in pull request #11401:
URL: https://github.com/apache/kafka/pull/11401#discussion_r743062367
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
##########
@@ -152,6 +155,48 @@ public void testConfigPropertyFiltering() {
.anyMatch(x -> x.name().equals("min.insync.replicas")), "should
not replicate excluded properties");
}
+ @Test
+ public void testNewTopicConfigs() throws Exception {
+ Map<String, Object> filterConfig = new HashMap<>();
+
filterConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG,
"follower\\.replication\\.throttled\\.replicas, "
+ + "leader\\.replication\\.throttled\\.replicas, "
+ + "message\\.timestamp\\.difference\\.max\\.ms, "
+ + "message\\.timestamp\\.type, "
+ + "unclean\\.leader\\.election\\.enable, "
+ + "min\\.insync\\.replicas,"
+ + "exclude_param.*");
+ DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
+ filter.configure(filterConfig);
+
+ MirrorSourceConnector connector = spy(new MirrorSourceConnector(new
SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), x -> true, filter));
+
+ final String topic = "testtopic";
+ List<ConfigEntry> entries = new ArrayList<>();
+ entries.add(new ConfigEntry("name-1", "value-1"));
+ entries.add(new ConfigEntry("exclude_param.param1", "value-param1"));
+ entries.add(new ConfigEntry("min.insync.replicas", "2"));
+ Config config = new Config(entries);
+ doReturn(Collections.singletonMap(topic,
config)).when(connector).describeTopicConfigs(any());
+ doAnswer(invocation -> {
+ Map<String, NewTopic> newTopics = invocation.getArgument(0);
+ assertNotNull(newTopics.get("source." + topic));
+ Map<String, String> targetConfig = newTopics.get("source." +
topic).configs();
+
+ // property 'name-1' isn't defined in the exclude filter -> should
be replicated
+ assertNotNull(targetConfig.get("name-1"), "should replicate
properties");
+
+ // this property is in default list, just double check it:
+ String prop1 = "min.insync.replicas";
+ assertNull(targetConfig.get(prop1), "should not replicate excluded
properties " + prop1);
+ // this property is only in exclude filter custom parameter, also
tests regex on the way:
+ String prop2 = "exclude_param.param1";
+ assertNull(targetConfig.get(prop2), "should not replicate excluded
properties " + prop2);
+ return null;
+ }).when(connector).createNewTopics(any());
+ connector.createNewTopics(Collections.singleton(topic),
Collections.singletonMap(topic, 1L));
+ }
Review comment:
Agree, with one small correction: it should be `createNewTopics(any(), any())`, since
`targetConfig()` is called from the overloaded method that takes two parameters.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]