I have suggestions for an alternate solution. If source message-ids were known for replicated messages, a composite cursor could be maintained for replicated subscriptions as an n-tuple. Since messages are ordered from a source, it would be possible to restart from a known cursor n-tuple in any cluster by a combination of cursor positioning _and_ filtering.
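As a rough sketch of what such a composite cursor could look like (the class and method names here are purely illustrative, not an existing Pulsar API):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pulsar.client.api.MessageId;

    // Illustrative only: a per-subscription composite cursor that tracks, for each
    // source cluster, the last source message-id that has been consumed.
    public class CompositeReplicatedCursor {

        // e.g. {"us-west" -> <last id from us-west>, "eu-central" -> <last id from eu-central>}
        private final Map<String, MessageId> lastConsumedPerSource = new HashMap<>();

        public void advance(String sourceCluster, MessageId sourceMessageId) {
            lastConsumedPerSource.put(sourceCluster, sourceMessageId);
        }

        // After failing over, a consumer in any cluster seeks to a safe local position
        // and then skips every message whose (source cluster, source message-id) pair
        // is already covered by the n-tuple.
        public boolean shouldSkip(String sourceCluster, MessageId sourceMessageId) {
            MessageId last = lastConsumedPerSource.get(sourceCluster);
            return last != null && sourceMessageId.compareTo(last) <= 0;
        }
    }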
A simple way to approximate this is for each cluster to insert its own ticker marks into the topic. A ticker carries the message id as the message body. The ticker mark can be inserted every 't' time interval or every 'n' messages, as needed. The n-tuple of the tickers from each cluster is a well-known state that can be restarted anywhere by proper positioning and filtering. That is a simpler solution for users to understand and troubleshoot. It would be resilient to cluster failures, and does NOT require all clusters to be up to determine the cursor position. No cross-cluster communication/ordering is needed. But it will require skipping messages from specific sources as needed, and storing the n-tuple as part of the cursor state.

Joe

On Mon, Mar 25, 2019 at 10:24 PM Sijie Guo <guosi...@gmail.com> wrote:

> On Mon, Mar 25, 2019 at 4:14 PM Matteo Merli <mme...@apache.org> wrote:

> > On Sun, Mar 24, 2019 at 9:54 PM Sijie Guo <guosi...@gmail.com> wrote:

> > > Ivan, Matteo, thank you for the writeup!

> > > I have a few more questions:

> > > - How does this handle ack individual vs ack cumulatively? It seems it is ignored at this moment. But it is good to have some discussions around how to extend the approach to support them and how easy it would be to do that.

> > Yes, it's stated that the current proposal only replicates the mark-delete position. (Will clarify it better in the wiki.)

> > Of course the markers approach works well with cumulative acks (since they just move the mark-delete position), but it will work with individual acks too in most of the scenarios.

> > Keep in mind that in all cases a cluster failover will involve some number of duplicates (configurable with the frequency of the snapshot).

> > With individual acks, if all messages are acked within a short amount of time, for example 1 second, comparable to the snapshot frequency, then there will be no problem and no practical difference from the cumulative ack scenario.

> > Conversely, if some messages can stay unacked for a much longer amount of time while other messages are being acked, that will lead to a larger amount of duplicates during cluster failover.

> > Regarding how to support this case better, I replied below in the "alternative design" answer.

> > > - Do we need to change the dispatcher, and what are the changes?

> > This approach does not require any change in the dispatcher code. The only change in the consumer handling is to filter out the marker messages, since they don't need to go back to consumers.

> > > How does this

> > > - If a region goes down, the approach can't take any snapshots. Does it mean "acknowledge" will be kind of suspended until the region is brought back? I guess it is related to how the dispatcher is changed to react to this snapshot. It is unclear to me from the proposal. It would be good if we had more clarification around it.

> > First off, to clarify, this issue is only relevant when there are 3 or more clusters in the replication set.

> > If one of the clusters is not reachable, the snapshots will not be taken. A consumer will still keep acknowledging locally, but these acks won't be replicated in the other clusters. Therefore, in case of a cluster failover, the subscription will be rolled back to a much earlier position.
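> > Regarding the dispatcher question above: the consumer-path change amounts to a filter along these lines (a sketch only; how a marker is recognized is abstracted as a predicate, since that flag is defined by the proposal itself):

> >     import java.util.ArrayList;
> >     import java.util.List;
> >     import java.util.function.Predicate;
> >     import org.apache.bookkeeper.mledger.Entry;
> >
> >     // Sketch: drop replicated-subscription marker entries before handing the
> >     // read batch to consumers; everything else is dispatched unchanged.
> >     class MarkerFilter {
> >         static List<Entry> filterMarkers(List<Entry> entries, Predicate<Entry> isMarker) {
> >             List<Entry> toDispatch = new ArrayList<>(entries.size());
> >             for (Entry entry : entries) {
> >                 if (isMarker.test(entry)) {
> >                     entry.release();   // markers are consumed by the broker, not by consumers
> >                 } else {
> >                     toDispatch.add(entry);
> >                 }
> >             }
> >             return toDispatch;
> >         }
> >     }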
> > This is not a problem with 2 clusters, since if the other cluster is down we cannot fail over to it anyway.

> The question here will be more about how to fail back. If a snapshot is not taken, then nothing is *exchanged* between the clusters. How does this proposal handle failing back?

> In other words, what are the sequences for people to do failover and failback? It would be good to have an example to demonstrate the sequences, so that users will have a clear picture on how to use this feature.

> > When we have 3+ clusters, though, we can only sustain 1 cluster failure because, after that, the snapshot will not make progress.

> > Typically, though, the purpose of replicated subscriptions is to have the option to fail out of a failed cluster, which in this case will work.

> > What won't work would be failing over from A to C when cluster B is down. To define "won't work": the consumer will go to C but will find the cursor at an older position. No data loss, but potentially a big number of dups.

> > This is to protect against the case of messages exchanged between the B and C clusters before cluster B went down.

> > In practice, we can have operational tools to do a manual override and ensure snapshots are taken. It would be interesting to see how this feature gets used and what the operational pain points are, before overthinking these problems upfront and digging too much in a direction that might not be relevant in practice.

> > > Have you guys considered any other alternatives? If you have considered other alternatives, it might be worth listing out the alternatives for comparison.

> > (Good point, will add to the wiki.)

> > Yes, the main other approach would be to attach the "Original-Message-Id" when replicating a message. That would allow us to basically keep an additional range-set based on the Original-Message-Id as well as the local message id.

> > The main drawbacks of this approach (compared to the proposal) are:
> >  * It would only work for individual acks but not for cumulative acks
> >  * We need to make more changes to propagate the OriginalMessageId to consumers, so that when they ack we don't have to maintain a mapping

> > This requires a bit of change in the range-set tracker. It shouldn't be terribly hard to do, though, and it's a very "localized" change (easy to create a substantial amount of unit tests around this custom data structure).

> > The idea is that this approach would be a good candidate for extending the current proposal to support individual acks, on top of the markers that move the mark-delete position.

> > > A lot of the challenges are introduced because messages are interleaved during cross-region replication. Interleaving might work for some cross-region setups and failover strategies, but it also has challenges in supporting other features. I think it might be a good time to rethink cross-region replication and allow different types of cross-region replication mechanisms to exist. So we can introduce a new cross-region replication mechanism which avoids interleaving (e.g. via having a per-region partition), so that people can choose which replication mechanism to use based on their cross-region setup and requirements.
> > > In a non-interleaved cross-region replication mechanism, we can do precise entry-to-entry replication at the managed ledger level (via BK's LedgerHandleAdv) and maintain a 1:1 message id mapping during replication. Once we can maintain a 1:1 message id mapping, the replicated subscription can simply be done via replicating cursors. This approach can also provide a few other benefits, e.g. a replicated subscription can selectively consume messages from a given region only (the local region or a remote region). It also provides more flexibility for consumers to do region failover.

> > I think there are a few challenges in the per-region approach:

> > 1. This would remove a fundamental guarantee that Pulsar replicated topics currently provide: consistency within a single region. If we have N logs instead of 1 single log, there's no way to replay the log and reach the same state, or to have 2 consumers in the same region be consistent. This would break several applications currently running on Pulsar.

> Currently, Pulsar only guarantees per-region and per-partition ordering, so technically it doesn't really change any behaviors. However, if people need an aggregated order of messages from multiple regions, then interleaved replication is required. That's why I didn't propose changing the current geo-replication mechanism; instead I propose adding a new replication mechanism.

> > 2. The topic metadata size will get multiplied by N, with N being the number of clusters. With 8 clusters and a lot of topics, that would break some existing deployments.

> Correct. For most deployments, N is bound to 2~3 clusters. So adding a new replication mechanism can probably address the problems for most of the deployments.

> > 3. Even if we specify the same entryId when republishing, we would have to ensure we use a perfectly mirrored ledger in all the regions. What would happen if a ledger is fenced in one of the regions?

> > This would be on top of a very substantial amount of changes in core parts of the code that would have to be tested at scale, and lastly we'd have to figure out a compatible live migration path.

> Sure. That's why I propose adding a new replication mechanism, not changing the original one.

> > > so that people can choose which replication mechanism to use based on their cross-region setup and requirements.

> > Applications can already choose to keep the topics separated, if they wish so. You just need to make sure to write to a different topic in each region and subscribe to all of them.

> Sure. Applications can do so by themselves.

> > I believe the current proposal is a much less risky approach that can be easily implemented on top of the existing replication infrastructure, providing a solution that will benefit a lot of use cases from day 1, with, of course, room to get improved down the road.

> Sure. I didn't mean proposing another alternative.
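For the entry-to-entry mirroring idea discussed above, a rough sketch assuming BookKeeper's createLedgerAdv() / addEntry(entryId, data) API; the class below and its wiring into the replicator are illustrative, not existing Pulsar code:

    import java.util.Collections;
    import org.apache.bookkeeper.client.BookKeeper;
    import org.apache.bookkeeper.client.BookKeeper.DigestType;
    import org.apache.bookkeeper.client.LedgerHandle;

    // Sketch: mirror a source ledger into the local region, reusing the source
    // ledger id and entry ids so that message ids map 1:1 across regions.
    public class MirroredLedgerReplicator {

        private final BookKeeper bk;
        private final byte[] passwd;

        public MirroredLedgerReplicator(BookKeeper bk, byte[] passwd) {
            this.bk = bk;
            this.passwd = passwd;
        }

        // Create the local copy of a source ledger with the SAME ledger id, so the
        // (ledgerId, entryId) coordinates stay identical in every region.
        public LedgerHandle openMirror(long sourceLedgerId) throws Exception {
            return bk.createLedgerAdv(sourceLedgerId, /* ensemble */ 3, /* writeQuorum */ 3,
                    /* ackQuorum */ 2, DigestType.CRC32, passwd, Collections.emptyMap());
        }

        // Replay one replicated entry at exactly the source entry id.
        public void mirrorEntry(LedgerHandle mirror, long sourceEntryId, byte[] payload)
                throws Exception {
            mirror.addEntry(sourceEntryId, payload);
        }
    }

With such a 1:1 mapping, replicating a subscription would reduce to copying the cursor's (ledgerId, entryId) position; the open questions raised above (fenced ledgers, migration path) are what a real replicator would still have to handle.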