Hi Mickael and Ryanne,

I updated the KIP to add these methods to `ReplicationPolicy` instead of adding an extra interface, to simplify the changes. Please have a look and let me know your thoughts.
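To make the updated proposal concrete, here is a minimal Java sketch of a custom policy that implements these methods. It illustrates the idea under stated assumptions and is not the KIP's code. It declares a local `ProposedReplicationPolicy` interface copied from the method list quoted below, so it compiles without Kafka on the classpath. The `PrefixedInternalTopicsPolicy` class, the `_mm2-` prefix and the `<alias>.<topic>` remote-topic format are hypothetical.

```java
/** Small demo; the types it uses are declared below. */
class PolicyDemo {
    public static void main(String[] args) {
        ProposedReplicationPolicy policy = new PrefixedInternalTopicsPolicy();
        System.out.println(policy.checkpointTopic("primary"));                  // _mm2-checkpoints-primary
        System.out.println(policy.originalTopic("backup.primary.orders"));      // orders
        System.out.println(policy.isInternalTopic("_mm2-offset-syncs-backup")); // true
    }
}

/** Local copy of the methods proposed in this thread; not Kafka's published ReplicationPolicy. */
interface ProposedReplicationPolicy {
    String topicSource(String topic);

    String upstreamTopic(String topic);

    /** Returns the heartbeats topic name. */
    String heartbeatsTopic();

    /** Returns the offset-syncs topic name for the given target cluster alias. */
    String offsetSyncTopic(String targetAlias);

    /** Returns the checkpoint topic name for the given source cluster alias. */
    String checkpointTopic(String sourceAlias);

    /** Follows upstreamTopic() until it reaches a topic that has no upstream. */
    default String originalTopic(String topic) {
        String upstream = upstreamTopic(topic);
        return upstream == null ? topic : originalTopic(upstream);
    }

    /** Internal topics are never replicated. */
    boolean isInternalTopic(String topic);
}

/** Hypothetical policy: remote topics look like "<alias>.<topic>", MM2 internal topics start with "_mm2-". */
class PrefixedInternalTopicsPolicy implements ProposedReplicationPolicy {
    private static final String SEPARATOR = ".";
    private static final String PREFIX = "_mm2-";

    @Override
    public String topicSource(String topic) {
        int idx = topic.indexOf(SEPARATOR);
        // No separator (or a leading one) means the topic is not a remote topic.
        return idx <= 0 ? null : topic.substring(0, idx);
    }

    @Override
    public String upstreamTopic(String topic) {
        String source = topicSource(topic);
        return source == null ? null : topic.substring(source.length() + SEPARATOR.length());
    }

    @Override
    public String heartbeatsTopic() {
        return PREFIX + "heartbeats";
    }

    @Override
    public String offsetSyncTopic(String targetAlias) {
        return PREFIX + "offset-syncs-" + targetAlias;
    }

    @Override
    public String checkpointTopic(String sourceAlias) {
        return PREFIX + "checkpoints-" + sourceAlias;
    }

    @Override
    public boolean isInternalTopic(String topic) {
        // Kafka internals (e.g. __consumer_offsets) plus this policy's offset-syncs
        // and checkpoint topics. Heartbeats are deliberately not treated as internal,
        // assuming they must still be replicated between clusters.
        return topic.startsWith("__")
                || topic.startsWith(PREFIX + "offset-syncs-")
                || topic.startsWith(PREFIX + "checkpoints-");
    }
}
```

Because `originalTopic` has a default body, a custom policy like this only supplies the naming rules, and `isInternalTopic` is where it decides which of its own topics must never be replicated.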

Thanks

On Tue, Mar 30, 2021 at 7:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> (Sorry, I forgot to Reply All.)
>
> Hi Ryanne,
>
> It's a valid concern. I was trying to keep the internal-topic policy and the replication policy separate and to keep the code readable, as extending ReplicationPolicy to manage both internal and replicated topics is a bit odd. I'm not against simplifying things and having ReplicationPolicy handle both: at the end of the day, if an MM2 user has a special naming convention for topics, it will affect both replicated and MM2 internal topics.
>
> To simplify things, we can extend `ReplicationPolicy` as follows instead of adding an extra class:
>
>     public interface ReplicationPolicy {
>         String topicSource(String topic);
>         String upstreamTopic(String topic);
>
>         /** Returns the heartbeats topic name. */
>         String heartbeatsTopic();
>
>         /** Returns the offset-syncs topic for the given cluster alias. */
>         String offsetSyncTopic(String targetAlias);
>
>         /** Returns the checkpoint topic name for the given cluster alias. */
>         String checkpointTopic(String sourceAlias);
>
>         default String originalTopic(String topic) {
>             String upstream = upstreamTopic(topic);
>             if (upstream == null) {
>                 return topic;
>             } else {
>                 return originalTopic(upstream);
>             }
>         }
>
>         // The implementation will be moved to `DefaultReplicationPolicy` to handle
>         // both Kafka internal topics and MM2 internal topics.
>         /** Internal topics are never replicated. */
>         boolean isInternalTopic(String topic);
>     }
>
> On Fri, Mar 26, 2021 at 3:05 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
>> Omnia, have we considered just adding methods to ReplicationPolicy? I'm reluctant to add a new class because, as Mickael points out, we'd need to carry it around in client code.
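One way to address the client-code concern while adding methods to ReplicationPolicy is to give the new methods default bodies. Policies written before the change would then keep compiling, and client code would keep seeing today's names. Below is a minimal Java sketch that uses hypothetical `PolicyWithDefaults` and `LegacyPolicy` types in place of the real interface. The `.checkpoints.internal` suffix is the one mentioned in this thread; the `heartbeats` and `mm2-offset-syncs.<alias>.internal` defaults are assumed to match MM2's current names.

```java
/** Demo: an implementation written before the new methods existed still works. */
class DefaultsDemo {
    public static void main(String[] args) {
        PolicyWithDefaults policy = new LegacyPolicy();
        System.out.println(policy.checkpointTopic("primary"));      // primary.checkpoints.internal
        System.out.println(policy.upstreamTopic("primary.orders")); // orders
    }
}

/** Hypothetical stand-in for ReplicationPolicy after the new methods are added. */
interface PolicyWithDefaults {
    // Pre-existing abstract method that every implementation already provides.
    String upstreamTopic(String topic);

    // New methods ship with default bodies that keep today's names, so existing
    // implementations compile unchanged and keep their current behaviour.
    default String heartbeatsTopic() {
        return "heartbeats"; // assumed current default
    }

    default String offsetSyncTopic(String targetAlias) {
        return "mm2-offset-syncs." + targetAlias + ".internal"; // assumed current default
    }

    default String checkpointTopic(String sourceAlias) {
        return sourceAlias + ".checkpoints.internal"; // suffix mentioned in this thread
    }
}

/** An implementation that predates the new methods and only overrides upstreamTopic(). */
class LegacyPolicy implements PolicyWithDefaults {
    @Override
    public String upstreamTopic(String topic) {
        int idx = topic.indexOf('.');
        return idx <= 0 ? null : topic.substring(idx + 1);
    }
}
```

With defaults like these, client-side code such as RemoteClusterUtils could ask the configured policy for `checkpointTopic(remoteClusterAlias)` rather than appending the suffix itself.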
>>
>> Ryanne
>>
>> On Fri, Feb 19, 2021 at 8:31 AM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>
>>> Hi Omnia,
>>>
>>> Thanks for the clarifications.
>>>
>>> - I'm still a bit uneasy with the overlap between these 2 methods, as `ReplicationPolicy.isInternalTopic` currently already handles MM2 internal topics. Should we make it handle only Kafka internal topics, and have `isMM2InternalTopic()` handle only MM2 topics?
>>>
>>> - I'm not sure I understand what this method is used for. There are no such methods for the other 2 topics (offset-sync and heartbeat). Also, what happens if there are other MM2 instances using different naming schemes in the same cluster? Do all instances have to know about the other naming schemes? What are the expected issues if they don't?
>>>
>>> - RemoteClusterUtils is a client-side utility, so it does not have access to the MM2 configuration. Since this new API can affect the name of the checkpoint topic, it will need to be used client-side too, so users can find the checkpoint topic name. I hadn't realized this was the case.
>>>
>>> Thanks
>>>
>>> On Mon, Feb 15, 2021 at 9:33 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>
>>>> Hi Mickael, did you have some time to check my answer?
>>>>
>>>> On Thu, Jan 21, 2021 at 10:10 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>
>>>>> Hi Mickael,
>>>>> Thanks for taking another look at the KIP. Regarding your questions:
>>>>>
>>>>> - I believe we need both `isMM2InternalTopic` and `ReplicationPolicy.isInternalTopic`: `ReplicationPolicy.isInternalTopic` checks whether a topic is a Kafka internal topic, while `isMM2InternalTopic` only checks whether a topic is an MM2 internal topic (heartbeat/checkpoint/offset-sync). The assumption that the default MM2 internal topic names match `ReplicationPolicy.isInternalTopic` will no longer hold once we implement this KIP.
>>>>>
>>>>> - `isCheckpointTopic` will detect all checkpoint topics for all MM2 instances. This is needed for `MirrorClient.checkpointTopics`, which originally checks whether the topic name ends with `CHECKPOINTS_TOPIC_SUFFIX`, so this method just keeps the functionality that already exists in MM2.
>>>>>
>>>>> - `checkpointTopic` is used in two places: 1. at topic creation in `MirrorCheckpointConnector.createInternalTopics`, which uses `sourceClusterAlias() + CHECKPOINTS_TOPIC_SUFFIX`; and 2. in `MirrorClient.remoteConsumerOffsets`, which is called by `RemoteClusterUtils.translateOffsets`. There the cluster alias is referred to as `remoteCluster`, and the topic name is `remoteClusterAlias + CHECKPOINTS_TOPIC_SUFFIX` (`remoteClusterAlias` is an argument of RemoteClusterUtils, not a config). This is why I named the variable `cluster` instead of `source`, and why I kept it as an argument instead of figuring out the cluster aliases from the config: it keeps `RemoteClusterUtils` compatible for existing users. I see a benefit in just reading the config to find the cluster aliases, but on the other hand I'm not sure why `RemoteClusterUtils` takes the cluster name as an argument instead of getting it from the properties, so I decided to keep it as is for compatibility.
>>>>>
>>>>> Hope these answer some of your concerns.
>>>>> Best
>>>>> Omnia
>>>>>
>>>>> On Thu, Jan 21, 2021 at 3:37 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Omnia,
>>>>>>
>>>>>> Thanks for the updates. Sorry for the delay, but I have a few more small questions about the API:
>>>>>> - Do we really need `isMM2InternalTopic()`? There's already `ReplicationPolicy.isInternalTopic()`. If so, we need to explain the difference between these 2 methods.
>>>>>>
>>>>>> - Is `isCheckpointTopic()` expected to detect all checkpoint topics (for all MM2 instances) or only the ones for this connector instance? If it's the latter, I wonder if we could do without the method. As this interface is only called by MM2, we could first call `checkpointTopic()` and check whether that's equal to the topic we're checking. If it's the former, we don't really know which topic names other MM2 instances may be using!
>>>>>>
>>>>>> - The 3 methods returning topic names have different APIs: `heartbeatsTopic()` takes no arguments, `offsetSyncTopic()` takes the target cluster alias and `checkpointTopic()` takes `clusterAlias` (which one is it, source or target?). As the interface extends Configurable, maybe we could get rid of all the arguments and use the config to find the cluster aliases. WDYT?
>>>>>>
>>>>>> These are minor concerns; I'm just making sure I fully understand how the API is expected to be used. Once these are cleared up, I'll be happy to vote for this KIP.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Fri, Jan 8, 2021 at 12:06 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Mickael,
>>>>>>> Did you get time to review the changes to the KIP? If you're okay with it, could you vote for the KIP here? https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html
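Mickael's question about `isCheckpointTopic()` contrasts two ways of recognising checkpoint topics. Below is a minimal Java sketch of both, assuming a topic listing is already available as a set of names. `bySuffix` mirrors the suffix check Omnia describes for `MirrorClient.checkpointTopics`. `byKnownAliases` follows Mickael's idea of comparing topics against `checkpointTopic()`. The class, the method names and the `_mm2-checkpoints-` naming passed in are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

class CheckpointDetectionDemo {
    // Suffix used today to recognise checkpoint topics.
    static final String CHECKPOINTS_TOPIC_SUFFIX = ".checkpoints.internal";

    /** Finds checkpoint topics from any MM2 instance, but only if they use the default suffix. */
    static Set<String> bySuffix(Set<String> topics) {
        return topics.stream()
                .filter(t -> t.endsWith(CHECKPOINTS_TOPIC_SUFFIX))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    /**
     * Finds only the checkpoint topics this policy would name for the aliases it knows;
     * topics created by other instances under other names are missed.
     */
    static Set<String> byKnownAliases(Set<String> topics, List<String> sourceAliases,
                                      Function<String, String> checkpointTopic) {
        return sourceAliases.stream()
                .map(checkpointTopic)
                .filter(topics::contains)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        Set<String> topics = new TreeSet<>(Arrays.asList(
                "orders", "primary.checkpoints.internal", "dr.checkpoints.internal",
                "_mm2-checkpoints-primary"));
        // Hypothetical custom naming standing in for policy.checkpointTopic(alias).
        Function<String, String> customNaming = alias -> "_mm2-checkpoints-" + alias;

        System.out.println(bySuffix(topics));
        // [dr.checkpoints.internal, primary.checkpoints.internal]
        System.out.println(byKnownAliases(topics, Arrays.asList("primary", "backup"), customNaming));
        // [_mm2-checkpoints-primary]
    }
}
```

The output illustrates the trade-off discussed in the thread: suffix matching finds other instances' topics only while they keep the default suffix, and equality matching finds custom names only for aliases this instance already knows.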
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Thu, Dec 10, 2020 at 2:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Mickael,
>>>>>>>> 1) That's right, the interface and default implementation will be in mirror-connect.
>>>>>>>> 2) Renaming the interface should be fine too, especially if you're planning to move other functionality related to topic creation there. I can edit this.
>>>>>>>>
>>>>>>>> If you are okay with that, please vote for the KIP here: https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Omnia
>>>>>>>>
>>>>>>>> On Thu, Dec 10, 2020 at 12:58 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Omnia,
>>>>>>>>>
>>>>>>>>> Thank you for the reply, it makes sense.
>>>>>>>>>
>>>>>>>>> A couple more comments:
>>>>>>>>>
>>>>>>>>> 1) I'm assuming the new interface and default implementation will be in the mirror-client project, as the names of some of these topics are needed by RemoteClusterUtils on the client side?
>>>>>>>>>
>>>>>>>>> 2) I'm about to open a KIP to specify where the offset-syncs topic is created by MM2. In restricted environments, we'd prefer MM2 to have only read access to the source cluster and to keep the offset-syncs topic on the target cluster. I think allowing users to specify the cluster where that topic is created would be a natural extension of the interface you propose here.
>>>>>>>>>
>>>>>>>>> So I wonder if your interface could be named InternalTopicsPolicy? That's a bit more generic than InternalTopicNamingPolicy. It would also match the configuration setting you're proposing, internal.topics.policy.class.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Thu, Dec 3, 2020 at 10:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Mickael,
>>>>>>>>>> Thanks for your feedback!
>>>>>>>>>> Regarding your question about having more configurations: I considered adding a configuration for each topic, but this meant adding more configurations to MM2, which already has so many. Also, the more complicated and advanced the replication pattern between your clusters, the more configuration lines get added to your MM2 config, which isn't going to be pretty if you don't have the same topic names across your clusters.
>>>>>>>>>>
>>>>>>>>>> It also added more complexity to the implementation, as MM2 needs to:
>>>>>>>>>> 1- identify whether a topic is a checkpoints topic, so we can list the checkpoints topics in the MirrorMaker 2 utils, as one cluster could have X checkpoints topics if it's connected to X clusters. Right now this is done by listing any topic with the suffix `.checkpoints.internal`. This could be done by adding a `checkpoints.topic.suffix` config, but that would assume checkpoints topics always have a suffix, and having a suffix means we may also need a separator to concatenate the suffix with a prefix that identifies the source cluster name.
>>>>>>>>>> 2- identify whether a topic is internal, so that it isn't replicated and no checkpoints are tracked for it. Right now this relies on topics with the `.internal` suffix never being replicated or tracked in checkpoints, but once topic names are configurable we need a way to define what an internal topic is.
>>>>>>>>>> This could be done by using a list of all internal topics entered in the configuration.
>>>>>>>>>>
>>>>>>>>>> So having an interface seemed easier, and it also gives users more flexibility to define their own topic names, what an internal topic means, and how to find checkpoints topics, and it will be a single line of config for each herder. It's also more consistent with the MM2 code, as the MM2 config already has TopicFilter, ReplicationPolicy, GroupFilter, etc. as interfaces, which can be overridden by providing a custom implementation or by setting config that changes their default implementations.
>>>>>>>>>>
>>>>>>>>>> Hope this answers your question. I also updated the KIP to add this to the rejected alternatives.
>>>>>>>>>>
>>>>>>>>>> On Thu, Dec 3, 2020 at 3:19 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Omnia,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP. Indeed, being able to configure MM2's internal topic names would be a nice improvement.
>>>>>>>>>>>
>>>>>>>>>>> Looking at the KIP, I was surprised you propose an interface to allow users to specify names. Have you considered making names changeable via configurations? If so, we should definitely mention it in the rejected alternatives, as it's the first method that comes to mind.
>>>>>>>>>>>
>>>>>>>>>>> I understand an interface gives a lot of flexibility, but I'd expect topic names to be relatively simple and known in advance in most cases.
>>>>>>>>>>>
>>>>>>>>>>> I've not checked all use cases, but something like below felt appropriate:
>>>>>>>>>>>     clusters = primary,backup
>>>>>>>>>>>     primary->backup.offsets-sync.topic=backup.mytopic-offsets-sync
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Dec 1, 2020 at 3:36 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hey everyone,
>>>>>>>>>>>> Please take a look at KIP-690:
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your feedback and support.
>>>>>>>>>>>>
>>>>>>>>>>>> Omnia