dongjinleekr commented on a change in pull request #11431:
URL: https://github.com/apache/kafka/pull/11431#discussion_r743692427
##########
File path:
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
// fill in reasonable defaults
props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
- props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
- + sourceAndTarget.source() + ".internal");
- props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
- + sourceAndTarget.source() + ".internal");
- props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
- + sourceAndTarget.source() + ".internal");
+
+ String separator = originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, REPLICATION_POLICY_SEPARATOR_DEFAULT);
+ if (separator.equals("-")) {
+ throw new ConfigException("You should not use a single dash as a " + REPLICATION_POLICY_SEPARATOR);
+ }
+
+ props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets" + separator
Review comment:
I have also thought over this issue again.
The core of the problem seems to be a gap in our configuration policy that allows the Kafka Connect internal topics to be mirrored into the other cluster. At first I thought this could only happen in standalone mode, but that is not true; my apologies for the confusion.
There are two cases in which this problem may happen:
1. Using a custom separator in standalone mode after KIP-690 (i.e., AK 3.1.0). This is what @OmniaGM and I have discussed so far.
2. Running MM2 in Kafka Connect with the default `[config, offset, status].storage.topic` configuration (what @mimaison just pointed out).
Case 2 can be prevented by overriding the `topics` configuration, and it seems users are already doing so (yes, it has been like this since day 1). So let's focus on case 1 only.
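The `topics` override mentioned for case 2 can be sketched in Java as below. This is a minimal illustration, not MM2's actual filter code: it assumes the comma-separated `topics` entries are OR-ed into one regex and that a topic is mirrored only on a full match. The topic names, including the Connect storage topics `connect-offsets` and `connect-status`, are examples only.

```java
import java.util.List;
import java.util.regex.Pattern;

final class TopicsOverrideSketch {
    // Example `topics` override (hypothetical names): an explicit allow-list,
    // so Connect's own storage topics are never selected for mirroring.
    static final String TOPICS = "orders,payments,inventory\\..*";

    // Assumption: comma-separated entries are joined into a single alternation.
    static Pattern compile(String topics) {
        return Pattern.compile(String.join("|", topics.split(",")));
    }

    // Assumption: a topic is mirrored only if it matches the whole pattern.
    static boolean shouldMirror(Pattern filter, String topic) {
        return filter.matcher(topic).matches();
    }

    public static void main(String[] args) {
        Pattern filter = compile(TOPICS);
        for (String topic : List.of("orders", "inventory.eu", "connect-offsets", "connect-status")) {
            // Prints true for the two business topics, false for the Connect topics.
            System.out.println(topic + " -> " + shouldMirror(filter, topic));
        }
    }
}
```

A catch-all value such as `.*` would match `connect-offsets` and `connect-status` as well, which is the gap described in case 2.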
To prevent those topics from being mirrored when a custom separator is used, there are three possible approaches:
a. Fix `ReplicationPolicy#isInternalTopic` only.
b. Leave it to users to override the `topics` configuration, as in Kafka Connect mode.
c. Make the `mm2-[offsets, status, configs].{source}.internal` topics use the given separator.
Approach a does not seem feasible: to determine whether a given `mm2-[offsets, status, configs].{source}.internal` topic is internal, `ReplicationPolicy#isInternalTopic` would need to know the source cluster's alias, since those topics include `{source}` in their names. However, the method takes only the topic name as a parameter.
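A minimal Java sketch of the difficulty with approach a, assuming an exact name match is what we want. `isMm2ConnectTopic` is a hypothetical helper, not part of `ReplicationPolicy`; it shows that the same topic name is classified differently depending on the source alias passed in, which is the information a method receiving only the topic name does not have.

```java
final class ApproachASketch {
    // Hypothetical exact check for the current names,
    // mm2-[offsets, status, configs].{source}.internal.
    // The answer depends on sourceAlias, which a one-argument method cannot know.
    static boolean isMm2ConnectTopic(String topic, String sourceAlias) {
        for (String kind : new String[] {"offsets", "status", "configs"}) {
            if (topic.equals("mm2-" + kind + "." + sourceAlias + ".internal")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isMm2ConnectTopic("mm2-status.A.internal", "A")); // true
        System.out.println(isMm2ConnectTopic("mm2-status.A.internal", "B")); // false
    }
}
```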
Approach b works, but it may not be obvious to less experienced users.
Ruling out approaches a and b leaves only c. That is why I think applying the separator to those topic names is the only way to make the custom-separator case behave identically to the default-separator case, which does not mirror those topics by default.
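Approach c can be sketched in Java as follows. `internalTopicName` is a hypothetical helper, not the PR's code; it assumes the names keep the `mm2-<kind>` prefix, the source alias and an `internal` suffix, joined by the configured `replication.policy.separator`. The single-dash rejection mirrors the check in the diff, with `IllegalArgumentException` standing in for Kafka's `ConfigException` so the sketch has no dependencies.

```java
import java.util.List;

final class InternalTopicNames {
    // Hypothetical helper for approach c: build the MM2 Connect storage topic
    // name for one kind (offsets, status, configs) with the given separator.
    static String internalTopicName(String kind, String sourceAlias, String separator) {
        if (separator.equals("-")) {
            // Same rule as the diff; stands in for ConfigException.
            throw new IllegalArgumentException(
                "You should not use a single dash as a replication.policy.separator");
        }
        return "mm2-" + kind + separator + sourceAlias + separator + "internal";
    }

    public static void main(String[] args) {
        for (String kind : List.of("offsets", "status", "configs")) {
            System.out.println(internalTopicName(kind, "A", ".")); // default separator
            System.out.println(internalTopicName(kind, "A", "_")); // custom separator
        }
    }
}
```

In this sketch the default `.` reproduces today's names (e.g. `mm2-offsets.A.internal`), so only setups with a custom separator would get new topic names.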
What do you think?
--
This is an automated message from the Apache Git Service.