Joe, Matteo,

What was the conclusion of this discussion? Was there a conclusion? Has the algorithm used in the implementation changed from the initial draft?
-Ivan

On Fri, May 3, 2019 at 4:00 AM Joe F <j...@apache.org> wrote:
>
> Let me try to explain my suggestion better.
>
> First, about positions in an ordered stream: Consider a simple stream of
> events where no event carries an identifier of its relative
> position in the stream. But after every 'n' events there is a ticker event
> carrying a monotonically increasing sequence id.
>
> For example, after, say, every 4th event, the stream
> generator inserts a ticker into the stream.
> Then the stream will look like a, a, a, a, (a1), a,
> a, a, a, (a2), a, a, a, a, (a3), a, ... and so on.
> A reader can establish its position based on
> these "ticker" events (like freeway mile markers).
>
> Assertion 1: A ticker position in the stream is deterministic across all
> copies of the stream, if all copies have the same event order.
> This means reading can be resumed across copies of the same stream, since
> positions are deterministic. For example, if a reader on copy X says "I am at
> ticker (a2)", then the reader's position is (a2) in every other copy, Y or
> Z. The reader can stop reading at (a8) in copy X and resume at (a8)
> in copy Y, and do so without loss of events.
>
> Second, consider a merge operator that merges 'n' _ordered_ input
> streams and produces one output stream.
>
> The operator can be modeled as being fed by 'n' input readers and
> emitting one output. There is no buffering: if the operator gets an input,
> it has to write it to the output before it accepts another input from any
> of its feeders.
>
> This merge operator has 2 properties:
> (1) Input order: the merge operator maintains input order in the
> output, i.e. if input A had (a-x) preceding (a-y), then
> the output of the merge operator has (a-x) preceding (a-y).
> (2) No output order: different merge operators can produce
> arbitrary output orders across the same input feeds,
> i.e.
> no assumptions can be made that, in the
> output, (a-x) will precede (b-y) [..or (b-z) or (c-y) or (c-z) or ..].
>
> Assertion 2: The merge output can then be represented as an n-tuple of
> 'n' individual input positions. Since each input is an ordered stream,
> the position within that input sub-stream is deterministic, and a
> combination of positions on all inputs is deterministic.
>
> It follows that (1) the set of input positions can be transferred from one
> operator to another, (2) the output will not lose events across such
> transfers, and (3) the output order may change across such transfers.
>
> Note that there are no assumptions or assertions about the _output_ of the
> merge operator. We are only asserting this about the input.
>
> Example
> --------------
> For example, readers P, Q, and R are each reading the output of a different
> merge operator. They all process the same three event streams, one generated
> from A, one from B, and one from C.
>
> Then P's merge operator can be represented as three input stream readers
> Pa, Pb, Pc that feed into the merge operator P. The operator for P may
> produce a different output than the operator for Q (say, because input
> readers may progress at different speeds in each operator), but each input
> stream reader position is deterministic (by Assertion 1).
>
> If P has a position at [Pa(a8), Pb(b1), Pc(c3)], Q has an equivalent
> position [Qa(a8), Qb(b1), Qc(c3)] for its input readers.
> A reader on Q can set up input readers Qa to (a8), Qb to (b1)
> and Qc to (c3), to start feeding into Q.
>
> -----------
>
> These two assertions are the invariants in my suggestion.
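A minimal Python sketch of the two assertions, under simplifying assumptions: each input stream is a list of uniquely named events, an input position is just the number of events consumed from that input (standing in for the ticker marks described above), and the hypothetical `schedule` argument models input readers progressing at different speeds. The names `merge`, `preserves_input_order` and `resume` are illustrative only, not part of any Pulsar API.

```python
def merge(inputs, schedule):
    """Merge ordered input streams into one output, with no buffering.

    inputs:   source name -> that source's events, in source order.
    schedule: hypothetical order in which the input readers deliver events,
              modelling readers that progress at different speeds.
    Returns the output stream and, after each emitted event, the n-tuple of
    input positions (events consumed so far from each input).
    """
    positions = {name: 0 for name in inputs}
    output, checkpoints = [], []
    for name in schedule:
        # Each input is written to the output before the next one is accepted.
        output.append(inputs[name][positions[name]])
        positions[name] += 1
        checkpoints.append(dict(positions))
    return output, checkpoints


def preserves_input_order(output, inputs):
    """Property (1): every input's events appear in the output in input order."""
    for stream in inputs.values():
        members = set(stream)
        seen = [event for event in output if event in members]
        if seen != stream[:len(seen)]:
            return False
    return True


def resume(inputs, position):
    """What each input reader still has to deliver after seeking to `position`."""
    return {name: stream[position[name]:] for name, stream in inputs.items()}


inputs = {"a": ["a1", "a2", "a3"], "b": ["b1", "b2"], "c": ["c1", "c2", "c3"]}

# Two operators fed the same inputs produce different output orders (property 2).
out_p, cp_p = merge(inputs, ["a", "a", "b", "c", "a", "b", "c", "c"])
out_q, cp_q = merge(inputs, ["c", "b", "a", "c", "b", "a", "c", "a"])

# P's reader stops after 4 events; its n-tuple is handed to Q's input readers.
position = cp_p[3]
remaining = resume(inputs, position)
out_q_resumed, _ = merge(remaining, ["c", "b", "c", "a"])
```

Operators P and Q emit different interleavings of the same inputs, yet `out_p[:4]` (what P's reader saw) and `out_q_resumed` (what Q delivers after the hand-off) together contain every event exactly once.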
> The rest is about
> solving two implementation issues:
>
> (1) How to map the input n-tuple of the merge operator into a specific
> position in the output of the merge operator,
> e.g. how to map [Pa(x), Pb(y), Pc(z)] ===> P(j)
>
> (2) How to resume a reader across the _outputs_ of two merge operators,
> without loss of events (and with as little duplication as possible), when
> they are fed the same input, but at different rates,
> e.g. if [Pa(x), Pb(y), Pc(z)] ==> P(j), then find
> position Q(r) <==== [Pa(x), Pb(y), Pc(z)]
>
> My thinking is that these two things can be solved similarly to the
> existing proposal.
>
> On Wed, May 1, 2019 at 4:10 PM Matteo Merli <mme...@apache.org> wrote:
>
> > On Mon, Apr 29, 2019 at 1:57 PM Joe F <joefranc...@gmail.com> wrote:
> > >
> > > I have suggestions for an alternate solution.
> > >
> > > If source message-ids were known for replicated messages, a composite
> > > cursor can be maintained for replicated subscriptions as an n-tuple.
> > > Since messages are ordered from a source, it would be possible to restart
> > > from a known cursor n-tuple in any cluster by a combination of cursor
> > > positioning _and_ filtering.
> >
> > Knowing the source message id alone is not enough to establish the
> > order relationship across all the clusters. I think that would only
> > work in the 2-cluster scenario.
> >
> > In general, for each message being written in region A, whether locally
> > published or replicated from other regions, we'd have to associate the
> > highest (original from the source) message id. While that could be
> > easy in the simplest case (the broker maintains a hashmap of the highest
> > source message id from each region), it becomes more difficult when we
> > consider failure cases in which we have to reconstruct that hashmap
> > from the log.
> >
> > Also, that would require modifying each message before persisting it
> > in the target region.
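Joe's issue (2), finding a position Q(r) from P's n-tuple, could be approached as in the Python sketch below. It assumes Q can look up the n-tuple of input positions behind each slot of its own output (for instance through a local-message -> N-tuple mapping like the one Matteo mentions), and it uses a hypothetical convention where an event's first character names its source. Because those n-tuples only grow, the slots whose n-tuple does not exceed P's on any input form a prefix of Q's output; resuming right after the last of them loses nothing, at the cost of re-delivering some events. `checkpoints` and `resume_after` are illustrative names, not an existing API.

```python
def checkpoints(output, sources):
    """n-tuple of input positions after each slot of a merge output.

    Hypothetical convention: an event's first character names its input,
    e.g. "c2" is the second event of input c.
    """
    positions = {source: 0 for source in sources}
    result = []
    for event in output:
        positions[event[0]] += 1
        result.append(dict(positions))
    return result


def resume_after(slot_tuples, target):
    """Last slot of Q's output whose n-tuple does not exceed `target` anywhere.

    Every event up to that slot was already consumed at `target`, so resuming
    right after it loses nothing. The n-tuples only grow, so the scan stops at
    the first slot that overtakes `target`. Returns -1 to start from scratch.
    """
    last = -1
    for slot, positions in enumerate(slot_tuples):
        if any(positions[source] > target[source] for source in target):
            break
        last = slot
    return last


q_output = ["c1", "b1", "a1", "c2", "b2", "a2", "c3", "a3"]
target = {"a": 2, "b": 1, "c": 1}  # P's reader has consumed a1, a2, b1, c1
r = resume_after(checkpoints(q_output, "abc"), target)
delivered = q_output[r + 1:]
```

Here Q resumes after slot 2 (a1); the reader loses nothing, and only a2 is delivered a second time.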
> >
> > Finally, the N-tuple of message ids would have to either:
> > * Have a mapping in the broker (local-message -> N-tuple)
> > * Be pushed to consumers so that they will ack with the complete context
> >
> > >
> > > A simple way to approximate this is for each cluster to insert its own
> > > ticker marks into the topic. A ticker carries the message id as the
> > > message body. The ticker mark can be inserted every 't' time interval or
> > > every 'n' messages as needed.
> > >
> > > The n-tuple of the tickers from each cluster is a well-known state that
> > > can be restarted anywhere by proper positioning and filtering.
> > >
> > > That is a simpler solution for users to understand and troubleshoot. It
> > > would be resilient to cluster failures, and does NOT require all clusters
> > > to be up to determine the cursor position. No cross-cluster
> > > communication/ordering is needed.
> >
> > I don't think this approach can remove the requirement of "all the
> > clusters to be up" because the information in A won't have any context
> > on what was exchanged between B and C.
> >
> > One quick example:
> > * Message c2 was replicated to B but not A. Then C goes offline.
> > * A tuple will be something like (a3, b4, c1).
> > * In region B, b4 was actually written *after* c2, but c2 doesn't get
> >   sent from B to A, since it was already replicated itself.
> > * If we do a failover from A to B considering a3 ~= b4, we would be
> >   missing the message c2.
> >
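Matteo's example can be replayed with a small Python sketch. The concrete interleavings of the two region logs below are assumptions chosen to match his description (c2 reaches B but not A, and B writes b4 after c2), plain messages stand in for ticker marks, and the subscription in A is assumed to have consumed all of A's log. The helpers `source_and_seq` and `ntuple` are hypothetical names for illustration.

```python
def source_and_seq(event):
    """Hypothetical naming convention: "c2" is the 2nd message published in region C."""
    return event[0], int(event[1:])


def ntuple(log):
    """Highest source sequence number per origin region seen in a region's log."""
    highest = {}
    for event in log:
        source, seq = source_and_seq(event)
        highest[source] = max(highest.get(source, 0), seq)
    return highest


# Assumed interleavings: c2 reached B but not A, and B wrote b4 after c2.
log_a = ["a1", "a2", "b1", "c1", "a3", "b2", "b3", "b4"]
log_b = ["b1", "a1", "b2", "c1", "a2", "b3", "c2", "a3", "b4"]

tuple_a = ntuple(log_a)  # the subscription's position in A: (a3, b4, c1)

# Naive failover: treat "a3 ~= b4" as one point in B's log and resume after it.
resume_at = max(log_b.index("a3"), log_b.index("b4")) + 1
delivered_after_failover = log_b[resume_at:]
lost = set(log_b) - set(log_a) - set(delivered_after_failover)

# Events in B whose position in their source stream lies beyond A's tuple.
not_covered = [event for event in log_b
               if source_and_seq(event)[1] > tuple_a.get(source_and_seq(event)[0], 0)]
```

Positioning in B just after the later of a3 and b4 delivers nothing further, so the failed-over subscription never sees c2; c2 is also the only event in B's log whose source position lies beyond A's tuple.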