Thanks Dong.

> 1. Currently if there is topic created with "." in the topic name, would it cause correctness issue for this KIP?

Yes, RemoteClusterUtils would be confused by existing topics that have a period, and MM2 might try to send records to existing topics if they happen to be prefixed with a source cluster alias.
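To illustrate the ambiguity, here's a hypothetical helper (not MM2 code) that just spells out the "<source cluster alias>.<topic>" convention; any topic that happens to contain a period parses the same way as a remote topic MM2 created:

    public class SeparatorAmbiguity {
        // Hypothetical helper mirroring the "<source cluster alias>.<topic>"
        // convention; not part of MM2.
        static String presumedSourceCluster(String topic) {
            int dot = topic.indexOf('.');
            return dot < 0 ? null : topic.substring(0, dot);
        }

        public static void main(String[] args) {
            System.out.println(presumedSourceCluster("A.topic_1"));       // "A" -- a genuine remote topic
            System.out.println(presumedSourceCluster("us-west.orders"));  // "us-west" -- just a topic with a period in its name
        }
    }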
> Some future work such as "propose a special separator character" has been suggested in the email thread. Could we document these future work in the KIP

Good idea, I've added a Future Work section.

> 2. [...] producer will producer to topic_1 in cluster B whereas Consumer will consume from A.topic_1 in cluster B. [...]

You are correct: if a producer-consumer pair uses a normal topic and both fail over, the consumer's topic is renamed, but the producer's topic stays the same, potentially introducing a disconnect between the producer and consumer. This is by design.

Remember that consumers very often will consume from a topic and all associated remote topics, e.g. via a regex subscription, in which case the cross-cluster "logical" topic is unchanged before and after migration. When a consumer subscribes to records from a particular cluster only, migration won't change that either. In your example, the subscription is A's topic_1, and that doesn't change after migration -- the consumer still receives records only from A (though presumably A is down during failover). If you want the consumer to see records from A and/or B (whichever is active at the moment), your subscription should be ["topic_1", "B.topic_1"], which after migration will be ["topic_1", "A.topic_1"]. In this way, consumers have control over whether they are consuming from a particular cluster or from a set of "active" clusters.
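To make the subscription concrete, here's a minimal sketch of the pre-failover consumer on cluster A; the bootstrap address, group id and deserializers are illustrative. After migrating to cluster B you'd swap the list for ("topic_1", "A.topic_1"), or use the commented-out regex, which is the same on either cluster:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ActiveClusterSubscription {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "clusterA:9092");  // illustrative
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // illustrative
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // On cluster A: the local topic plus the remote topic replicated from B.
                consumer.subscribe(Arrays.asList("topic_1", "B.topic_1"));
                // Or a regex subscription that works unchanged on either cluster:
                // consumer.subscribe(java.util.regex.Pattern.compile("([^.]+\\.)?topic_1"));
                // ... poll() as usual ...
            }
        }
    }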
> 3. [...] do we need to specify in the KIP the migration procedure [...]

Good idea. I've added this to Future Work, along with future command-line tooling.

> user needs to additionally call offsetsForTimes(...) after getting the timestamp from the checkpoint for each.

Not generally -- all you need is in RemoteClusterUtils. N.B. this KIP does not change any logic in existing consumers or producers, but it provides the tools necessary to build cross-cluster consumers and producers on top of the existing APIs.

> 4. A lot of class names (e.g. MirrorCheckpointConnector) is included in the Public Interface section. Not sure if it is useful to specify the class name without specifying the class API and its usage.

Those are implementations of existing interfaces, so there's nothing new to document.

> whether we should include in the Public Interface section [...] schema of the checkpoint topic and heartbeat topic

Thanks, I've added links to the respective subheadings.

> usage of the new scripts (e.g. connect-mirror-maker.sh) with its arguments

The connect-mirror-maker.sh script has the same arguments as the existing connect-standalone.sh and connect-distributed.sh scripts, which is to say they don't really have any arguments except paths to properties files. I suspect this will change over time, but for now there's nothing new to document here.

> 5. Usually we treat metrics as public interface.

Good call, I've added a section.

> 6. [...] "subject to change" [...]

Deleted.

> 7. this API reads the timestamp from the checkpoint topic and return it to the caller.

No, translateOffsets uses an internal offset stream to translate offsets directly -- timestamps are not involved in the translation. As Becket points out, there are additional ways to implement this, including via intermediate timestamps and/or RecordBatch headers, and MM2 might offer multiple options down the road. Other similar replication engines do use timestamps for this. But regardless of the underlying implementation, the end-to-end operation is offset-to-offset, not timestamp-to-timestamp, so I think the method name should not change.
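For reference, here's roughly how I'd expect a failed-over consumer group to pick up where it left off, assuming a translateOffsets signature along the lines of what the KIP sketches (the exact parameter and return types may still change; the addresses and group id are illustrative):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;  // package name per the KIP draft

    public class FailoverSeek {
        public static void main(String[] args) throws Exception {
            // Client config pointing at the cluster we are failing over to (B).
            Map<String, Object> clusterB = new HashMap<>();
            clusterB.put("bootstrap.servers", "clusterB:9092");  // illustrative

            // Translate the group's committed offsets from cluster A into offsets
            // on B's remote topics (e.g. A.topic_1), via MM2's checkpoints.
            Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(clusterB, "A", "my-group", Duration.ofMinutes(1));

            Properties props = new Properties();
            props.put("bootstrap.servers", "clusterB:9092");
            props.put("group.id", "my-group");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(translated.keySet());
                // Seek to the translated offsets -- at or before the group's last
                // checkpointed position on A, never past it.
                translated.forEach((tp, offset) -> consumer.seek(tp, offset.offset()));
                // ... resume polling on cluster B ...
            }
        }
    }

The same call with the cluster roles swapped covers failback, provided checkpoints are being emitted in that direction -- see 8 below.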
> 8. When a consumer is migrated from A.topic_1 in cluster B to topic_1 in cluster A, how does consumer determine the start offset for topic_1 in cluster A?

Good question. In order to migrate "upstream" like this (i.e. from remote topic -> source topic during failback) you need corresponding checkpoints going upstream as well. Unlike MM1 and others, MM2 generally operates in both directions like this, even if data is flowing in only one direction. For example, you may have an active primary cluster and a passive (read-only) backup cluster, with data being replicated from primary->backup. In this case, producers will never write to the read-only backup cluster, so there is no data flowing from backup->primary. But if you want to failover _and failback_ between primary and backup clusters, you'll need checkpoints emitted in both directions. Otherwise the primary cluster would not know which offsets were consumed in the backup cluster during failover. Whether active/active or active/passive, the MM2 driver automatically sets up connectors in both directions. I suppose we might want to add a checkpoint SinkConnector at some point to make this bidirectional checkpointing easier for those using a single Connect cluster to replicate two or more Kafka clusters. I'll add this to Future Work.

> 9. Does RemoteClusterUtils.upstreamClusters(...) return clusters that is no longer an upstream cluster? If yes, it seems to reduced the usability of the API if user only wants to know the current upstream cluster list. If no, could you explain what is the semantics of the API (e.g. when an upstream cluster is considered outdated) and how is this API implemented to exclude outdated clusters. It may be useful to briefly explain the implementation in the proposed change section for those APIs that are not straightforward.

This is an interesting question. I think we can assume that an upstream cluster is not "outdated" if there is a heartbeat topic (and ostensibly heartbeat records) from the upstream cluster present on the downstream cluster. So by definition, the existence of topic "A.heartbeat" means "A" is an upstream cluster. Now, perhaps MM2 is no longer replicating A -> B for whatever reason, in which case the replication lag might be very long! But I think we'd still consider it an upstream cluster. If you changed the replication topology at your organization such that A -> B is no longer a valid flow, I'd imagine you'd want to clean up any "A.xyz" topics, including "A.heartbeat". But until then, RemoteClusterUtils would consider A to be upstream.
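As a sketch of the semantics (again assuming a signature roughly as it appears in the KIP, with an illustrative bootstrap address), the call amounts to asking which <alias>.heartbeat topics exist locally:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;  // package name per the KIP draft

    public class ListUpstreamClusters {
        public static void main(String[] args) throws Exception {
            Map<String, Object> clusterB = new HashMap<>();
            clusterB.put("bootstrap.servers", "clusterB:9092");  // illustrative

            // If "A.heartbeat" exists on cluster B, "A" is reported as upstream,
            // whether or not replication from A is currently running.
            Set<String> upstream = RemoteClusterUtils.upstreamClusters(clusterB);
            upstream.forEach(alias -> System.out.println("upstream: " + alias));
        }
    }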
> 10. [...] Not sure how is high watermark used here [...]

I might be abusing this term, especially in the context of Kafka where it means other things as well, but I mean to say that offset translation is not one-to-one, but rather it simply returns some recent offset that is guaranteed to not come after the most recent checkpoint.

> 11. [...] Should we include the Java doc [...]

Done.

> 12. [...] Not sure if we need three releases.

My thinking is I'd like to release MM2 as soon as possible, but I'm in no hurry to remove MM1. I'd be open to deprecating parts of MM1 in the first release, but I don't want an Indiana Jones-style swap of ./bin/kafka-mirror-maker.sh in the near future. I suspect many early adopters of MM2 will leverage existing Connect clusters to run MM2's Connectors, which is a use case that doesn't really overlap with existing MM1 deployments, so I think there is a strong argument for letting them coexist for a bit.

> 13. [...] specify the log compaction key [...]

Done.

Thanks Dong for the feedback!

Ryanne

On Sun, Jan 13, 2019 at 1:37 AM Dong Lin <lindon...@gmail.com> wrote:

> Hey Ryanne,
>
> Sorry I am late here. Thanks much for all the work! After reading through the latest KIP and all the previous discussion, I have some questions below:
>
> 1. Currently if there is topic created with "." in the topic name, would it cause correctness issue for this KIP? For example, will consumer reads from the topic that is not intended, and will API such as RemoteClusterUtils.upstreamClusters(...) return wrong string as the upstream cluster? If there is correctness issue which reduces the usability, we probably should mark it in the Java doc and fix these issues before considering the feature as ready and safe to use. Some future work such as "propose a special separator character" has been suggested in the email thread. Could we document these future work in the KIP if the work is necessary for MM2 to be used reliably?
>
> 2. Suppose topic_1 is replicated from cluster A to cluster B. And we have a pair of producer and consumer that produces and consumes from topic_1 in cluster A. Let's say cluster A crashed and we need to migrate this pair of consumer to use cluster B. According to the discussion in the email thread, producer will producer to topic_1 in cluster B whereas Consumer will consume from A.topic_1 in cluster B. It means that the message produced by this producer will not longer be consumed by this consumer. Would this be an issue?
>
> 3. Given that the "mechanism to migrate producers or consumers between mirrored clusters" is listed as the motivation of this KIP, do we need to specify in the KIP the migration procedure which we have discussed in depth in the email thread? For example, user needs to additionally call offsetsForTimes(...) after getting the timestamp from the checkpoint for each.
>
> 4. A lot of class names (e.g. MirrorCheckpointConnector) is included in the Public Interface section. Not sure if it is useful to specify the class name without specifying the class API and its usage. My understanding is that Public Interface section should include 1) minimum amount of the information that user can read in order to use the feature provided by the KIP; and 2) anything whose change incurs compatibility concern and thus KIP discussion. So I am wondering whether we should include in the Public Interface section 1) the class API for those classes listed in the section (or remove from the KIP if we have not decided the class API); 2) the usage of the new scripts (e.g. connect-mirror-maker.sh) with its arguments (e.g. this <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts> example); and 3) schema of the checkpoint topic and heartbeat topic which is currently in the Proposed Change section.
>
> 5. It is said in the motivation section that the design includes new metrics such as end-to-end replication latency across multiple data centers/clusters. Usually we treat metrics as public interface. Could we also specify these metrics (e.g. definition, type) in Public Interface section similar to this <https://cwiki.apache.org/confluence/display/KAFKA/KIP-237%3A+More+Controller+Health+Metrics#KIP-237:MoreControllerHealthMetrics-PublicInterfaces> example.
>
> 6. The KIP says that the RemoteClusterUtils is "subject to change". Are we going to have a future KIP to discuss and finalize these APIs before committing any code that implements the API? If so, it may be cleaner to specify this or even remove this class from the Public Interface section, and specify the future KIP in this KIP.
>
> 7. If we are going to determine the API for RemoteClusterUtils in this KIP, then I have one comment regarding the RemoteClusterUtils.translateOffsets(...). If I understand the discussion in the email thread correctly, this API reads the timestamp from the checkpoint topic and return it to the caller. If so, it seems more intuitive to call this e.g. checkpointTimestamp(...). And the doc probably should say "Find the timestamp of the last committed messaged in the source cluster...". Otherwise, could you briefly explain in the KIP how this API is implemented.
>
> 8. It is said in the email discussion that targetClusterAlias is needed in order to migrate consumer from the remote topic back to the source topic (i.e. fallback). When a consumer is migrated from A.topic_1 in cluster B to topic_1 in cluster A, how does consumer determine the start offset for topic_1 in cluster A?
>
> 9. Does RemoteClusterUtils.upstreamClusters(...) return clusters that is no longer an upstream cluster? If yes, it seems to reduced the usability of the API if user only wants to know the current upstream cluster list. If no, could you explain what is the semantics of the API (e.g. when an upstream cluster is considered outdated) and how is this API implemented to exclude outdated clusters. It may be useful to briefly explain the implementation in the proposed change section for those APIs that are not straightforward.
>
> 10. It is mentioned in the Remote Cluster Utils section that "it will be possible to find high water marks that consumers". Not sure how is high watermark used here. Can you explain a bit more, e.g. which API does this sentence refer to?
>
> 11. The APIs for the interface ReplicationPolicy is specified without explanation or Java doc. It is hard to determine how they can be used and whether these APIs are necessary. Should we include the Java doc and put this interface in the Public Interface section?
>
> 12. The migration section specifies that this API will be carried in three separate Kafka release. Not sure if we need three releases. In the first release, we implement this KIP and deprecate old API. In the subsequent release, we remove the old API. Does this make sense?
>
> 13. Since the newly added checkpoint topic is log-compacted, I am wondering whether we need to specify the log compaction key for this topic in the Public Interface section.
>
> Thanks,
> Dong
>