Thanks Omnia. LGTM!

Ryanne
On Thu, Apr 29, 2021 at 10:50 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

I updated the KIP.

On Thu, Apr 29, 2021 at 4:43 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Sure, this would make it easier. We can make these methods return the original names by default (<clusterAlias>.checkpoints.internal, "mm2-offset-syncs.<clusterAlias>.internal", heartbeats), without any customisation via `replication.policy.separator`, and have DefaultReplicationPolicy use the separator.

On Wed, Apr 28, 2021 at 1:31 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Thanks Omnia, makes sense to me.

> Customers who have their customised ReplicationPolicy will need to add the definition of their internal topics naming convention

I wonder, should we include default impls in the interface to avoid that requirement?

Ryanne

On Sun, Apr 25, 2021, 2:20 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Mickael and Ryanne,
I updated the KIP to add these methods to ReplicationPolicy instead of an extra interface, to simplify the changes. Please have a look and let me know your thoughts.

Thanks

On Tue, Mar 30, 2021 at 7:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

(Sorry, forgot to Reply to All.)
Hi Ryanne,
It's a valid concern. I was trying to keep the internal-topic and replicated-topic policies separate and to keep the code readable, as extending ReplicationPolicy to manage both internal and replicated topics is a bit odd. I'm not against simplifying things so that ReplicationPolicy handles both; at the end of the day, if an MM2 user has a special naming convention for topics, it will affect both replicated and MM2 internal topics.
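Ryanne's Apr 28 suggestion (default implementations in the interface) and Omnia's Apr 29 reply can be illustrated with a minimal sketch. It assumes a simplified, hypothetical interface: `ReplicationPolicySketch`, `SeparatorAwarePolicySketch` and `LegacyCustomPolicySketch` are invented names, not Kafka classes, and only the three naming methods from the proposal are modelled. The defaults return the pre-KIP names listed above, so an existing custom policy keeps working unchanged, while the separator-aware class stands in for DefaultReplicationPolicy.

```java
// Hypothetical, trimmed-down stand-in for MM2's ReplicationPolicy; NOT the real Kafka interface.
interface ReplicationPolicySketch {

    /** Returns the heartbeats topic name; defaults to the pre-KIP name. */
    default String heartbeatsTopic() {
        return "heartbeats";
    }

    /** Returns the offset-syncs topic for the given cluster alias; defaults to the pre-KIP name. */
    default String offsetSyncTopic(String clusterAlias) {
        return "mm2-offset-syncs." + clusterAlias + ".internal";
    }

    /** Returns the checkpoints topic for the given cluster alias; defaults to the pre-KIP name. */
    default String checkpointTopic(String clusterAlias) {
        return clusterAlias + ".checkpoints.internal";
    }
}

// Hypothetical default policy that builds the same names from a configurable separator,
// mirroring the idea of reusing `replication.policy.separator` in DefaultReplicationPolicy.
class SeparatorAwarePolicySketch implements ReplicationPolicySketch {
    private final String separator;

    SeparatorAwarePolicySketch(String separator) {
        this.separator = separator;
    }

    @Override
    public String offsetSyncTopic(String clusterAlias) {
        return "mm2-offset-syncs" + separator + clusterAlias + separator + "internal";
    }

    @Override
    public String checkpointTopic(String clusterAlias) {
        return clusterAlias + separator + "checkpoints" + separator + "internal";
    }
}

// Hypothetical custom policy written before the change: it overrides nothing,
// so it inherits the original names from the interface defaults.
class LegacyCustomPolicySketch implements ReplicationPolicySketch {
}
```

For example, `new SeparatorAwarePolicySketch("_").checkpointTopic("primary")` returns `primary_checkpoints_internal`, whereas `new LegacyCustomPolicySketch().checkpointTopic("primary")` keeps `primary.checkpoints.internal`. Putting the defaults in the interface is what spares existing custom policies from implementing the new methods, which was the requirement Ryanne wanted to avoid.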
To simplify things, we can extend `ReplicationPolicy` as follows instead of adding an extra class:

    public interface ReplicationPolicy {
        String topicSource(String topic);
        String upstreamTopic(String topic);

        /** Returns the heartbeats topic name. */
        String heartbeatsTopic();

        /** Returns the offset-syncs topic for the given cluster alias. */
        String offsetSyncTopic(String targetAlias);

        /** Returns the checkpoints topic name for the given cluster alias. */
        String checkpointTopic(String sourceAlias);

        default String originalTopic(String topic) {
            String upstream = upstreamTopic(topic);
            if (upstream == null) {
                return topic;
            } else {
                return originalTopic(upstream);
            }
        }

        /** Internal topics are never replicated. */
        // The implementation will move to DefaultReplicationPolicy, which will handle
        // both Kafka internal topics and MM2 internal topics.
        boolean isInternalTopic(String topic);
    }

On Fri, Mar 26, 2021 at 3:05 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Omnia, have we considered just adding methods to ReplicationPolicy? I'm reluctant to add a new class because, as Mickael points out, we'd need to carry it around in client code.

Ryanne

On Fri, Feb 19, 2021 at 8:31 AM Mickael Maison <mickael.mai...@gmail.com> wrote:

Hi Omnia,

Thanks for the clarifications.

- I'm still a bit uneasy with the overlap between these 2 methods, as `ReplicationPolicy.isInternalTopic` already handles MM2 internal topics today. Should we make it handle only Kafka internal topics, and have `isMM2InternalTopic()` handle only MM2 topics?

- I'm not sure I understand what this method is used for.
There are no such methods for the other 2 topics (offset-syncs and heartbeats). Also, what happens if there are other MM2 instances using different naming schemes in the same cluster? Do all instances have to know about the other naming schemes? What are the expected issues if they don't?

- RemoteClusterUtils is a client-side utility, so it does not have access to the MM2 configuration. Since this new API can affect the name of the checkpoints topic, it will need to be used client-side too, so users can find the checkpoints topic name. I hadn't realized this was the case.

Thanks

On Mon, Feb 15, 2021 at 9:33 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Mickael, did you have some time to check my answer?

On Thu, Jan 21, 2021 at 10:10 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Mickael,
Thanks for taking another look at the KIP. Regarding your questions:

- I believe we need both `isMM2InternalTopic` and `ReplicationPolicy.isInternalTopic`: `ReplicationPolicy.isInternalTopic` checks whether a topic is a Kafka internal topic, while `isMM2InternalTopic` only checks whether a topic is an MM2 internal topic (heartbeats, checkpoints or offset-syncs). The assumption that the default MM2 internal topic names match `ReplicationPolicy.isInternalTopic` will no longer hold once this KIP is implemented.

- `isCheckpointTopic` will detect all checkpoints topics for all MM2 instances. This is needed for `MirrorClient.checkpointTopics`, which currently checks whether the topic name ends with CHECKPOINTS_TOPIC_SUFFIX.
So this method just keeps the functionality that already exists in MM2.

- `checkpointTopic` is used in two places: 1) at topic creation, in `MirrorCheckpointConnector.createInternalTopics`, which uses `sourceClusterAlias() + CHECKPOINTS_TOPIC_SUFFIX`; and 2) in `MirrorClient.remoteConsumerOffsets`, which is called by `RemoteClusterUtils.translateOffsets`. There the cluster alias is referred to as "remoteCluster", and the topic name is `remoteClusterAlias + CHECKPOINTS_TOPIC_SUFFIX` (remoteClusterAlias is an argument to RemoteClusterUtils, not a config). This is why I called the variable "cluster" rather than "source", and why I didn't use the config to figure out the cluster aliases: we use checkpoints to keep `RemoteClusterUtils` compatible for existing users. I see a benefit in just reading the config to find the cluster aliases, but on the other hand I'm not sure why `RemoteClusterUtils` takes the cluster name as an argument instead of getting it from the properties, so I decided to keep it as is for compatibility.

Hope these answer some of your concerns.
Best
Omnia

On Thu, Jan 21, 2021 at 3:37 PM Mickael Maison <mickael.mai...@gmail.com> wrote:

Hi Omnia,

Thanks for the updates. Sorry for the delay, but I have a few more small questions about the API:

- Do we really need `isMM2InternalTopic()`? There's already `ReplicationPolicy.isInternalTopic()`. If so, we need to explain the difference between these 2 methods.

- Is `isCheckpointTopic()` expected to detect all checkpoints topics (for all MM2 instances) or only the ones for this connector instance?
If it's the latter, I wonder if we could do without the method. As this interface is only called by MM2, we could first call `checkpointTopic()` and check whether that's equal to the topic we're checking. If it's the former, we don't really know which topic names other MM2 instances may be using!

- The 3 methods returning topic names have different APIs: `heartbeatsTopic()` takes no arguments, `offsetSyncTopic()` takes the target cluster alias, and `checkpointTopic()` takes "clusterAlias" (which one is it, source or target?). As the interface extends Configurable, maybe we could get rid of all the arguments and use the config to find the cluster aliases. WDYT?

These are minor concerns; I'm just making sure I fully understand how the API is expected to be used. Once these are cleared, I'll be happy to vote for this KIP.

Thanks

On Fri, Jan 8, 2021 at 12:06 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Mickael,
Did you get time to review the changes to the KIP? If you're okay with it, could you vote for the KIP here: https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html?
Thanks

On Thu, Dec 10, 2020 at 2:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Mickael,
1) That's right, the interface and default implementation will be in mirror-client.
2) Renaming the interface should be fine too, especially if you're planning to move other functionality related to topic creation there. I can edit this.

If you're okay with that, please vote for the KIP here: https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html

Thanks
Omnia

On Thu, Dec 10, 2020 at 12:58 PM Mickael Maison <mickael.mai...@gmail.com> wrote:

Hi Omnia,

Thank you for the reply, it makes sense.

A couple more comments:

1) I'm assuming the new interface and default implementation will be in the mirror-client project, as the names of some of these topics are needed by RemoteClusterUtils on the client side?

2) I'm about to open a KIP to specify where the offset-syncs topic is created by MM2. In restricted environments, we'd prefer MM2 to have only read access to the source cluster and to keep the offset-syncs topic on the target cluster. I think allowing users to specify the cluster where that topic is created would be a natural extension of the interface you propose here.

So I wonder if your interface could be named InternalTopicsPolicy? That's a bit more generic than InternalTopicNamingPolicy.
That would also match the configuration setting you're proposing, internal.topics.policy.class.

Thanks

On Thu, Dec 3, 2020 at 10:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Mickael,
Thanks for your feedback!
Regarding your question about having more configurations: I considered adding a configuration per topic, but this meant adding more configurations to MM2, which already has so many. Also, the more complicated and advanced the replication pattern between your clusters, the more configuration lines will be added to your MM2 config, which isn't going to be pretty if you don't have the same topic names across your clusters.

It also added more complexity to the implementation, as MM2 needs to:
1- identify whether a topic is a checkpoints topic, so we can list the checkpoints topics in the MirrorMaker 2 utils, since one cluster could have X checkpoints topics if it's connected to X clusters. Right now this is done by listing any topic with the suffix `.checkpoints.internal`. It could be done by adding a `checkpoints.topic.suffix` config, but this would assume that checkpoints topics always have a suffix; having a suffix also means we may need a separator to concatenate this suffix with a prefix identifying the source cluster name.
2- identify whether a topic is internal, so that it isn't replicated and checkpoints aren't tracked for it. Right now this relies on not replicating or tracking checkpoints for any topic with the `.internal` suffix, but once topic names are configurable we need a way to define what an internal topic is. This could be done by using a list of all internal topics entered in the configuration.

So having an interface seemed easier, and it also gives users more flexibility to define their own topic names, what an internal topic means, and how to find checkpoints topics, all with one line of config for each herder. It is also more consistent with the MM2 code, as the MM2 config has TopicFilter, ReplicationPolicy, GroupFilter, etc. as interfaces that can be overridden by providing a custom implementation, or by config that changes their default implementations.

Hope this answers your question. I also updated the KIP to add this to the rejected alternatives.

On Thu, Dec 3, 2020 at 3:19 PM Mickael Maison <mickael.mai...@gmail.com> wrote:

Hi Omnia,

Thanks for the KIP. Indeed, being able to configure MM2's internal topic names would be a nice improvement.

Looking at the KIP, I was surprised you propose an interface to allow users to specify names.
Have you considered making names changeable via configurations? If so, we should definitely mention it in the rejected alternatives, as it's the first method that comes to mind.

I understand an interface gives a lot of flexibility, but I'd expect topic names to be relatively simple and known in advance in most cases.

I've not checked all use cases, but something like below felt appropriate:

    clusters = primary,backup
    primary->backup.offsets-sync.topic=backup.mytopic-offsets-sync

On Tue, Dec 1, 2020 at 3:36 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hey everyone,
Please take a look at KIP-690:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention

Thanks for your feedback and support.

Omnia
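Omnia's Dec 3 reply and Mickael's Jan 21 question in this thread contrast two ways of recognising MM2 internal topics: the existing suffix match (any topic ending in `.checkpoints.internal` is a checkpoints topic, any topic ending in `.internal` is internal) and an exact comparison with the single name `checkpointTopic()` would return. The helper below is a hypothetical sketch (the class and method names are invented, default naming is assumed, and it is not Kafka code) showing why the choice matters: the suffix match finds checkpoints topics from every MM2 instance that uses the default naming, while the exact match only recognises this instance's own topic.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper contrasting the two detection strategies discussed in the thread.
// Assumes the default (pre-KIP) naming; not part of Kafka.
final class CheckpointTopicDetectionSketch {
    static final String CHECKPOINTS_SUFFIX = ".checkpoints.internal";

    private CheckpointTopicDetectionSketch() {
    }

    // Suffix match: finds checkpoints topics written by ANY MM2 instance using the default naming.
    static List<String> checkpointTopicsBySuffix(List<String> topics) {
        return topics.stream()
                .filter(t -> t.endsWith(CHECKPOINTS_SUFFIX))
                .collect(Collectors.toList());
    }

    // Exact match: only recognises the one name this instance would produce for the given alias,
    // so checkpoints topics belonging to other source clusters or naming schemes are not matched.
    static boolean isOwnCheckpointTopic(String topic, String sourceAlias) {
        return topic.equals(sourceAlias + CHECKPOINTS_SUFFIX);
    }

    // The default internal-topic rule described in the thread: topics ending in ".internal"
    // are neither replicated nor tracked in checkpoints.
    static boolean isInternalBySuffix(String topic) {
        return topic.endsWith(".internal");
    }
}
```

Given the topics `orders`, `primary.checkpoints.internal` and `backup.checkpoints.internal`, the suffix match returns both checkpoints topics, while `isOwnCheckpointTopic("backup.checkpoints.internal", "primary")` is false. Neither strategy can see checkpoints topics that another MM2 instance named with a custom convention, which is the gap Mickael points out.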