mimaison commented on a change in pull request #11431:
URL: https://github.com/apache/kafka/pull/11431#discussion_r742038737
##########
File path:
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
// fill in reasonable defaults
props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
- props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
- + sourceAndTarget.source() + ".internal");
- props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
- + sourceAndTarget.source() + ".internal");
- props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
- + sourceAndTarget.source() + ".internal");
+
+ String separator = originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, REPLICATION_POLICY_SEPARATOR_DEFAULT);
+ if (separator.equals("-")) {
+ throw new ConfigException("You should not use a single dash as a " + REPLICATION_POLICY_SEPARATOR);
+ }
+
+ props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets" + separator
Review comment:
My bad, I missed the fact that this line is about Kafka Connect's
internal topics and not MM2's. I think it's fine to keep the code as is.
##########
File path:
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
##########
@@ -50,8 +50,7 @@
public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class;
public static final String REPLICATION_POLICY_SEPARATOR = "replication.policy.separator";
private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
- public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
- DefaultReplicationPolicy.SEPARATOR_DEFAULT;
+ public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT = ".";
Review comment:
Yeah, but I expect the value for this config to come from a class
implementing the interface, so in this case `DefaultReplicationPolicy`.
##########
File path:
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
// fill in reasonable defaults
props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
- props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
- + sourceAndTarget.source() + ".internal");
- props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
- + sourceAndTarget.source() + ".internal");
- props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
- + sourceAndTarget.source() + ".internal");
+
+ String separator = originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, REPLICATION_POLICY_SEPARATOR_DEFAULT);
+ if (separator.equals("-")) {
+ throw new ConfigException("You should not use a single dash as a " + REPLICATION_POLICY_SEPARATOR);
+ }
+
+ props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets" + separator
Review comment:
My bad, I missed the fact that this line is about Kafka Connect's
internal topics and not MM2's. If I understand correctly, this behaviour is not
new and is not related to the changes made in KIP-690.
I agree it's unfortunate that if users change the default separator, these
Connect internal topics get mirrored in dedicated mode.
This could also happen in Connect mode, for example if
`offset.storage.topic` is not set to a value that is caught by
`isInternalTopic()`.
If we wanted to use the separator for Connect's topics, I'd prefer doing it
via a small KIP. As it has likely been like this since day 1, this does not seem
to be a very common use case, and I'm not sure we necessarily need to do
anything. If a user in dedicated mode wants a different separator, they will
also need to override the Connect topic names to ensure they match the
separator.
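
To illustrate the mismatch described above, here is a minimal sketch. It
assumes, purely for illustration, that internal topics are recognized by a
suffix of the form `<separator>internal`; `looksInternal` and `offsetsTopic`
are hypothetical helpers and are not the actual `isInternalTopic()`
implementation or any Kafka API.

```java
public class SeparatorSketch {
    // Hypothetical stand-in for a separator-aware internal-topic check.
    // Assumption: a topic counts as internal if it ends with <separator>internal.
    public static boolean looksInternal(String topic, String separator) {
        return topic.endsWith(separator + "internal");
    }

    // Hypothetical helper: builds an offsets topic name for a source cluster
    // alias, following the "mm2-offsets<sep><source><sep>internal" pattern.
    public static String offsetsTopic(String source, String separator) {
        return "mm2-offsets" + separator + source + separator + "internal";
    }

    public static void main(String[] args) {
        String custom = "_"; // user-chosen replication.policy.separator

        // Name built with the hardcoded "." while the policy uses "_".
        String dotted = offsetsTopic("primary", ".");
        // Name overridden by the user to match the custom separator.
        String overridden = offsetsTopic("primary", custom);

        System.out.println(dotted + " internal? " + looksInternal(dotted, custom));
        System.out.println(overridden + " internal? " + looksInternal(overridden, custom));
    }
}
```

Under that assumption, the dotted name is not recognized as internal once the
separator is `_`, so the topic would be eligible for mirroring, while the
overridden name is recognized. This is why overriding settings such as
`offset.storage.topic` to match the separator avoids the problem.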
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.