On Sun, Mar 24, 2019 at 9:54 PM Sijie Guo <guosi...@gmail.com> wrote:
>
> Ivan, Matteo, thank you for the writeup!
>
> I have a few more questions
>
> - How does this handle ack individual vs ack cumulatively? It seems it is
> ignored at this moment. But it is good to have some discussions around how
> to extend the approach to support them and how easy to do that.

Yes, as stated, the current proposal only replicates the mark-delete position.
(Will clarify it better in the wiki.)

Of course the markers approach works well with cumulative acks (since they
just move the mark-delete position), but it will work with individual acks
too in most scenarios.
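For illustration, the consumer-side opt-in could look roughly like this (a
minimal Java sketch; the replicateSubscriptionState() builder flag, topic
name and service URL are assumptions about how the opt-in could be exposed,
not final API):

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionType;

    public class ReplicatedSubscriptionConsumer {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650") // illustrative URL
                    .build();

            // Opt into replicated subscription state. Only the mark-delete
            // position is replicated across clusters.
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("persistent://tenant/ns/my-replicated-topic")
                    .subscriptionName("my-subscription")
                    .subscriptionType(SubscriptionType.Failover)
                    .replicateSubscriptionState(true) // assumed flag name
                    .subscribe();

            Message<byte[]> msg = consumer.receive();
            // Cumulative ack moves the mark-delete position, which is what
            // gets snapshotted and replicated
            consumer.acknowledgeCumulative(msg);

            consumer.close();
            client.close();
        }
    }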

Keep in mind that in all cases a cluster failover will involve some number
of duplicates (the amount is controlled by the snapshot frequency).

With individual acks, if all messages are acked within a short amount of
time (for example, 1 second, comparable to the snapshot frequency), then
there will be no problem and no practical difference from the cumulative-ack
scenario.

Conversely, if some messages stay unacked for a much longer amount of time
while other messages are being acked, that will lead to a larger number of
duplicates during cluster failover.

Regarding how to support this case better, I replied below in the
"alternative design" answer.

> - Do we need to change the dispatcher, and what are the changes?

This approach does not require any change in the dispatcher code. The
only change in the consumer handling is to filter out the marker messages
since they don't need to go back to consumers.
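To make the "filter out the marker messages" part concrete, here is a
minimal sketch of where that filtering would sit (the Entry interface and
the marker check are placeholders, not the actual broker code):

    import java.util.List;
    import java.util.stream.Collectors;

    // Sketch of the only consumer-handling change: marker entries are
    // consumed by the broker itself and never dispatched to consumers.
    // Entry and isReplicatedSubscriptionMarker() are hypothetical here.
    public class MarkerFilter {

        interface Entry {
            int markerType(); // negative means "not a marker" in this sketch
            byte[] data();
        }

        static boolean isReplicatedSubscriptionMarker(Entry entry) {
            return entry.markerType() >= 0;
        }

        static List<Entry> filterForDispatch(List<Entry> entries) {
            return entries.stream()
                    .filter(e -> !isReplicatedSubscriptionMarker(e))
                    .collect(Collectors.toList());
        }
    }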

> - If a region goes down, the approach can't take any snapshots. Does it
> mean "acknowledge" will be kind of suspended until the the region is
> brought back? I guess it is related to how dispatcher is changed to react
> this snapshot.  It it unclear to me from the proposal. It would be good if
> we have more clarifications around it.

First off, to clarify, this issue is only relevant when there are 3 or more
clusters in the replication set.

If one of the clusters is not reachable, the snapshots will not be taken.
A consumer will still keep acknowledging locally, but these acks won't be
replicated to the other clusters. Therefore, in case of a cluster failover,
the subscription will be rolled back to a much earlier position.

This is not a problem with 2 clusters, since if the other cluster is down we
cannot fail over to it anyway.

When we have 3+ clusters, though, we can only sustain 1 cluster failure,
because after that the snapshots will not make progress.

Typically, though, the purpose of replicated subscriptions is to have the
option to fail out of a failed cluster, and that will still work in this
case.

What won't work is failing over from A to C when cluster B is down. By
"won't work" I mean that the consumer will go to C but will find the cursor
at an older position: no data loss, but potentially a large number of
duplicates.

This is to protect against the case of messages exchanged between clusters
B and C before cluster B went down.

In practice, we can have operational tools to do a manual override and
ensure snapshots are taken. It would be interesting to see how this feature
gets used and what the operational pain points are, before overthinking
these problems upfront and digging too deep in a direction that might not be
relevant in practice.

> Have you guys considered any other alternatives? If you have considered
> other alternatives, it might be worth listing out the alternatives for
> comparisons.

(Good point, will add to the wiki.)

Yes, the main alternative approach would be to attach the
"Original-Message-Id" when replicating a message. That would allow us to
keep an additional range-set based on the Original-Message-Id as well as the
local message id.

The main drawbacks of this approach (compared to the proposal) are:
  * It would only work for individual acks, not for cumulative acks
  * We would need more changes to propagate the OriginalMessageId to
    consumers, so that when they ack we don't have to maintain a mapping

This requires some changes in the range-set tracker. It shouldn't be
terribly hard to do, though, and it's a very "localized" change (it's easy
to create a substantial amount of unit tests around this custom data
structure).

The idea is that this approach would be a good candidate for extending the
current proposal to support individual acks on top of the markers that move
the mark-delete position.
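As a rough illustration of that extension, the cursor could keep an extra
range-set keyed by the original (source-cluster) message id next to the
local one. This is only a sketch under assumed names (OriginalAckTracker,
the per-ledger keying, etc.), not the existing ManagedCursor code:

    import com.google.common.collect.Range;
    import com.google.common.collect.RangeSet;
    import com.google.common.collect.TreeRangeSet;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Tracks individually-acked messages by their *original* message id,
    // i.e. the id assigned in the source cluster and carried over during
    // replication. Keeping one range-set per (cluster, ledger) avoids
    // comparing composite (ledger, entry) ids directly.
    public class OriginalAckTracker {

        private final Map<String, RangeSet<Long>> ackedByLedger =
                new ConcurrentHashMap<>();

        public synchronized void individualAck(String sourceCluster,
                                               long origLedgerId,
                                               long origEntryId) {
            String key = sourceCluster + "/" + origLedgerId;
            ackedByLedger
                    .computeIfAbsent(key, k -> TreeRangeSet.create())
                    .add(Range.closedOpen(origEntryId, origEntryId + 1));
        }

        public synchronized boolean isAcked(String sourceCluster,
                                            long origLedgerId,
                                            long origEntryId) {
            RangeSet<Long> ranges =
                    ackedByLedger.get(sourceCluster + "/" + origLedgerId);
            return ranges != null && ranges.contains(origEntryId);
        }
    }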

> A lot of the challenges are introduced because messages are interleaved
> during cross-region replication. Interleaving might be working for some
> cross-region setup and failover strategies. But it also has challenges in
> supporting other features. I think it might be a good time to rethink the
> cross-region replication and allow different types of cross-region
> replication mechanism exists. So we can introduce a new cross-region
> replication mechanism which avoid interleaving (e.g. via having a
> per-region partition), so that people can choose which replication
> mechanism to use based on their cross-region setup and requirements.
>
> In a non-interleaved cross-region replication mechanism, we can do
> precisely entry-to-entry replication at managed ledger level (via BK's
> LedgerHandleAdv) and maintain 1:1 message id mapping during replication.
> Once we can maintain 1:1 message id mapping, the replicated subscription
> can be simply done via replicating cursors. This approach can also provide
> a few other benefits like a replicated subscription can selectively consume
> messages from a given region only (local region or a remote region). It
> also provides more flexibilities for consumers to do region failover.

I think there are a few challenges in the per-region approach:

 1. It would remove a fundamental guarantee that Pulsar replicated topics
    currently provide: consistency within a single region.
    If we have N logs instead of 1 single log, there's no way to replay the
    log and reach the same state, or to have 2 consumers in the same region
    be consistent.
    This would break several applications currently running on Pulsar.
 2. The topic metadata size would get multiplied by N, with N being the
    number of clusters. With 8 clusters and a lot of topics, that would
    break some existing deployments.
 3. Even if we specify the same entryId when republishing, we would have to
    ensure a perfectly mirrored ledger in all the regions (see the sketch
    after this list). What would happen if a ledger is fenced in one of the
    regions?
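For context on the entry-id point in 3, this is roughly what entry-to-entry
replication via BK's adv ledger API implies; the connection string and
ledger config are illustrative, and a fully mirrored setup would also need
to pin the ledger id itself:

    import org.apache.bookkeeper.client.BookKeeper;
    import org.apache.bookkeeper.client.LedgerHandle;

    // Rough sketch of "specifying the same entryId when republishing".
    // addEntry(entryId, data) only works while the mirrored ledger stays
    // open and identical on the remote side (e.g. not fenced).
    public class MirroredLedgerSketch {
        public static void main(String[] args) throws Exception {
            BookKeeper bk = new BookKeeper("zk-in-remote-region:2181");

            // createLedgerAdv() lets the client choose entry ids explicitly
            LedgerHandle mirrored = bk.createLedgerAdv(
                    3, 3, 2, BookKeeper.DigestType.CRC32, "passwd".getBytes());

            // Re-publish an entry from the source region using the same
            // entry id (ids still have to be supplied contiguously from 0)
            mirrored.addEntry(0L, "replicated-data".getBytes());

            mirrored.close();
            bk.close();
        }
    }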

All of this would come on top of a very substantial amount of changes in
core parts of the code that would have to be tested at scale, plus figuring
out a compatible live migration path.

>  so that people can choose which replication
> mechanism to use based on their cross-region setup and requirements.

Applications can already choose to keep the topics separate, if they wish
so. You just need to make sure to write to a different topic in each region
and subscribe to all of them.
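For example, with the existing client API something along these lines
already gives per-region topics with a single subscription over all of them
(topic names and the service URL are illustrative):

    import java.util.Arrays;

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;

    public class PerRegionTopicsConsumer {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650") // illustrative URL
                    .build();

            // Each region publishes to its own (non-replicated) topic and a
            // single consumer subscribes to all of them
            Consumer<byte[]> consumer = client.newConsumer()
                    .topics(Arrays.asList(
                            "persistent://tenant/ns/events-us-east",
                            "persistent://tenant/ns/events-us-west",
                            "persistent://tenant/ns/events-eu"))
                    .subscriptionName("my-subscription")
                    .subscribe();

            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);

            consumer.close();
            client.close();
        }
    }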


I believe the current proposal is a much less risky approach that can be
easily implemented on top of the existing replication infrastructure,
providing a solution that will benefit a lot of use cases from day 1, with,
of course, room for improvement down the road.
