On Sun, Mar 24, 2019 at 9:54 PM Sijie Guo <guosi...@gmail.com> wrote:
>
> Ivan, Matteo, thank you for the writeup!
>
> I have a few more questions
>
> - How does this handle ack individual vs ack cumulatively? It seems it is
> ignored at this moment. But it is good to have some discussions around how
> to extend the approach to support them and how easy to do that.
Yes, it's stated that the current proposal only replicates the mark-delete
position. (Will clarify it better in the wiki.)

Of course the markers approach works well with cumulative acks (since they
just move the mark-delete position), but it will work with individual acks
too in most scenarios. Keep in mind that in all cases a cluster failover
will involve some number of duplicates (configurable with the frequency of
the snapshot).

With individual acks, if all messages are acked within a short amount of
time (for example, 1 second, comparable to the snapshot frequency), then
there will be no problem and no practical difference from the cumulative
ack scenario. Conversely, if some messages can stay unacked for a much
longer amount of time while other messages are being acked, that will lead
to a larger amount of duplicates during a cluster failover.

Regarding how to support this case better, see my reply below in the
"alternative design" answer.

> - Do we need to change the dispatcher, and what are the changes?

This approach does not require any change in the dispatcher code. The only
change in the consumer handling is to filter out the marker messages, since
they don't need to go back to consumers.

> - If a region goes down, the approach can't take any snapshots. Does it
> mean "acknowledge" will be kind of suspended until the the region is
> brought back? I guess it is related to how dispatcher is changed to react
> this snapshot. It it unclear to me from the proposal. It would be good if
> we have more clarifications around it.

First off, to clarify, this issue is only relevant when there are 3 or more
clusters in the replication set.

If one of the clusters is not reachable, the snapshots will not be taken. A
consumer will still keep acknowledging locally, but these acks won't be
replicated to the other clusters. Therefore, in case of a cluster failover,
the subscription will be rolled back to a much earlier position.

This is not a problem with 2 clusters, since if the other cluster is down
we cannot fail over to it anyway. When we have 3+ clusters, though, we can
only sustain 1 cluster failure because, after that, the snapshots will not
make progress.

Typically, though, the purpose of replicated subscriptions is to have the
option to fail out of a failed cluster, and that will still work in this
case. What won't work is failing over from A to C when cluster B is down.
By "won't work" I mean that the consumer will move to C but will find the
cursor at an older position. No data loss, but potentially a large number
of duplicates. This is to protect against the case of messages exchanged
between the B and C clusters before cluster B went down.

In practice we can have operational tools to do a manual override and
ensure snapshots are taken.

It would be interesting to see how this feature gets used and what the
operational pain points are, before overthinking these problems upfront and
digging too deep in a direction that might not be relevant in practice.

> Have you guys considered any other alternatives? If you have considered
> other alternatives, it might be worth listing out the alternatives for
> comparisons.

(Good point, will add to the wiki.)

Yes, the main other approach would be to attach the "Original-Message-Id"
when replicating a message. That would basically allow us to keep an
additional range-set based on the Original-Message-Id as well as the local
message id.
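To make that idea a bit more concrete, here is a rough, hypothetical sketch
(class and method names are made up, it is not part of the proposal, it is
not thread-safe, and trimming below the mark-delete position is left out)
of a tracker keyed on both the local id and the original id, using Guava
range sets:

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;

// Hypothetical sketch: track individually acked entries both by their
// local (ledgerId, entryId) and by the id they had in the cluster where
// they were originally published, so the ack state can be interpreted
// by any other cluster.
public class DualIdAckTracker {

    // Acked entry ids in the local cluster, keyed by local ledger id.
    private final Map<Long, RangeSet<Long>> localAcks = new HashMap<>();

    // Acked entry ids keyed by (origin cluster, origin ledger id).
    private final Map<String, Map<Long, RangeSet<Long>>> originAcks = new HashMap<>();

    public void ack(long localLedgerId, long localEntryId,
                    String originCluster, long originLedgerId, long originEntryId) {
        localAcks.computeIfAbsent(localLedgerId, k -> TreeRangeSet.create())
                .add(Range.closedOpen(localEntryId, localEntryId + 1));

        originAcks.computeIfAbsent(originCluster, k -> new HashMap<>())
                .computeIfAbsent(originLedgerId, k -> TreeRangeSet.create())
                .add(Range.closedOpen(originEntryId, originEntryId + 1));
    }

    public boolean isAckedByOrigin(String originCluster, long originLedgerId, long originEntryId) {
        Map<Long, RangeSet<Long>> ledgers = originAcks.get(originCluster);
        if (ledgers == null) {
            return false;
        }
        RangeSet<Long> entries = ledgers.get(originLedgerId);
        return entries != null && entries.contains(originEntryId);
    }

    public static void main(String[] args) {
        DualIdAckTracker tracker = new DualIdAckTracker();
        // A message replicated from "cluster-a" (origin id 7:120) was stored
        // locally at 12:4530 and has now been individually acked.
        tracker.ack(12L, 4530L, "cluster-a", 7L, 120L);
        System.out.println(tracker.isAckedByOrigin("cluster-a", 7L, 120L)); // true
    }
}

A real implementation would have to live inside the cursor's range-set
tracking, but this illustrates the dual-keyed bookkeeping that the
alternative would require.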
The main drawbacks of this approach (compared to the proposal) are:

 * It would only work for individual acks but not for cumulative acks
 * We need to make more changes to propagate the OriginalMessageId to
   consumers, so that when a consumer acks we don't have to maintain a
   mapping

This requires some changes in the range-set tracker. It shouldn't be
terribly hard to do though, and it's a very "localized" change (easy to
create a substantial amount of unit tests around this custom data
structure).

The idea is that this approach would be a good candidate for extending the
current proposal to support individual acks on top of the markers that move
the mark-delete position.

> A lot of the challenges are introduced because messages are interleaved
> during cross-region replication. Interleaving might be working for some
> cross-region setup and failover strategies. But it also has challenges in
> supporting other features. I think it might be a good time to rethink the
> cross-region replication and allow different types of cross-region
> replication mechanism exists. So we can introduce a new cross-region
> replication mechanism which avoid interleaving (e.g. via having a
> per-region partition), so that people can choose which replication
> mechanism to use based on their cross-region setup and requirements.
>
> In a non-interleaved cross-region replication mechanism, we can do
> precisely entry-to-entry replication at managed ledger level (via BK's
> LedgerHandleAdv) and maintain 1:1 message id mapping during replication.
> Once we can maintain 1:1 message id mapping, the replicated subscription
> can be simply done via replicating cursors. This approach can also provide
> a few other benefits like a replicated subscription can selectively consume
> messages from a given region only (local region or a remote region). It
> also provides more flexibilities for consumers to do region failover.

I think there are a few challenges in the per-region approach:

1. It would remove a fundamental guarantee that Pulsar replicated topics
   currently provide: consistency within a single region. If we have N logs
   instead of 1 single log, there's no way to replay the log and reach the
   same state, or to have 2 consumers in the same region be consistent.
   This would break several applications currently running on Pulsar.

2. The topic metadata size will get multiplied by N, with N being the
   number of clusters. With 8 clusters and a lot of topics, that would
   break some existing deployments.

3. Even if we specify the same entryId when republishing, we would have to
   ensure we use perfectly mirrored ledgers in all the regions. What would
   happen if a ledger is fenced in one of the regions?

All of this would be on top of a very substantial amount of changes in core
parts of the code that would have to be tested at scale, plus the need to
figure out a compatible live migration path.

> so that people can choose which replication
> mechanism to use based on their cross-region setup and requirements.

Applications can already choose to keep the topics separated, if they wish
to do so. You just need to make sure to write to a different topic in each
region and subscribe to all of them (see the sketch at the end of this
mail).

I believe the current proposal is a much less risky approach that can be
easily implemented on top of the existing replication infrastructure,
providing a solution that will benefit a lot of use cases from day 1, with,
of course, room for improvement down the road.
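For completeness, a minimal sketch of the "one topic per region, subscribe
to all" pattern mentioned above, using the existing multi-topic consumer.
The service URL, tenant/namespace and topic names are made up for
illustration:

import java.util.Arrays;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class PerRegionTopicsExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // A producer in region "us-west" writes only to its own
        // per-region topic; all per-region topics are geo-replicated
        // as usual.
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://tenant/ns/events-us-west")
                .create();
        producer.send("hello".getBytes());

        // A consumer subscribes to all per-region topics at once.
        Consumer<byte[]> consumer = client.newConsumer()
                .topics(Arrays.asList(
                        "persistent://tenant/ns/events-us-west",
                        "persistent://tenant/ns/events-eu-central"))
                .subscriptionName("my-subscription")
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        consumer.acknowledge(msg);

        consumer.close();
        producer.close();
        client.close();
    }
}

With this layout an application can also subscribe to just one region's
topic if it wants to consume messages from that region only.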