Thanks Jun.

> 103. My point was that the MirrorMakerConnector can die while the Heartbeat connector is still alive. So, one can't solely rely on Heartbeat for monitoring?
Each cluster will have a heartbeat topic produced by MirrorHeartbeatConnector, which doesn't have an associated "source" other than time. This topic gets picked up by downstream MirrorSourceConnectors and replicated as e.g. A.heartbeat. So the heartbeat topic itself isn't particularly useful for monitoring, but the downstream A.heartbeat shows that heartbeats are being replicated successfully from A -> B. If a MirrorSourceConnector fails while replicating A -> B, you'd still see heartbeats in cluster B, but not A.heartbeat.

105. You're correct, you don't need to know "B" in order to go from A's topic1 to B's A.topic1, i.e. migrating downstream. But you need to know "B" to go from A's B.topic1 to B's topic1. In the latter case, you are consuming a remote topic to begin with, and then migrating to the source cluster, i.e. migrating upstream. N.B. you strip the "B" prefix in this case, rather than add the "A" prefix. And you can't just strip all prefixes, because you could be migrating from e.g. A's C.topic1 to B's C.topic1, i.e. migrating "laterally", if you will. I suppose we could break this out into multiple methods (upstream, downstream, lateral, etc.), but I think that would add a lot more complexity and confusion to the API. By providing both A and B, the single method can always figure out what to do.

107. done.

Thanks,
Ryanne

On Wed, Jan 9, 2019 at 6:11 PM Jun Rao <j...@confluent.io> wrote:

> Hi, Ryanne,
>
> 103. My point was that the MirrorMakerConnector can die while the Heartbeat connector is still alive. So, one can't solely rely on Heartbeat for monitoring?
>
> 105. Hmm, maybe I don't understand how this is done. Let's say we replicate topic1 from cluster A to cluster B.
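To make the upstream/downstream/lateral distinction in 105 concrete, here is a rough sketch of how a single method can decide what rename to apply given both cluster aliases. The class and method names are hypothetical, not the KIP's actual API, and it assumes "." is the remote-topic separator:

```java
// Sketch only: given source and target cluster aliases, map a topic name in
// the source cluster to its replicated name in the target cluster.
// Assumes "." separates a cluster-alias prefix from the topic name.
public class RemoteTopics {
    static final String SEPARATOR = ".";

    static String renameTopic(String sourceAlias, String targetAlias, String topic) {
        String targetPrefix = targetAlias + SEPARATOR;
        if (topic.startsWith(targetPrefix)) {
            // Migrating upstream: A's B.topic1 -> B's topic1 (strip the "B" prefix).
            return topic.substring(targetPrefix.length());
        } else if (topic.contains(SEPARATOR)) {
            // Migrating laterally: A's C.topic1 -> B's C.topic1 (keep the "C" prefix).
            return topic;
        } else {
            // Migrating downstream: A's topic1 -> B's A.topic1 (add the "A" prefix).
            return sourceAlias + SEPARATOR + topic;
        }
    }
}
```

Note the lateral branch assumes any dotted topic carries a cluster-alias prefix, which is why both aliases are needed to disambiguate.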
> My understanding is that to translate the offset from A to B for a consumer group, we read the A.checkpoint file in cluster B to get the timestamp of the last checkpointed offset, call consumer.offsetsForTimes() on A.topic1 in cluster B to translate the timestamp to a local offset, and return <A.topic1, translated offset>. Is that right? If so, in all steps, we don't need to know the targetClusterAlias B. We just need to know the connection string to cluster B, which targetConsumerConfig provides.
>
> 107. Thanks. Could you add that description to the KIP?
>
> Thanks,
>
> Jun
>
> On Mon, Jan 7, 2019 at 3:50 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
>> Thanks Jun, I've updated the KIP as requested. Brief notes below:
>>
>> 100. added "...out-of-the-box (without custom handlers)..."
>>
>> 101. done. Good idea to include a MessageFormatter.
>>
>> 102. done.
>>
>> > 103. [...] why is Heartbeat a separate connector?
>>
>> Heartbeats themselves are replicated via MirrorSource/SinkConnector, so if replication stops, you'll stop seeing heartbeats in downstream clusters. I've updated the KIP to make this clearer and have added a bullet to Rejected Alternatives.
>>
>> 104. added "heartbeat.retention.ms", "checkpoint.retention.ms", thanks. The heartbeat topic doesn't need to be compacted.
>>
>> > 105. [...] I am not sure why targetClusterAlias is useful
>>
>> In order to map A's B.topic1 to B's topic1, we need to know B.
>>
>> > 106. [...] should the following properties be prefixed with "consumer."
>>
>> No, they are part of Connect's worker config.
>>
>> > 107. So, essentially it's running multiple logical connect clusters on the same shared worker nodes?
>>
>> Correct. Rather than configure each Connector and Worker and Herder individually, a single top-level configuration file is used. And instead of running a bunch of separate worker processes on each node, a single process runs multiple workers.
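The translation steps Jun outlines in 105 (checkpoint timestamp, then offsetsForTimes) can be modeled roughly as follows. This is a sketch of the described steps only — Jun asks "Is that right?", so the thread does not fully confirm them — with plain maps standing in for the checkpoint topic and for consumer.offsetsForTimes(); all names are illustrative:

```java
import java.util.Map;
import java.util.TreeMap;

// Models the described offset translation: read the last checkpointed
// timestamp for a group, then map it to a local offset in the target cluster.
public class CheckpointTranslation {

    // Stand-in for consumer.offsetsForTimes() on A.topic1 in cluster B:
    // return the earliest local offset whose record timestamp is >= the
    // requested timestamp, or null if none exists.
    static Long offsetForTime(TreeMap<Long, Long> timestampToOffset, long timestamp) {
        Map.Entry<Long, Long> entry = timestampToOffset.ceilingEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }

    // checkpointTimestamp is the timestamp of the last checkpointed offset
    // for the consumer group, as read from the checkpoint topic in cluster B.
    // Returns the <A.topic1, translated offset> pair described above.
    static Map<String, Long> translate(String sourceAlias, String topic,
                                       TreeMap<Long, Long> localIndex,
                                       long checkpointTimestamp) {
        Long offset = offsetForTime(localIndex, checkpointTimestamp);
        return offset == null ? Map.of() : Map.of(sourceAlias + "." + topic, offset);
    }
}
```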
>> This is implemented using a separate driver based on ConnectDistributed, but which runs multiple DistributedHerders. Each DistributedHerder uses a different Kafka cluster for coordination -- they are completely separate apart from running in the same process.
>>
>> Thanks for helping improve the doc!
>> Ryanne
>>
>> On Fri, Jan 4, 2019 at 10:33 AM Jun Rao <j...@confluent.io> wrote:
>>
>>> Hi, Ryanne,
>>>
>>> Thanks for the KIP. Still have a few more comments below.
>>>
>>> 100. "This is not possible with MirrorMaker today -- records would be replicated back and forth indefinitely, and the topics in either cluster would be merged inconsistently between clusters." This is not 100% true since MM can do the topic renaming through MirrorMakerMessageHandler.
>>>
>>> 101. For both Heartbeat and Checkpoint, could you define the full schema, including the field types? Also, how are they serialized into the Kafka topic? Is it JSON or something else? For convenience, it would be useful to provide a built-in MessageFormatter so that one can read each topic's data using tools like ConsoleConsumer.
>>>
>>> 102. For the public Heartbeat and Checkpoint classes, could you list the public methods in each class?
>>>
>>> 103. I am wondering why is Heartbeat a separate connector? A MirrorMaker connector can die independently of the Heartbeat connector, which seems to defeat the purpose of heartbeat.
>>>
>>> 104. Is the Heartbeat topic also a compacted topic? If not, how long is it retained for?
>>>
>>> 105. For the following, I am not sure why targetClusterAlias is useful? The checkpoint file seems to only include sourceClusterAlias.
>>>
>>> Map<TopicPartition, Long> translateOffsets(Map<?, ?> targetConsumerConfig, String sourceClusterAlias, String targetClusterAlias, String remoteGroupId)
>>>
>>> 106. In the configuration example, should the following properties be prefixed with "consumer."?
>>> key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
>>> value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
>>>
>>> 107. Could you add a bit more description on how connect-mirror-maker.sh is implemented? My understanding is that it will start as many separate DistributedHerders as there are Kafka clusters? So, essentially it's running multiple logical connect clusters on the same shared worker nodes?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Thu, Dec 20, 2018 at 5:23 PM Srinivas Reddy <srinivas96all...@gmail.com> wrote:
>>>
>>> > +1 (non-binding)
>>> >
>>> > Thank you Ryanne for the KIP, let me know if you need support in implementing it.
>>> >
>>> > -
>>> > Srinivas
>>> >
>>> > - Typed on tiny keys. pls ignore typos.{mobile app}
>>> >
>>> > On Fri, 21 Dec, 2018, 08:26 Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>> >
>>> > > Thanks for the votes so far!
>>> > >
>>> > > Due to recent discussions, I've removed the high-level REST API from the KIP.
>>> > >
>>> > > On Thu, Dec 20, 2018 at 12:42 PM Paul Davidson <pdavid...@salesforce.com> wrote:
>>> > >
>>> > > > +1
>>> > > >
>>> > > > Would be great to see the community build on the basic approach we took with Mirus. Thanks Ryanne.
>>> > > >
>>> > > > On Thu, Dec 20, 2018 at 9:01 AM Andrew Psaltis <psaltis.and...@gmail.com> wrote:
>>> > > >
>>> > > > > +1
>>> > > > >
>>> > > > > Really looking forward to this and to helping in any way I can.
>>> > > > > Thanks for kicking this off Ryanne.
>>> > > > >
>>> > > > > On Thu, Dec 20, 2018 at 10:18 PM Andrew Otto <o...@wikimedia.org> wrote:
>>> > > > >
>>> > > > > > +1
>>> > > > > >
>>> > > > > > This looks like a huge project! Wikimedia would be very excited to have this. Thanks!
>>> > > > > >
>>> > > > > > On Thu, Dec 20, 2018 at 9:52 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>> > > > > >
>>> > > > > > > Hey y'all, please vote to adopt KIP-382 by replying +1 to this thread.
>>> > > > > > >
>>> > > > > > > For your reference, here are the highlights of the proposal:
>>> > > > > > >
>>> > > > > > > - Leverages the Kafka Connect framework and ecosystem.
>>> > > > > > > - Includes both source and sink connectors.
>>> > > > > > > - Includes a high-level driver that manages connectors in a dedicated cluster.
>>> > > > > > > - High-level REST API abstracts over connectors between multiple Kafka clusters.
>>> > > > > > > - Detects new topics, partitions.
>>> > > > > > > - Automatically syncs topic configuration between clusters.
>>> > > > > > > - Manages downstream topic ACL.
>>> > > > > > > - Supports "active/active" cluster pairs, as well as any number of active clusters.
>>> > > > > > > - Supports cross-data center replication, aggregation, and other complex topologies.
>>> > > > > > > - Provides new metrics including end-to-end replication latency across multiple data centers/clusters.
>>> > > > > > > - Emits offsets required to migrate consumers between clusters.
>>> > > > > > > - Tooling for offset translation.
>>> > > > > > > - MirrorMaker-compatible legacy mode.
>>> > > > > > >
>>> > > > > > > Thanks, and happy holidays!
>>> > > > > > > Ryanne
>>> > > >
>>> > > > --
>>> > > > Paul Davidson
>>> > > > Principal Engineer, Ajna Team
>>> > > > Big Data & Monitoring
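For reference on item 107 above, the single top-level configuration file that drives connect-mirror-maker.sh might look roughly like this. This is an illustrative sketch with made-up aliases and hosts, not a verbatim example from the KIP:

```properties
# Define the cluster aliases; each gets its own DistributedHerder in the
# shared process.
clusters = A, B
A.bootstrap.servers = a-kafka:9092
B.bootstrap.servers = b-kafka:9092

# Enable replication flows between cluster pairs.
A->B.enabled = true
A->B.topics = .*
B->A.enabled = true
B->A.topics = .*
```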