Thanks Dong.

> 1. Currently if there is a topic created with "." in the topic name, would
it cause a correctness issue for this KIP?

Yes, RemoteClusterUtils would be confused by existing topics that have a
period, and MM2 might try to send records to existing topics if they happen
to be prefixed with a source cluster alias.
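
To make the ambiguity concrete, here's a tiny sketch of how a
"<cluster>.<topic>" convention gets parsed. The helper and topic names below
are hypothetical, not MM2 code:

    // Illustrative only -- not MM2 code. A pre-existing topic literally named
    // "us-west.orders" is indistinguishable from the topic "orders" replicated
    // from a cluster aliased "us-west".
    public class TopicNameAmbiguity {
        static String sourceClusterOf(String topic) {
            int dot = topic.indexOf('.');
            return dot < 0 ? null : topic.substring(0, dot); // null means "local topic"
        }

        public static void main(String[] args) {
            System.out.println(sourceClusterOf("orders"));         // null (local)
            System.out.println(sourceClusterOf("us-west.orders")); // "us-west", even if this
                                                                   // topic was created by hand
        }
    }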

> Some future work such as "propose a special separator character" has been
suggested in the email thread. Could we document this future work in the
KIP

Good idea, I've added a Future Work section.

> 2. [...] the producer will produce to topic_1 in cluster B whereas the consumer
will consume from A.topic_1 in cluster B. [...]

You are correct: if a producer-consumer pair uses a normal topic and both
fail over, the consumer's topic is renamed but the producer's topic stays
the same, potentially introducing a disconnect between the producer and
consumer. This is by design. Remember that consumers very often will
consume from a topic and all associated remote topics, e.g. via a regex
subscription, in which case the cross-cluster "logical" topic is unchanged
before and after migration. When a consumer subscribes to records from a
particular cluster only, migration won't change that either. In your
example, the subscription is A's topic_1, and that doesn't change after
migration -- the consumer still receives records only from A (though
presumably A is down during failover). If you want the consumer to see
records from A and/or B (whichever is active at the moment), your
subscription should be ["topic_1", "B.topic_1"], which after migration will
be ["topic_1", "A.topic_1"]. In this way, consumers have control over
whether they are consuming from a particular cluster or from a set of
"active" clusters.

> 3. [...] do we need to specify in the KIP the migration procedure [...]

Good idea. I've added this to Future Work, along with future command-line
tooling.

> user needs to additionally call offsetsForTimes(...) after getting the
timestamp from the checkpoint for each.

Not generally -- all you need is in RemoteClusterUtils. N.B. this KIP does
not change any logic in existing consumers or producers, but it provides
the tools necessary to build cross-cluster consumers and producers on top
of the existing APIs.
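
For example, a failover migration could look roughly like the sketch below.
The org.apache.kafka.connect.mirror package name and the exact
translateOffsets() signature are my reading of the KIP and may still change,
so treat this as illustrative rather than final:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class FailoverMigration {
        public static void main(String[] args) throws Exception {
            // Connection properties for the target cluster B (hypothetical address).
            Map<String, Object> clusterB = new HashMap<>();
            clusterB.put("bootstrap.servers", "clusterB:9092");

            // Translate my-group's progress on cluster A into offsets for the
            // corresponding remote topics (e.g. A.topic_1) on cluster B, based on
            // MM2's checkpoint stream. No offsetsForTimes() calls are needed.
            Map<TopicPartition, OffsetAndMetadata> offsets =
                RemoteClusterUtils.translateOffsets(clusterB, "A", "my-group",
                    Duration.ofSeconds(30));

            // Seek an ordinary consumer on cluster B to the translated positions.
            Map<String, Object> consumerProps = new HashMap<>(clusterB);
            consumerProps.put("group.id", "my-group");
            consumerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.assign(offsets.keySet());
                offsets.forEach((tp, offset) -> consumer.seek(tp, offset.offset()));
                consumer.commitSync(offsets); // or just start polling from here
            }
        }
    }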

> 4. A lot of class names (e.g. MirrorCheckpointConnector) are included in
the Public Interface section. Not sure if it is useful to specify the class
name without specifying the class API and its usage.

Those are implementations of existing interfaces, so there's nothing new to
document.

> whether we should include in the Public Interface section [...] schema of
the checkpoint topic and heartbeat topic

Thanks, I've added links to the respective subheadings.

>  usage of the new scripts (e.g. connect-mirror-maker.sh) with its
arguments

The connect-mirror-maker.sh script has the same arguments as the existing
connect-standalone.sh and connect-distributed.sh scripts, which is to say
they don't really have any arguments except paths to properties files. I
suspect this will change over time, but for now there's nothing new to
document here.

> 5. Usually we treat metrics as public interface.

Good call, I've added a section.

> 6. [...] "subject to change" [...]

Deleted.

> 7.  this API reads the timestamp from the checkpoint topic and returns it
to the caller.

No, translateOffsets uses an internal offset stream to translate offsets
directly -- timestamps are not involved in the translation. As Becket
points out, there are additional ways to implement this, including via
intermediate timestamps and/or RecordBatch headers, and MM2 might offer
multiple options down the road. Other similar replication engines do use
timestamps for this. But regardless of the underlying implementation, the
end-to-end operation is offset-to-offset, not timestamp-to-timestamp, so I
think the method name should not change.
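
Roughly, each checkpoint record carries something like the following (field
names are my paraphrase of the checkpoint schema in the KIP, not a public
class):

    // A checkpoint is an offset-to-offset mapping per consumer group and
    // partition. Offset translation reads the latest of these; the record's
    // Kafka timestamp is incidental and plays no part in the translation.
    public class CheckpointSketch {
        String consumerGroupId; // group whose progress is being checkpointed
        String topic;           // remote topic on the target cluster, e.g. "A.topic_1"
        int partition;
        long upstreamOffset;    // the group's committed offset on the source cluster
        long downstreamOffset;  // the equivalent offset on the target cluster
    }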

> 8.  When a consumer is migrated from A.topic_1 in cluster B to topic_1 in
cluster A, how does the consumer determine the start offset for topic_1 in
cluster A?

Good question. In order to migrate "upstream" like this (i.e. from remote
topic -> source topic during failback) you need corresponding checkpoints
going upstream as well. Unlike MM1 and others, MM2 generally operates in
both directions like this, even if data is flowing in only one direction.
For example, you may have an active primary cluster and a passive
(read-only) backup cluster, with data being replicated from
primary->backup. In this case, producers will never write to the read-only
backup cluster, so there is no data flowing from backup->primary. But if
you want to fail over _and fail back_ between the primary and backup clusters,
you'll need checkpoints emitted in both directions. Otherwise the primary cluster
would not know which offsets were consumed in the backup cluster during
failover.
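
Concretely, with the same illustrative translateOffsets() sketched above, the
failback lookup runs against cluster A and names B as the source of the
checkpoints -- which only returns something useful if the B -> A checkpoint
flow exists:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class FailbackMigration {
        public static void main(String[] args) throws Exception {
            Map<String, Object> clusterA = new HashMap<>();
            clusterA.put("bootstrap.servers", "clusterA:9092"); // hypothetical address

            // Translate the progress my-group made while consuming on cluster B
            // during the failover into start offsets on cluster A, using the
            // checkpoints that B emitted to A.
            Map<TopicPartition, OffsetAndMetadata> startOffsets =
                RemoteClusterUtils.translateOffsets(clusterA, "B", "my-group",
                    Duration.ofSeconds(30));
            System.out.println(startOffsets.size() + " partitions translated");
            // seek()/commitSync() with these offsets exactly as in the failover sketch.
        }
    }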

Whether active/active or active/passive, the MM2 driver automatically sets
up connectors in both directions. I suppose we might want to add a
checkpoint SinkConnector at some point to make this bidirectional
checkpointing easier for those using a single Connect cluster to replicate
two or more Kafka clusters. I'll add this to Future Work.

> 9. Does RemoteClusterUtils.upstreamClusters(...) return clusters that are no
> longer upstream clusters? If yes, it seems to reduce the usability of
> the API if the user only wants to know the current upstream cluster list. If
> no, could you explain the semantics of the API (e.g. when an
> upstream cluster is considered outdated) and how this API is implemented to
> exclude outdated clusters. It may be useful to briefly explain the
> implementation in the proposed change section for those APIs that are not
> straightforward.

This is an interesting question. I think we can assume that an upstream
cluster is not "outdated" if there is a heartbeat topic (and ostensibly
heartbeat records) from the upstream cluster present on the downstream
cluster. So by definition, the existence of topic "A.heartbeat" means "A"
is an upstream cluster. Now, perhaps MM2 is no longer replicating A -> B
for whatever reason, in which case the replication lag might be very long!
But I think we'd still consider it an upstream cluster.

If you changed the replication topology at your organization such that A -> B is
no longer a valid flow, I'd imagine you'd want to clean up any "A.xyz"
topics, including "A.heartbeat". But until then, RemoteClusterUtils would
consider A to be upstream.
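
Here's a rough sketch of that heuristic (not the actual implementation, which
also has to deal with multi-hop prefixes and whatever separator we settle on):

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;

    public class UpstreamClustersSketch {
        // Any "X.heartbeat" topic on this cluster implies X is (or was) upstream.
        public static Set<String> upstreamClusters(Map<String, Object> props) throws Exception {
            Set<String> clusters = new HashSet<>();
            try (AdminClient admin = AdminClient.create(props)) {
                for (String topic : admin.listTopics().names().get()) {
                    if (topic.endsWith(".heartbeat")) {
                        clusters.add(topic.substring(0, topic.length() - ".heartbeat".length()));
                    }
                }
            }
            return clusters;
        }
    }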

> 10. [...] Not sure how the high watermark is used here [...]

I might be abusing this term, especially in the context of Kafka where it
means other things as well, but I mean to say that offset translation is
not one-to-one: it simply returns some recent offset that is guaranteed
not to come after the most recent checkpoint. For example, if the latest
checkpoint for a group maps upstream offset 1000 to downstream offset 1250,
and the group had actually committed a bit past 1000 before failover,
translation still returns 1250 -- the consumer re-reads a few records rather
than risk skipping any.

> 11. [...] Should we include the Java doc [...]

Done.

> 12. [...] Not sure if we need three releases.

My thinking is I'd like to release MM2 as soon as possible, but I'm in no
hurry to remove MM1. I'd be open to deprecating parts of MM1 in the first
release, but I don't want an Indiana Jones-style swap of
./bin/kafka-mirror-maker.sh in the near future. I suspect many early
adopters of MM2 will leverage existing Connect clusters to run MM2's
Connectors, which is a use case that doesn't really overlap with existing
MM1 deployments, so I think there is a strong argument for letting them
coexist for a bit.

> 13. [...] specify the log compaction key [...]

Done.

Thanks Dong for the feedback!

Ryanne



On Sun, Jan 13, 2019 at 1:37 AM Dong Lin <lindon...@gmail.com> wrote:

> Hey Ryanne,
>
> Sorry I am late here. Thanks much for all the work! After reading through
> the latest KIP and all the previous discussion, I have some questions
> below:
>
> 1. Currently if there is a topic created with "." in the topic name, would it
> cause a correctness issue for this KIP? For example, will a consumer read from
> a topic that is not intended, and will an API such
> as RemoteClusterUtils.upstreamClusters(...) return the wrong string as the
> upstream cluster? If there is a correctness issue which reduces the
> usability, we probably should mark it in the Java doc and fix these issues
> before considering the feature as ready and safe to use. Some future work
> such as "propose a special separator character" has been suggested in the
> email thread. Could we document this future work in the KIP if the work is
> necessary for MM2 to be used reliably?
>
> 2. Suppose topic_1 is replicated from cluster A to cluster B. And we have a
> pair of producer and consumer that produces and consumes from topic_1 in
> cluster A. Let's say cluster A crashed and we need to migrate this pair of
> producer and consumer to use cluster B. According to the discussion in the
> email thread, the producer will produce to topic_1 in cluster B whereas the
> consumer will consume from A.topic_1 in cluster B. It means that the messages
> produced by this producer will no longer be consumed by this consumer. Would this be
> an issue?
>
> 3. Given that the "mechanism to migrate producers or consumers between
> mirrored clusters" is listed as the motivation of this KIP, do we need to
> specify in the KIP the migration procedure which we have discussed in depth
> in the email thread? For example, user needs to additionally
> call offsetsForTimes(...) after getting the timestamp from the checkpoint
> for each.
>
> 4. A lot of class names (e.g. MirrorCheckpointConnector) are included in the
> Public Interface section. Not sure if it is useful to specify the class
> name without specifying the class API and its usage. My understanding is
> that Public Interface section should include 1) minimum amount of the
> information that user can read in order to use the feature provided by the
> KIP; and 2) anything whose change incurs compatibility concern and thus KIP
> discussion. So I am wondering whether we should include in the Public
> Interface section 1) the class API for those classes listed in the section
> (or remove from the KIP if we have not decided the class API); 2) the usage
> of the new scripts (e.g. connect-mirror-maker.sh) with its arguments (e.g.
> this
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts>
> example); and 3) schema of the checkpoint topic and heartbeat topic which
> is currently in the Proposed Change section.
>
> 5. It is said in the motivation section that the design includes new
> metrics such as end-to-end replication latency across multiple data
> centers/clusters. Usually we treat metrics as public interface. Could we
> also specify these metrics (e.g. definition, type) in Public Interface
> section similar to this
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-237%3A+More+Controller+Health+Metrics#KIP-237:MoreControllerHealthMetrics-PublicInterfaces>
> example.
>
> 6. The KIP says that the RemoteClusterUtils is "subject to change". Are we
> going to have a future KIP to discuss and finalize these APIs before
> committing any code that implements the API? If so, it may be cleaner to
> specify this or even remove this class from the Public Interface section,
> and specify the future KIP in this KIP.
>
> 7. If we are going to determine the API for RemoteClusterUtils in this KIP,
> then I have one comment regarding the
> RemoteClusterUtils.translateOffsets(...). If I understand the discussion in
> the email thread correctly, this API reads the timestamp from the
> checkpoint topic and returns it to the caller. If so, it seems more
> intuitive to call this e.g. checkpointTimestamp(...). And the doc probably
> should say "Find the timestamp of the last committed message in the source
> cluster...". Otherwise, could you briefly explain in the KIP how this API
> is implemented.
>
> 8. It is said in the email discussion that targetClusterAlias is needed in
> order to migrate consumer from the remote topic back to the source topic
> (i.e. fallback). When a consumer is migrated from A.topic_1 in cluster B to
> topic_1 in cluster A, how does the consumer determine the start offset for
> topic_1 in cluster A?
>
> 9. Does RemoteClusterUtils.upstreamClusters(...) return clusters that are no
> longer upstream clusters? If yes, it seems to reduce the usability of
> the API if the user only wants to know the current upstream cluster list. If
> no, could you explain the semantics of the API (e.g. when an
> upstream cluster is considered outdated) and how this API is implemented to
> exclude outdated clusters. It may be useful to briefly explain the
> implementation in the proposed change section for those APIs that are not
> straightforward.
>
> 10. It is mentioned in the Remote Cluster Utils section that "it will be
> possible to find high water marks that consumers". Not sure how the high
> watermark is used here. Can you explain a bit more, e.g. which API does this
> sentence refer to?
>
> 11. The APIs for the interface ReplicationPolicy is specified without
> explanation or Java doc. It is hard to determine how they can be used and
> whether these APIs are necessary. Should we include the Java doc and put
> this interface in the Public Interface section?
>
> 12. The migration section specifies that this API will be carried in three
> separate Kafka releases. Not sure if we need three releases. In the first
> release, we implement this KIP and deprecate old API. In the subsequent
> release, we remove the old API. Does this make sense?
>
> 13. Since the newly added checkpoint topic is log-compacted, I am wondering
> whether we need to specify the log compaction key for this topic in the
> Public Interface section.
>
> Thanks,
> Dong
>
