Thanks Sönke, you're spot-on. I don't want MM2 to wait for Connect features that don't exist yet, especially if MM2 is the primary use case for them. Moreover, I think MM2 can drive and inform some of these features, which only makes sense if we adopt MM2 first.
Ryanne

On Fri, Dec 14, 2018, 9:03 AM Sönke Liebau <soenke.lie...@opencore.com.invalid> wrote:

> Hi Jun,
>
> I believe Ryanne's idea is to run multiple workers per MM cluster-node, one per target cluster. So in essence you'd specify three clusters in the MM config and MM would then instantiate one worker per cluster. Every MM connector would then be deployed to the appropriate (internal) worker that is configured for the cluster in question. Thus there would be no changes necessary to the Connect framework itself; everything would be handled by a new layer around existing Connect code (probably a sibling implementation to the DistributedHerder, if I understood him correctly). Ryanne, please correct/expand if I misunderstood your intentions.
>
> To briefly summarize the discussion that Ryanne and I had around this earlier, my opinion was that the extra layer could potentially be avoided by extending Connect instead, which would benefit all connectors.
>
> My proposal was to add a configuration option to the worker config that allows defining "external clusters" which can then be referenced from the connector config.
>
> For example:
>
> # Core cluster config stays the same and is used for status, config and offsets as usual
> bootstrap.servers=localkafka1:9092,localkafka2:9092
>
> # Allow defining extra remote clusters
> externalcluster.kafka_europe.bootstrap.servers=europekafka1:9092,europekafka2:9092
> externalcluster.kafka_europe.security.protocol=SSL
> externalcluster.kafka_europe.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
> ...
> externalcluster.kafka_asia.bootstrap.servers=asiakafka1:9092,asiakafka2:9092
>
> When starting a connector you could now reference these pre-configured clusters in the config:
>
> {
>   "name": "file-source",
>   "config": {
>     "connector.class": "FileStreamSource",
>     "file": "/tmp/test.txt",
>     "topic": "connect-test",
>     "name": "file-source",
>     "cluster": "kafka_asia"
>   }
> }
>
> When omitting the "cluster" parameter, the current behavior of Connect remains unchanged. This way we could address multiple remote clusters from within a single worker without adding the extra layer for MirrorMaker. I believe that this could be done without major structural changes to the Connect codebase, but I freely admit that this opinion is based on 10 minutes poking through the code, not any real expertise.
>
> Ryanne's main concern with this approach was that there are additional worker settings that apply to all connectors, and that no truly universal approach would be feasible while running a single worker per Connect node. Also, he feels that from a development perspective it would be preferable to have independent MM code and contribute applicable features back to Connect. While I agree that this would make development of MM easier, it will also create a certain amount of extra code (which can probably be kept to a minimum, but still) that could be avoided by using "vanilla" Connect for MM.
>
> I hope I summarized your views accurately, Ryanne. If not, please feel free to correct me!
>
> Best regards,
> Sönke
>
> On Fri, Dec 14, 2018 at 1:55 AM Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Ryanne,
> >
> > Regarding the single connect cluster model, yes, the co-existence of a MM2 REST API and the nearly identical Connect API is one of my concerns. Implementation-wise, my understanding is that the producer URL in a SourceTask is always obtained from the connect worker's configuration.
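As an aside on Sönke's external-cluster proposal quoted above: a minimal sketch of how a worker might resolve a connector's "cluster" reference against such properties. The resolver class, its method, and the property layout are illustrative assumptions following the example above; none of this exists in Connect today.

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of resolving the proposed "externalcluster.<name>.*" worker
 * properties for a connector that sets "cluster=<name>". Illustrative only.
 */
public class ExternalClusterResolver {

    private static final String PREFIX = "externalcluster.";

    /**
     * Returns the client config for the cluster the connector asked for, or
     * the worker's own (primary) client config when no "cluster" was given.
     */
    public static Map<String, String> clientConfigFor(Map<String, String> workerProps,
                                                      String clusterName) {
        Map<String, String> result = new HashMap<>();
        if (clusterName == null) {
            // Current behavior: use the worker's primary cluster settings as-is.
            result.put("bootstrap.servers", workerProps.get("bootstrap.servers"));
            return result;
        }
        String clusterPrefix = PREFIX + clusterName + ".";
        for (Map.Entry<String, String> entry : workerProps.entrySet()) {
            if (entry.getKey().startsWith(clusterPrefix)) {
                // e.g. externalcluster.kafka_asia.bootstrap.servers -> bootstrap.servers
                result.put(entry.getKey().substring(clusterPrefix.length()), entry.getValue());
            }
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("Unknown external cluster: " + clusterName);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("bootstrap.servers", "localkafka1:9092,localkafka2:9092");
        workerProps.put("externalcluster.kafka_asia.bootstrap.servers", "asiakafka1:9092");

        // A connector config containing "cluster": "kafka_asia" would resolve to:
        System.out.println(clientConfigFor(workerProps, "kafka_asia"));
        // ...while omitting "cluster" keeps today's behavior:
        System.out.println(clientConfigFor(workerProps, null));
    }
}

A connector that omits "cluster" falls through to the worker's primary bootstrap.servers, which preserves existing connector configs unchanged.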
> So, > > not sure how you would customize the producer URL for individual > SourceTask > > w/o additional support from the Connect framework. > > > > Thanks, > > > > Jun > > > > > > On Mon, Dec 10, 2018 at 1:17 PM Ryanne Dolan <ryannedo...@gmail.com> > > wrote: > > > > > Jun, thanks for your time reviewing the KIP. > > > > > > > In a MirrorSourceConnector, it seems that the offsets of the source > > will > > > be stored in a different cluster from the target cluster? > > > > > > Jan Filipiak raised this issue as well, and suggested that no state be > > > tracked in the source cluster. I've since implemented > > MirrorSourceConnector > > > accordingly. And actually, this issue coincides with another major > > weakness > > > of legacy MirrorMaker: "rebalance storm". In both cases, the problem is > > due > > > to MirrorMaker using high-level consumer groups for replication. > > > > > > MM2 does not use consumer groups at all, but instead manages its own > > > partition assignments and offsets. MirrorSourceConnector monitors > > > topic-partitions and assigns them to MirrorSourceTasks directly -- > there > > > are no high-level subscriptions and therefore no rebalances. Likewise, > > > MirrorSourceConnector stores its own offsets in the target cluster, so > no > > > state information is lost if the source cluster disappears. Both of > these > > > features are facilitated by the Connect framework and were inspired by > > > Uber's uReplicator. > > > > > > > If the single connect cluster model is indeed useful, it seems that > we > > > should support it in the general connect framework since it can be > useful > > > for managing other types connectors. > > > > > > Sönke Liebau suggested this as well. I've spent some time looking into > > > this, and I do believe it would be possible to bring these features to > > > Connect in general without breaking the existing APIs. For example, > > maybe a > > > connector config could specify which worker to use as a property like > > > worker.name=foo, and otherwise a default worker would be used. In this > > > case, a "MirrorMaker cluster" would just be a Connect cluster with a > > > pre-configured set of workers. > > > > > > My plan is to contribute MM2 and then help pull features from MM2 into > > > Connect. I don't think it would make sense to prime Connect first, nor > > do I > > > want to propose a bunch of changes to Connect in this one KIP. If the > > > concern is primarily around the co-existence of a MM2 REST API and the > > > nearly identical Connect API, perhaps it would make sense to split off > > the > > > "MirrorMaker clusters" section of this KIP into a separate KIP aimed at > > > Connect in general? Would love to hear your thoughts on this. > > > > > > > Could you provide a bit more details on the content of the heartbeat > > > topic? > > > > > > At present the heartbeat is just a timestamp and the alias of the > cluster > > > of origin. This is more powerful than existing Connector-level metrics, > > as > > > these heartbeats are themselves replicated and can be traced across > > > multiple hops in the replication topology. I'll add this to the KIP. > > > > > > > Also, if this is useful, should we just add it add in the connect > > > framework, instead of just mirror maker? > > > > > > Same deal, I'd love to see this, but I don't think we should try to > prime > > > Connect before adopting MM2. > > > > > > > RemoteClusterUtils. Since this is part of the public interface, could > > you > > > document the public APIs? 
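On the heartbeat content Ryanne describes above (a timestamp plus the alias of the cluster of origin), a minimal sketch of the kind of record this implies; the class and field names are illustrative, not the format the KIP will ultimately document.

import java.util.Objects;

/**
 * Minimal sketch of a heartbeat record: a timestamp plus the alias of the
 * cluster that emitted it. Because heartbeats are replicated like any other
 * topic, a downstream cluster can trace them across multiple hops and judge
 * how fresh each replication flow is. Names here are illustrative only.
 */
public final class HeartbeatSketch {
    private final String originClusterAlias; // e.g. "us-west"
    private final long timestampMillis;      // when the heartbeat was emitted

    public HeartbeatSketch(String originClusterAlias, long timestampMillis) {
        this.originClusterAlias = Objects.requireNonNull(originClusterAlias);
        this.timestampMillis = timestampMillis;
    }

    public String originClusterAlias() { return originClusterAlias; }

    public long timestampMillis() { return timestampMillis; }

    /** Replication lag observed downstream, relative to the local clock. */
    public long lagMillis(long nowMillis) {
        return nowMillis - timestampMillis;
    }
}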
> > > > > > Will do, thanks. > > > > > > > source.cluster.bootstrap.servers/target.cluster.bootstrap.servers: > > Does a > > > Source/Sink connect need both? > > > > > > Sort of. I'm using this to construct an AdminClient for topic ACL and > > > configuration sync, since the Connect framework doesn't expose it. I > > intend > > > to follow-up KIP-382 with a proposal to expose this info to Connectors. > > > There's also KIP-158, but it deals with topic creation only. > > > > > > Thanks again for the feedback! > > > > > > Ryanne > > > > > > > > > > > > On Fri, Dec 7, 2018 at 6:22 PM Jun Rao <j...@confluent.io> wrote: > > > > > > > Hi, Ryanne, > > > > > > > > Thanks for the KIP. At the high level, this looks like a reasonable > > > > proposal. A few comments below. > > > > > > > > 1. About using a single connector cluster to manage connectors > > accessing > > > > multiple Kafka clusters. It's good that you brought this up. The > > > following > > > > are the tradeoffs that I see. The benefit of using a single connect > > > cluster > > > > is that it simplifies the management. There are a couple of potential > > > > downsides. > > > > (a) In a MirrorSourceConnector, it seems that the offsets of the > source > > > > will be stored in a different cluster from the target cluster? If the > > > data > > > > in the target Kafka cluster is lost (say the whole cluster is wiped > > out), > > > > one has to manually reset the offset to re-mirror the missing data. > (2) > > > If > > > > the offsets are stored in a separate cluster from the produced data, > it > > > > prevents the connector from running features such as EOS since > > currently > > > > EOS doesn't span Kafka clusters. If the single connect cluster model > is > > > > indeed useful, it seems that we should support it in the general > > connect > > > > framework since it can be useful for managing other types connectors. > > > This > > > > could be related to KIP-296 since it allows connector level > > > > producer/consumer customization. > > > > > > > > 2. The heartbeats topic. Could you provide a bit more details on the > > > > content of the heartbeat topic? I am not sure how that's different > from > > > the > > > > connector level metrics. Also, if this is useful, should we just add > it > > > add > > > > in the connect framework, instead of just mirror maker? > > > > > > > > 3. RemoteClusterUtils. Since this is part of the public interface, > > could > > > > you document the public APIs? > > > > > > > > 4. source.cluster.bootstrap.servers/target.cluster.bootstrap.servers: > > > Does > > > > a Source/Sink connect need both? Currently, the producer URL used in > a > > > > SourceWorker always comes from the Worker configuration. Are you > > > proposing > > > > to change that? > > > > > > > > Jun > > > > > > > > On Fri, Dec 7, 2018 at 12:18 PM Ryanne Dolan <ryannedo...@gmail.com> > > > > wrote: > > > > > > > > > Michael, thanks for the comments! > > > > > > > > > > > would like to see support for this to be done by hops, as well > > [...] > > > > > This then allows ring (hops = number of brokers in the ring), mesh > > > (every > > > > > cluster interconnected so hop=1), or even a tree (more fine grained > > > > setup) > > > > > cluster topology. > > > > > > > > > > That's a good idea, though we can do this at the topic level > without > > > > > tagging individual records. A max.hop of 1 would mean "A.topic1" is > > > > > allowed, but not "B.A.topic1". 
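A small sketch of how hop counting could fall out of that topic-naming convention; the method names are illustrative, and it assumes the "." separator never appears inside original topic names, which a real implementation would need to handle (for example by checking against the known cluster aliases).

/**
 * Sketch of hop counting based purely on the remote-topic naming convention
 * above: "A.topic1" is one hop from cluster A, "B.A.topic1" is two hops.
 */
public class ReplicationHops {

    // Regex for the cluster-alias separator used in remote topic names.
    private static final String SEPARATOR = "\\.";

    /** Number of cluster aliases prefixed onto the original topic name. */
    public static int countHops(String topicName) {
        return topicName.split(SEPARATOR).length - 1;
    }

    /** A topic is replicated onward only while it is still under the hop limit. */
    public static boolean shouldReplicate(String topicName, int maxHops) {
        return countHops(topicName) < maxHops;
    }

    public static void main(String[] args) {
        // With max.hops = 1, "topic1" may be replicated (becoming "A.topic1"),
        // but "A.topic1" may not be replicated again (no "B.A.topic1").
        System.out.println(shouldReplicate("topic1", 1));    // true
        System.out.println(shouldReplicate("A.topic1", 1));  // false
    }
}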
I think the default behavior would > > need > > > to > > > > > be max.hops = 1 to avoid unexpectedly creating a bunch of > D.C.B.A... > > > > topics > > > > > when you create a fully-connected mesh topology. > > > > > > > > > > Looking ahead a bit, I can imagine an external tool computing the > > > > spanning > > > > > tree of topics among a set of clusters based on inter-cluster > > > replication > > > > > lag, and setting up MM2 accordingly. But that's probably outside > the > > > > scope > > > > > of this KIP :) > > > > > > > > > > > ...standalone MirrorMaker connector... > > > > > > ./bin/kafka-mirror-maker-2.sh --consumer consumer.properties > > > > > --producer producer.properties > > > > > > > > > > Eventually, I'd like MM2 to completely replace legacy MM, including > > the > > > > > ./bin/kafka-mirror-maker.sh script. In the meantime, it's a good > idea > > > to > > > > > include a standalone driver. Something like > > > > > ./bin/connect-mirror-maker-standalone.sh with the same high-level > > > > > configuration file. I'll do that, thanks. > > > > > > > > > > > I see no section on providing support for mirror maker Handlers, > > > today > > > > > people can add handlers to have a little extra custom logic if > > needed, > > > > and > > > > > the handler api is public today so should be supported going > forwards > > > so > > > > > people are not on mass re-writing these. > > > > > > > > > > Great point. Connect offers single-message transformations and > > > converters > > > > > for this purpose, but I agree that we should honor the existing API > > if > > > > > possible. This might be as easy as providing an adapter class > between > > > > > connect's Transformation and mirror-maker's Handler. Maybe file a > > Jira > > > > > ticket to track this? > > > > > > > > > > Really appreciate your feedback! > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > On Thu, Dec 6, 2018 at 7:03 PM Michael Pearce < > michael.pea...@ig.com > > > > > > > > wrote: > > > > > > > > > > > Re hops to stop the cycle and to allow a range of multi cluster > > > > > > topologies, see > https://www.rabbitmq.com/federated-exchanges.html > > > > where > > > > > > very similar was done in rabbit. > > > > > > > > > > > > > > > > > > > > > > > > On 12/7/18, 12:47 AM, "Michael Pearce" <michael.pea...@ig.com> > > > wrote: > > > > > > > > > > > > Nice proposal. > > > > > > > > > > > > Some comments. > > > > > > > > > > > > > > > > > > On the section around cycle detection. > > > > > > > > > > > > I would like to see support for this to be done by hops, as > > well > > > > e.g. > > > > > > using approach is to use a header for the number of hops, as the > > mm2 > > > > > > replicates it increases the hop count and you can make the mm2 > > > > > configurable > > > > > > to only produce messages onwards where hops are less than x. > > > > > > This then allows ring (hops = number of brokers in the ring), > > > mesh > > > > > > (every cluster interconnected so hop=1), or even a tree (more > fine > > > > > grained > > > > > > setup) cluster topology. > > > > > > FYI we do this currently with the current mirror maker, > using a > > > > > custom > > > > > > handler. > > > > > > > > > > > > > > > > > > On the section around running a standalone MirrorMaker > > connector > > > > > > > > > > > > I would suggest making this as easy to run as the > mirrormakers > > > are > > > > > > today, with a simple single sh script. 
> > > > > > I assume this is what is proposed in section "Running > > MirrorMaker > > > > in > > > > > > legacy mode" but I would even do this before MM would be removed, > > > with > > > > a > > > > > -2 > > > > > > varient. > > > > > > e.g. > > > > > > ./bin/kafka-mirror-maker-2.sh --consumer consumer.properties > > > > > > --producer producer.properties > > > > > > > > > > > > Lastly > > > > > > > > > > > > I see no section on providing support for mirror maker > > Handlers, > > > > > today > > > > > > people can add handlers to have a little extra custom logic if > > > needed, > > > > > and > > > > > > the handler api is public today so should be supported going > > forwards > > > > so > > > > > > people are not on mass re-writing these. > > > > > > > > > > > > On 12/5/18, 5:36 PM, "Ryanne Dolan" <ryannedo...@gmail.com> > > > wrote: > > > > > > > > > > > > Sönke, > > > > > > > > > > > > > The only thing that I could come up with is the > > limitation > > > > to a > > > > > > single > > > > > > offset commit interval > > > > > > > > > > > > Yes, and other internal properties, e.g. those used by > the > > > > > internal > > > > > > consumers and producers, which, granted, probably are not > > > often > > > > > > changed > > > > > > from their defaults, but that apply to Connectors across > > the > > > > > > entire cluster. > > > > > > > > > > > > Ryanne > > > > > > > > > > > > On Wed, Dec 5, 2018 at 3:21 AM Sönke Liebau > > > > > > <soenke.lie...@opencore.com.invalid> wrote: > > > > > > > > > > > > > Hi Ryanne, > > > > > > > > > > > > > > when you say "Currently worker configs apply across the > > > > entire > > > > > > cluster, > > > > > > > which is limiting even for use-cases involving a single > > > Kafka > > > > > > cluster.", > > > > > > > may I ask you to elaborate on those limitations a > little? > > > > > > > The only thing that I could come up with is the > > limitation > > > > to a > > > > > > single > > > > > > > offset commit interval value for all running > connectors. > > > > > > > Maybe also the limitation to shared config providers.. > > > > > > > > > > > > > > But you sound like you had painful experiences with > this > > > > > before, > > > > > > maybe > > > > > > > you'd like to share the burden :) > > > > > > > > > > > > > > Best regards, > > > > > > > Sönke > > > > > > > > > > > > > > On Wed, Dec 5, 2018 at 5:15 AM Ryanne Dolan < > > > > > > ryannedo...@gmail.com> wrote: > > > > > > > > > > > > > > > Sönke, > > > > > > > > > > > > > > > > I think so long as we can keep the differences at a > > very > > > > high > > > > > > level (i.e. > > > > > > > > the "control plane"), there is little downside to MM2 > > and > > > > > > Connect > > > > > > > > coexisting. I do expect them to converge to some > > extent, > > > > with > > > > > > features > > > > > > > from > > > > > > > > MM2 being pulled into Connect whenever this is > possible > > > > > > without breaking > > > > > > > > things. > > > > > > > > > > > > > > > > I could definitely see your idea re hierarchies or > > groups > > > > of > > > > > > connectors > > > > > > > > being useful outside MM2. Currently "worker configs" > > > apply > > > > > > across the > > > > > > > > entire cluster, which is limiting even for use-cases > > > > > involving > > > > > > a single > > > > > > > > Kafka cluster. If Connect supported multiple workers > in > > > the > > > > > > same cluster, > > > > > > > > it would start to look a lot like a MM2 cluster. 
> > > > > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > On Tue, Dec 4, 2018 at 3:26 PM Sönke Liebau > > > > > > > > <soenke.lie...@opencore.com.invalid> wrote: > > > > > > > > > > > > > > > > > Hi Ryanne, > > > > > > > > > > > > > > > > > > thanks for your response! > > > > > > > > > > > > > > > > > > It seems like you have already done a lot of > > > > investigation > > > > > > into the > > > > > > > > > existing code and the solution design and all of > what > > > you > > > > > > write makes > > > > > > > > sense > > > > > > > > > to me. Would it potentially be worth adding this to > > the > > > > > KIP, > > > > > > now that > > > > > > > you > > > > > > > > > had to write it up because of me anyway? > > > > > > > > > > > > > > > > > > However, I am afraid that I am still not entirely > > > > convinced > > > > > > of the > > > > > > > > > fundamental benefit this provides over an extended > > > > Connect > > > > > > that has the > > > > > > > > > following functionality: > > > > > > > > > - allow for organizing connectors into a > hierarchical > > > > > > structure - > > > > > > > > > "clusters/us-west/..." > > > > > > > > > - allow defining external Kafka clusters to be used > > by > > > > > > Source and Sink > > > > > > > > > connectors instead of the local cluster > > > > > > > > > > > > > > > > > > Personally I think both of these features are > useful > > > > > > additions to > > > > > > > > Connect, > > > > > > > > > I'll address both separately below. > > > > > > > > > > > > > > > > > > Allowing to structure connectors in a hierarchy > > > > > > > > > Organizing running connectors will grow more > > important > > > as > > > > > > corporate > > > > > > > > > customers adapt Connect and installations grow in > > size. > > > > > > Additionally > > > > > > > this > > > > > > > > > could be useful for ACLs in case they are ever > added > > to > > > > > > Connect, as you > > > > > > > > > could allow specific users access only to specific > > > > > > namespaces (and > > > > > > > until > > > > > > > > > ACLs are added it would facilitate using a reverse > > > proxy > > > > > for > > > > > > the same > > > > > > > > > effect). > > > > > > > > > > > > > > > > > > Allow accessing multiple external clusters > > > > > > > > > The reasoning for this feature is pretty much the > > same > > > as > > > > > > for a central > > > > > > > > > Mirror Maker cluster, if a company has multiple > > > clusters > > > > > for > > > > > > whatever > > > > > > > > > reason but wants to have ingest centralized in one > > > system > > > > > > aka one > > > > > > > Connect > > > > > > > > > cluster they would need the ability to read from > and > > > > write > > > > > > to an > > > > > > > > arbitrary > > > > > > > > > number of Kafka clusters. > > > > > > > > > I haven't really looked at the code, just poked > > around > > > a > > > > > > couple of > > > > > > > > minutes, > > > > > > > > > but it appears like this could be done with fairly > > low > > > > > > effort. My > > > > > > > general > > > > > > > > > idea would be to leave the existing configuration > > > options > > > > > > untouched - > > > > > > > > > Connect will always need a "primary" cluster that > is > > > used > > > > > > for storage > > > > > > > of > > > > > > > > > internal data (config, offsets, status) there is no > > > need > > > > to > > > > > > break > > > > > > > > existing > > > > > > > > > configs. 
But additionally allow adding named extra > > > > clusters > > > > > > by > > > > > > > specifying > > > > > > > > > options like > > > > > > > > > external.sales_cluster.bootstrap_servers=... > > > > > > > > > external.sales_cluster.ssl.keystore.location=... > > > > > > > > > external.marketing_cluster.bootstrap_servers=... > > > > > > > > > > > > > > > > > > The code for status, offset and config storage is > > > mostly > > > > > > isolated in > > > > > > > the > > > > > > > > > Kafka[Offset|Status|Config]BackingStore classes and > > > could > > > > > > remain pretty > > > > > > > > > much unchanged. > > > > > > > > > > > > > > > > > > Producer and consumer creation for Tasks is done in > > the > > > > > > Worker as of > > > > > > > > > KAFKA-7551 and is isolated in two functions. We > could > > > > add a > > > > > > two more > > > > > > > > > functions with an extra argument for the external > > > cluster > > > > > > name to be > > > > > > > used > > > > > > > > > and return fitting consumers/producers. > > > > > > > > > The source and sink config would then simply gain > an > > > > > > optional setting > > > > > > > to > > > > > > > > > specify the cluster name. > > > > > > > > > > > > > > > > > > I am very sure that I am missing a few large issues > > > with > > > > > > these ideas, > > > > > > > I'm > > > > > > > > > mostly back-of-the-napkin designing here, but it > > might > > > be > > > > > > worth a > > > > > > > second > > > > > > > > > look. > > > > > > > > > > > > > > > > > > Once we decide to diverge into two clusters: > > > MirrorMaker > > > > > and > > > > > > Connect, I > > > > > > > > > think realistically the chance of those two ever > > being > > > > > > merged again > > > > > > > > because > > > > > > > > > they grow back together is practically zero - hence > > my > > > > > > hesitation. > > > > > > > > > > > > > > > > > > ---- > > > > > > > > > > > > > > > > > > All of that being said, I am absolutely happy to > > agree > > > to > > > > > > disagree, I > > > > > > > > think > > > > > > > > > to a certain extent this is down to a question of > > > > personal > > > > > > > > > style/preference. And as this is your baby and you > > have > > > > put > > > > > > a lot more > > > > > > > > > effort and thought into it than I ever will I'll > shut > > > up > > > > > now > > > > > > :) > > > > > > > > > > > > > > > > > > Again, thanks for all your good work! > > > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > Sönke > > > > > > > > > > > > > > > > > > On Fri, Nov 30, 2018 at 9:00 PM Ryanne Dolan < > > > > > > ryannedo...@gmail.com> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Thanks Sönke. > > > > > > > > > > > > > > > > > > > > > it just feels to me like an awful lot of > Connect > > > > > > functionality > > > > > > > would > > > > > > > > > need > > > > > > > > > > to be reimplemented or at least wrapped > > > > > > > > > > > > > > > > > > > > Connect currently has two drivers, > > ConnectDistributed > > > > and > > > > > > > > > > ConnectStandalone. Both set up a Herder, which > > > manages > > > > > > Workers. I've > > > > > > > > > > implemented a third driver which sets up multiple > > > > > Herders, > > > > > > one for > > > > > > > each > > > > > > > > > > Kafka cluster as specified in a config file. From > > the > > > > > > Herder level > > > > > > > > down, > > > > > > > > > > nothing is changed or duplicated -- it's just > > > Connect. 
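A simplified sketch of that multi-Herder arrangement; the MirrorHerder type and newHerder() factory are stand-ins for this illustration, while the real driver would wire up ordinary Connect herders, one per configured cluster.

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of a driver that creates one herder (one logical Connect worker
 * group) per configured Kafka cluster, keyed by its alias, so a REST layer
 * can route /clusters/{alias}/connectors requests to the matching herder.
 */
public class MirrorMakerDriverSketch {

    /** Stand-in for Connect's Herder in this sketch. */
    interface MirrorHerder {
        void start();
        void stop();
    }

    private final Map<String, MirrorHerder> herders = new HashMap<>();

    public MirrorMakerDriverSketch(Map<String, String> bootstrapServersByAlias) {
        // e.g. {"us-west": "us-west-kafka-server:9092", "us-east": "us-east-kafka-server:9092"}
        bootstrapServersByAlias.forEach(
            (alias, servers) -> herders.put(alias, newHerder(alias, servers)));
    }

    public void start() {
        herders.values().forEach(MirrorHerder::start);
    }

    /** Used by the REST layer to resolve /clusters/{alias}/... requests. */
    public MirrorHerder herderFor(String alias) {
        return herders.get(alias);
    }

    private MirrorHerder newHerder(String alias, String bootstrapServers) {
        // In a real implementation this would build an ordinary distributed
        // herder whose worker is configured against the given cluster.
        return new MirrorHerder() {
            @Override public void start() { System.out.println("starting herder for " + alias); }
            @Override public void stop() { System.out.println("stopping herder for " + alias); }
        };
    }
}

The REST layer described next then only needs to look up the herder for a given cluster alias and hand it to the existing ConnectorsResource.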
> > > > > > > > > > > > > > > > > > > > For the REST API, Connect wraps a Herder in a > > > > RestServer > > > > > > class, which > > > > > > > > > > creates a Jetty server with a few JAX-RS > resources. > > > One > > > > > of > > > > > > these > > > > > > > > > resources > > > > > > > > > > is ConnectorsResource, which is the real meat of > > the > > > > REST > > > > > > API, > > > > > > > enabling > > > > > > > > > > start, stop, creation, deletion, and > configuration > > of > > > > > > Connectors. > > > > > > > > > > > > > > > > > > > > I've added MirrorRestServer, which wraps a set of > > > > Herders > > > > > > instead of > > > > > > > > one. > > > > > > > > > > The server exposes a single resource, > > > ClustersResource, > > > > > > which is > > > > > > > only a > > > > > > > > > few > > > > > > > > > > lines of code: > > > > > > > > > > > > > > > > > > > > @GET > > > > > > > > > > @Path("/") > > > > > > > > > > public Collection<String> listClusters() { > > > > > > > > > > return clusters.keySet(); > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > @Path("/{cluster}") > > > > > > > > > > public ConnectorsResource > > > > > > > getConnectorsForCluster(@PathParam("cluster") > > > > > > > > > > cluster) { > > > > > > > > > > return new > > > ConnectorsResource(clusters.get(cluster)); > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > (simplified a bit and subject to change) > > > > > > > > > > > > > > > > > > > > The ClustersResource defers to the existing > > > > > > ConnectorsResource, which > > > > > > > > > again > > > > > > > > > > is most of the Connect API. With this in place, I > > can > > > > > make > > > > > > requests > > > > > > > > like: > > > > > > > > > > > > > > > > > > > > GET /clusters > > > > > > > > > > > > > > > > > > > > GET /clusters/us-west/connectors > > > > > > > > > > > > > > > > > > > > PUT /clusters/us-west/connectors/us-east/config > > > > > > > > > > { "topics" : "topic1" } > > > > > > > > > > > > > > > > > > > > etc. > > > > > > > > > > > > > > > > > > > > So on the whole, very little code is involved in > > > > > > implementing > > > > > > > > > "MirrorMaker > > > > > > > > > > clusters". I won't rule out adding additional > > > features > > > > on > > > > > > top of this > > > > > > > > > basic > > > > > > > > > > API, but nothing should require re-implementing > > what > > > is > > > > > > already in > > > > > > > > > Connect. > > > > > > > > > > > > > > > > > > > > > Wouldn't it be a viable alternative to look > into > > > > > > extending Connect > > > > > > > > > itself > > > > > > > > > > > > > > > > > > > > Maybe Connect will evolve to the point where > > Connect > > > > > > clusters and > > > > > > > > > > MirrorMaker clusters are indistinguishable, but I > > > think > > > > > > this is > > > > > > > > unlikely, > > > > > > > > > > since really no use-case outside replication > would > > > > > benefit > > > > > > from the > > > > > > > > added > > > > > > > > > > complexity. Moreover, I think support for > multiple > > > > Kafka > > > > > > clusters > > > > > > > would > > > > > > > > > be > > > > > > > > > > hard to add without significant changes to the > > > existing > > > > > > APIs and > > > > > > > > configs, > > > > > > > > > > which all assume a single Kafka cluster. 
I think > > > > > > Connect-as-a-Service > > > > > > > > and > > > > > > > > > > Replication-as-a-Service are sufficiently > different > > > > > > use-cases that we > > > > > > > > > > should expect the APIs and configuration files to > > be > > > at > > > > > > least > > > > > > > slightly > > > > > > > > > > different, even if both use the same framework > > > > > underneath. > > > > > > That > > > > > > > said, I > > > > > > > > > do > > > > > > > > > > plan to contribute a few improvements to the > > Connect > > > > > > framework in > > > > > > > > support > > > > > > > > > > of MM2 -- just nothing within the scope of the > > > current > > > > > KIP. > > > > > > > > > > > > > > > > > > > > Thanks again! > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Nov 30, 2018 at 3:47 AM Sönke Liebau > > > > > > > > > > <soenke.lie...@opencore.com.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > Hi Ryanne, > > > > > > > > > > > > > > > > > > > > > > thanks. I missed the remote to remote > replication > > > > > > scenario in my > > > > > > > > train > > > > > > > > > of > > > > > > > > > > > thought, you are right. > > > > > > > > > > > > > > > > > > > > > > That being said I have to admit that I am not > yet > > > > fully > > > > > > on board > > > > > > > with > > > > > > > > > the > > > > > > > > > > > concept, sorry. But I might just be > > > misunderstanding > > > > > > what your > > > > > > > > > intention > > > > > > > > > > > is. Let me try and explain what I think it is > you > > > are > > > > > > trying to do > > > > > > > > and > > > > > > > > > > why > > > > > > > > > > > I am on the fence about that and take it from > > > there. > > > > > > > > > > > > > > > > > > > > > > You want to create an extra mirrormaker driver > > > class > > > > > > which will > > > > > > > take > > > > > > > > > > > multiple clusters as configuration options. > Based > > > on > > > > > > these clusters > > > > > > > > it > > > > > > > > > > will > > > > > > > > > > > then reuse the connect workers and create as > many > > > as > > > > > > necessary to > > > > > > > be > > > > > > > > > able > > > > > > > > > > > to replicate to/from each of those configured > > > > clusters. > > > > > > It will > > > > > > > then > > > > > > > > > > > expose a rest api (since you stated subset of > > > Connect > > > > > > rest api I > > > > > > > > assume > > > > > > > > > > it > > > > > > > > > > > will be a new / own one?) that allows users to > > > send > > > > > > requests like > > > > > > > > > > > "replicate topic a from cluster 1 to cluster 1" > > and > > > > > > start a > > > > > > > connector > > > > > > > > > on > > > > > > > > > > > the relevant worker that can offer this > "route". > > > > > > > > > > > This can be extended to a cluster by starting > > > mirror > > > > > > maker drivers > > > > > > > on > > > > > > > > > > other > > > > > > > > > > > nodes with the same config and it would offer > all > > > the > > > > > > connect > > > > > > > > features > > > > > > > > > of > > > > > > > > > > > balancing restarting in case of failure etc. 
> > > > > > > > > > > > > > > > > > > > > > If this understanding is correct then it just > > feels > > > > to > > > > > > me like an > > > > > > > > awful > > > > > > > > > > lot > > > > > > > > > > > of Connect functionality would need to be > > > > reimplemented > > > > > > or at least > > > > > > > > > > > wrapped, which potentially could mean > additional > > > > effort > > > > > > for > > > > > > > > maintaining > > > > > > > > > > and > > > > > > > > > > > extending Connect down the line. Wouldn't it > be a > > > > > viable > > > > > > > alternative > > > > > > > > to > > > > > > > > > > > look into extending Connect itself to allow > > > defining > > > > > > "remote > > > > > > > > clusters" > > > > > > > > > > > which can then be specified in the connector > > config > > > > to > > > > > > be used > > > > > > > > instead > > > > > > > > > of > > > > > > > > > > > the local cluster? I imagine that change itself > > > would > > > > > > not be too > > > > > > > > > > extensive, > > > > > > > > > > > the main effort would probably be in coming up > > > with a > > > > > > sensible > > > > > > > config > > > > > > > > > > > structure and ensuring backwards compatibility > > with > > > > > > existing > > > > > > > > connector > > > > > > > > > > > configs. > > > > > > > > > > > This would still allow to use a regular Connect > > > > cluster > > > > > > for an > > > > > > > > > arbitrary > > > > > > > > > > > number of clusters, thus still having a > dedicated > > > > > > MirrorMaker > > > > > > > cluster > > > > > > > > > by > > > > > > > > > > > running only MirrorMaker Connectors in there if > > you > > > > > want > > > > > > the > > > > > > > > > isolation. I > > > > > > > > > > > agree that it would not offer the level of > > > > abstraction > > > > > > around > > > > > > > > > replication > > > > > > > > > > > that your concept would enable to implement, > but > > I > > > > > think > > > > > > if would > > > > > > > be > > > > > > > > > far > > > > > > > > > > > less implementation and maintenance effort. > > > > > > > > > > > > > > > > > > > > > > But again, all of that is based on my, > > potentially > > > > > > flawed, > > > > > > > > > understanding > > > > > > > > > > of > > > > > > > > > > > your proposal, please feel free to correct me > :) > > > > > > > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > Sönke > > > > > > > > > > > > > > > > > > > > > > On Fri, Nov 30, 2018 at 1:39 AM Ryanne Dolan < > > > > > > > ryannedo...@gmail.com> > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Sönke, thanks for the feedback! > > > > > > > > > > > > > > > > > > > > > > > > > the renaming policy [...] can be disabled > > > [...] > > > > > The > > > > > > KIP itself > > > > > > > > > does > > > > > > > > > > > not > > > > > > > > > > > > mention this > > > > > > > > > > > > > > > > > > > > > > > > Good catch. I've updated the KIP to call this > > > out. > > > > > > > > > > > > > > > > > > > > > > > > > "MirrorMaker clusters" I am not sure I > fully > > > > > > understand the > > > > > > > issue > > > > > > > > > you > > > > > > > > > > > > are trying to solve > > > > > > > > > > > > > > > > > > > > > > > > MirrorMaker today is not scalable from an > > > > operational > > > > > > > perspective. 
> > > > > > > > > > Celia > > > > > > > > > > > > Kung at LinkedIn does a great job of > explaining > > > > this > > > > > > problem [1], > > > > > > > > > which > > > > > > > > > > > has > > > > > > > > > > > > caused LinkedIn to drop MirrorMaker in favor > of > > > > > > Brooklin. With > > > > > > > > > > Brooklin, > > > > > > > > > > > a > > > > > > > > > > > > single cluster, single API, and single UI > > > controls > > > > > > replication > > > > > > > > flows > > > > > > > > > > for > > > > > > > > > > > an > > > > > > > > > > > > entire data center. With MirrorMaker 2.0, the > > > > vision > > > > > > is much the > > > > > > > > > same. > > > > > > > > > > > > > > > > > > > > > > > > If your data center consists of a small > number > > of > > > > > > Kafka clusters > > > > > > > > and > > > > > > > > > an > > > > > > > > > > > > existing Connect cluster, it might make more > > > sense > > > > to > > > > > > re-use the > > > > > > > > > > Connect > > > > > > > > > > > > cluster with MirrorSource/SinkConnectors. > > There's > > > > > > nothing wrong > > > > > > > > with > > > > > > > > > > this > > > > > > > > > > > > approach for small deployments, but this > model > > > also > > > > > > doesn't > > > > > > > scale. > > > > > > > > > This > > > > > > > > > > > is > > > > > > > > > > > > because Connect clusters are built around a > > > single > > > > > > Kafka cluster > > > > > > > -- > > > > > > > > > > what > > > > > > > > > > > I > > > > > > > > > > > > call the "primary" cluster -- and all > > Connectors > > > in > > > > > > the cluster > > > > > > > > must > > > > > > > > > > > either > > > > > > > > > > > > consume from or produce to this single > cluster. > > > If > > > > > you > > > > > > have more > > > > > > > > than > > > > > > > > > > one > > > > > > > > > > > > "active" Kafka cluster in each data center, > > > you'll > > > > > end > > > > > > up needing > > > > > > > > > > > multiple > > > > > > > > > > > > Connect clusters there as well. > > > > > > > > > > > > > > > > > > > > > > > > The problem with Connect clusters for > > replication > > > > is > > > > > > way less > > > > > > > > severe > > > > > > > > > > > > compared to legacy MirrorMaker. Generally you > > > need > > > > > one > > > > > > Connect > > > > > > > > > cluster > > > > > > > > > > > per > > > > > > > > > > > > active Kafka cluster. As you point out, MM2's > > > > > > SinkConnector means > > > > > > > > you > > > > > > > > > > can > > > > > > > > > > > > get away with a single Connect cluster for > > > > topologies > > > > > > that center > > > > > > > > > > around > > > > > > > > > > > a > > > > > > > > > > > > single primary cluster. But each Connector > > within > > > > > each > > > > > > Connect > > > > > > > > > cluster > > > > > > > > > > > must > > > > > > > > > > > > be configured independently, with no > high-level > > > > view > > > > > > of your > > > > > > > > > > replication > > > > > > > > > > > > flows within and between data centers. > > > > > > > > > > > > > > > > > > > > > > > > With MirrorMaker 2.0, a single MirrorMaker > > > cluster > > > > > > manages > > > > > > > > > replication > > > > > > > > > > > > across any number of Kafka clusters. Much > like > > > > > > Brooklin, MM2 does > > > > > > > > the > > > > > > > > > > > work > > > > > > > > > > > > of setting up connectors between clusters as > > > > needed. 
> > > > > > This > > > > > > > > > > > > Replication-as-a-Service is a huge win for > > larger > > > > > > deployments, as > > > > > > > > > well > > > > > > > > > > as > > > > > > > > > > > > for organizations that haven't adopted > Connect. > > > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://www.slideshare.net/ConfluentInc/more-data-more-problems-scaling-kafkamirroring-pipelines-at-linkedin > > > > > > > > > > > > > > > > > > > > > > > > Keep the questions coming! Thanks. > > > > > > > > > > > > Ryanne > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Nov 29, 2018 at 3:30 AM Sönke Liebau > < > > > > > > > > > > soenke.lie...@opencore.com > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > >> Hi Ryanne, > > > > > > > > > > > >> > > > > > > > > > > > >> first of all, thanks for the KIP, great work > > > > overall > > > > > > and much > > > > > > > > > needed I > > > > > > > > > > > >> think! > > > > > > > > > > > >> > > > > > > > > > > > >> I have a small comment on the renaming > policy, > > > in > > > > > one > > > > > > of the > > > > > > > mails > > > > > > > > > on > > > > > > > > > > > >> this thread you mention that this can be > > > disabled > > > > > (to > > > > > > replicate > > > > > > > > > topic1 > > > > > > > > > > > in > > > > > > > > > > > >> cluster A as topic1 on cluster B I assume). > > The > > > > KIP > > > > > > itself does > > > > > > > > not > > > > > > > > > > > mention > > > > > > > > > > > >> this, from reading just the KIP one might > get > > > the > > > > > > assumption > > > > > > > that > > > > > > > > > > > renaming > > > > > > > > > > > >> is mandatory. It might be useful to add a > > > sentence > > > > > or > > > > > > two around > > > > > > > > > > > renaming > > > > > > > > > > > >> policies and what is possible here. I assume > > you > > > > > > intend to make > > > > > > > > > these > > > > > > > > > > > >> pluggable? > > > > > > > > > > > >> > > > > > > > > > > > >> Regarding the latest addition of > "MirrorMaker > > > > > > clusters" I am not > > > > > > > > > sure > > > > > > > > > > I > > > > > > > > > > > >> fully understand the issue you are trying to > > > solve > > > > > > and what > > > > > > > > exactly > > > > > > > > > > > these > > > > > > > > > > > >> scripts will do - but that may just me being > > > dense > > > > > > about it :) > > > > > > > > > > > >> I understand the limitation to a single > source > > > and > > > > > > target > > > > > > > cluster > > > > > > > > > that > > > > > > > > > > > >> Connect imposes, but isn't this worked > around > > by > > > > the > > > > > > fact that > > > > > > > you > > > > > > > > > > have > > > > > > > > > > > >> MirrorSource- and MirrorSinkConnectors and > one > > > > part > > > > > > of the > > > > > > > > equation > > > > > > > > > > will > > > > > > > > > > > >> always be under your control? 
> > > > > > > > > > > >> The way I understood your intention was that > > > there > > > > > is > > > > > > a > > > > > > > (regular, > > > > > > > > > not > > > > > > > > > > > MM) > > > > > > > > > > > >> Connect Cluster somewhere next to a Kafka > > > Cluster > > > > A > > > > > > and if you > > > > > > > > > deploy > > > > > > > > > > a > > > > > > > > > > > >> MirrorSourceTask to that it will read > messages > > > > from > > > > > a > > > > > > remote > > > > > > > > > cluster B > > > > > > > > > > > and > > > > > > > > > > > >> replicate them into the local cluster A. If > > you > > > > > > deploy a > > > > > > > > > > MirrorSinkTask > > > > > > > > > > > it > > > > > > > > > > > >> will read from local cluster A and replicate > > > into > > > > > > cluster B. > > > > > > > > > > > >> > > > > > > > > > > > >> Since in both causes the configuration for > > > > cluster B > > > > > > will be > > > > > > > > passed > > > > > > > > > > into > > > > > > > > > > > >> the connector in the ConnectorConfig > contained > > > in > > > > > the > > > > > > rest > > > > > > > > request, > > > > > > > > > > > what's > > > > > > > > > > > >> to stop us from starting a third connector > > with > > > a > > > > > > > MirrorSourceTask > > > > > > > > > > > reading > > > > > > > > > > > >> from cluster C? > > > > > > > > > > > >> > > > > > > > > > > > >> I am a bit hesitant about the entire concept > > of > > > > > > having extra > > > > > > > > scripts > > > > > > > > > > to > > > > > > > > > > > >> run an entire separate Connect cluster - I'd > > > much > > > > > > prefer an > > > > > > > option > > > > > > > > > to > > > > > > > > > > > use a > > > > > > > > > > > >> regular connect cluster from an ops point of > > > view. > > > > > Is > > > > > > it maybe > > > > > > > > worth > > > > > > > > > > > >> spending some time investigating whether we > > can > > > > come > > > > > > up with a > > > > > > > > > change > > > > > > > > > > to > > > > > > > > > > > >> connect that enables what MM would need? > > > > > > > > > > > >> > > > > > > > > > > > >> Best regards, > > > > > > > > > > > >> Sönke > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> On Tue, Nov 27, 2018 at 10:02 PM Ryanne > Dolan > > < > > > > > > > > > ryannedo...@gmail.com> > > > > > > > > > > > >> wrote: > > > > > > > > > > > >> > > > > > > > > > > > >>> Hey y'all, I'd like you draw your attention > > to > > > a > > > > > new > > > > > > section in > > > > > > > > > > KIP-382 > > > > > > > > > > > >>> re > > > > > > > > > > > >>> MirrorMaker Clusters: > > > > > > > > > > > >>> > > > > > > > > > > > >>> > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-382:+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-MirrorMakerClusters > > > > > > > > > > > >>> > > > > > > > > > > > >>> A common concern I hear about using Connect > > for > > > > > > replication is > > > > > > > > that > > > > > > > > > > all > > > > > > > > > > > >>> SourceConnectors in a Connect cluster must > > use > > > > the > > > > > > same target > > > > > > > > > Kafka > > > > > > > > > > > >>> cluster, and likewise all SinkConnectors > must > > > use > > > > > > the same > > > > > > > source > > > > > > > > > > Kafka > > > > > > > > > > > >>> cluster. 
In order to use multiple Kafka > > > clusters > > > > > > from Connect, > > > > > > > > > there > > > > > > > > > > > are > > > > > > > > > > > >>> two possible approaches: > > > > > > > > > > > >>> > > > > > > > > > > > >>> 1) use an intermediate Kafka cluster, K. > > > > > > SourceConnectors (A, > > > > > > > B, > > > > > > > > C) > > > > > > > > > > > write > > > > > > > > > > > >>> to K and SinkConnectors (X, Y, Z) read from > > K. > > > > This > > > > > > enables > > > > > > > flows > > > > > > > > > > like > > > > > > > > > > > A > > > > > > > > > > > >>> -> > > > > > > > > > > > >>> K - > X but means that some topologies > > require > > > > > > extraneous hops, > > > > > > > > and > > > > > > > > > > > means > > > > > > > > > > > >>> that K must be scaled to handle records > from > > > all > > > > > > sources and > > > > > > > > sinks. > > > > > > > > > > > >>> > > > > > > > > > > > >>> 2) use multiple Connect clusters, one for > > each > > > > > > target cluster. > > > > > > > > Each > > > > > > > > > > > >>> cluster > > > > > > > > > > > >>> has multiple SourceConnectors, one for each > > > > source > > > > > > cluster. > > > > > > > This > > > > > > > > > > > enables > > > > > > > > > > > >>> direct replication of A -> X but means > there > > > is a > > > > > > proliferation > > > > > > > > of > > > > > > > > > > > >>> Connect > > > > > > > > > > > >>> clusters, each of which must be managed > > > > separately. > > > > > > > > > > > >>> > > > > > > > > > > > >>> Both options are viable for small > deployments > > > > > > involving a small > > > > > > > > > > number > > > > > > > > > > > of > > > > > > > > > > > >>> Kafka clusters in a small number of data > > > centers. > > > > > > However, > > > > > > > > neither > > > > > > > > > is > > > > > > > > > > > >>> scalable, especially from an operational > > > > > standpoint. > > > > > > > > > > > >>> > > > > > > > > > > > >>> KIP-382 now introduces "MirrorMaker > > clusters", > > > > > which > > > > > > are > > > > > > > distinct > > > > > > > > > > from > > > > > > > > > > > >>> Connect clusters. A single MirrorMaker > > cluster > > > > > > provides > > > > > > > > > > > >>> "Replication-as-a-Service" among any number > > of > > > > > Kafka > > > > > > clusters > > > > > > > > via a > > > > > > > > > > > >>> high-level REST API based on the Connect > API. > > > > Under > > > > > > the hood, > > > > > > > > > > > MirrorMaker > > > > > > > > > > > >>> sets up Connectors between each pair of > Kafka > > > > > > clusters. The > > > > > > > REST > > > > > > > > > API > > > > > > > > > > > >>> enables on-the-fly reconfiguration of each > > > > > > Connector, including > > > > > > > > > > updates > > > > > > > > > > > >>> to > > > > > > > > > > > >>> topic whitelists/blacklists. > > > > > > > > > > > >>> > > > > > > > > > > > >>> To configure MirrorMaker 2.0, you need a > > > > > > configuration file > > > > > > > that > > > > > > > > > > lists > > > > > > > > > > > >>> connection information for each Kafka > cluster > > > > > > (broker lists, > > > > > > > SSL > > > > > > > > > > > settings > > > > > > > > > > > >>> etc). 
At a minimum, this looks like: > > > > > > > > > > > >>> > > > > > > > > > > > >>> clusters=us-west, us-east > > > > > > > > > > > >>> > > > > > cluster.us-west.broker.list=us-west-kafka-server:9092 > > > > > > > > > > > >>> > > > > > cluster.us-east.broker.list=us-east-kafka-server:9092 > > > > > > > > > > > >>> > > > > > > > > > > > >>> You can specify topic whitelists and other > > > > > > connector-level > > > > > > > > settings > > > > > > > > > > > here > > > > > > > > > > > >>> too, or you can use the REST API to > > > > remote-control > > > > > a > > > > > > running > > > > > > > > > cluster. > > > > > > > > > > > >>> > > > > > > > > > > > >>> I've also updated the KIP with minor > changes > > to > > > > > > bring it in > > > > > > > line > > > > > > > > > with > > > > > > > > > > > the > > > > > > > > > > > >>> current implementation. > > > > > > > > > > > >>> > > > > > > > > > > > >>> Looking forward to your feedback, thanks! > > > > > > > > > > > >>> Ryanne > > > > > > > > > > > >>> > > > > > > > > > > > >>> On Mon, Nov 19, 2018 at 10:26 PM Ryanne > > Dolan < > > > > > > > > > ryannedo...@gmail.com > > > > > > > > > > > > > > > > > > > > > > >>> wrote: > > > > > > > > > > > >>> > > > > > > > > > > > >>> > Dan, you've got it right. ACL sync will > be > > > done > > > > > by > > > > > > MM2 > > > > > > > > > > automatically > > > > > > > > > > > >>> > (unless disabled) according to simple > > rules: > > > > > > > > > > > >>> > > > > > > > > > > > > >>> > - If a principal has READ access on a > topic > > > in > > > > a > > > > > > source > > > > > > > > cluster, > > > > > > > > > > the > > > > > > > > > > > >>> same > > > > > > > > > > > >>> > principal should have READ access on > > > downstream > > > > > > replicated > > > > > > > > topics > > > > > > > > > > > >>> ("remote > > > > > > > > > > > >>> > topics"). > > > > > > > > > > > >>> > - Only MM2 has WRITE access on "remote > > > topics". > > > > > > > > > > > >>> > > > > > > > > > > > > >>> > This covers sync from upstream topics > like > > > > > > "topic1" to > > > > > > > > downstream > > > > > > >