Joe, Matteo

What was the conclusion of this discussion? Was there a conclusion?
Has the algorithm used in the implementation changed from the initial
draft?

-Ivan

On Fri, May 3, 2019 at 4:00 AM Joe F <j...@apache.org> wrote:
>
> Let me try to explain my suggestion better.
>
> First, about positions in an ordered stream: consider a simple stream of
> events where no event carries an identifier for its relative position in
> the stream, but after every 'n' events there is a ticker event carrying a
> monotonically increasing sequence id.
>
>     For example, after every 4th event the stream generator inserts a
>     ticker into the stream. The stream then looks like
>     a, a, a, a, (a1), a, a, a, a, (a2), a, a, a, a, (a3), a, ... and so on.
>     A reader can establish its position based on these "ticker" events
>     (like freeway mile markers).
>
> Assertion 1: A ticker position in the stream is deterministic across all
> copies of the stream, if all copies have the same event order.
> This means reading can be resumed across copies of the same stream, since
> positions are deterministic. E.g., if a reader on copy X says "I am at
> ticker (a2)", then the reader's position is at (a2) in every other copy, Y
> or Z. The reader can stop reading at (a8) in copy X and resume at (a8)
> in copy Y, and do so without loss of events.
>
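To make Assertion 1 concrete, here is a minimal Python sketch. It is only an illustration under my own assumptions: the names (`with_tickers`, `store`, `offset_of_ticker`) and the idea that each copy has its own local offsets are hypothetical and not taken from the proposal or from any Pulsar API.

```python
def with_tickers(source, n_events, every=4):
    """Build an ordered stream with a ticker after each `every` events."""
    stream, seq = [], 0
    for i in range(1, n_events + 1):
        stream.append(("event", source, i))
        if i % every == 0:
            seq += 1
            stream.append(("tick", source, seq))
    return stream


def store(stream, base_offset):
    """Model one copy: same entry order, but copy-specific local offsets."""
    return [(base_offset + i, entry) for i, entry in enumerate(stream)]


def offset_of_ticker(copy, source, seq):
    """Find the local offset of ticker (source, seq) in one copy."""
    for offset, entry in copy:
        if entry == ("tick", source, seq):
            return offset
    raise LookupError(f"ticker ({source}{seq}) not found")


stream = with_tickers("a", 12)
copy_x = store(stream, base_offset=100)   # assumed local offsets of copy X
copy_y = store(stream, base_offset=7000)  # assumed local offsets of copy Y

# Read copy X up to and including ticker (a2), then stop.
stop_x = offset_of_ticker(copy_x, "a", 2)
read_from_x = [entry for off, entry in copy_x if off <= stop_x]

# Resume on copy Y right after the same ticker, using Y's own offsets.
resume_y = offset_of_ticker(copy_y, "a", 2)
read_from_y = [entry for off, entry in copy_y if off > resume_y]

# The two reads together reproduce the stream with no loss or duplication.
print(read_from_x + read_from_y == stream)
```

The local offsets differ between the copies, but the ticker (a2) names the same logical position in both, which is all the reader needs to carry over.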
> Second, consider a merge operator that merges 'n' _ordered_ input
> streams and produces one output stream.
>
> The operator can be modeled as being fed by 'n' input readers and
> emitting one output. There is no buffering: if the operator gets an input,
> it has to write it to the output before it accepts another input from any
> of its feeders.
>
> This merge operator has 2 properties:
>     (1) Input order: the merge operator maintains input order in the
>         output, i.e. if input A had (a-x) preceding (a-y), then the
>         output of the merge operator has (a-x) preceding (a-y).
>     (2) No output order: different merge operators can produce
>         arbitrary output orders across the same input feeds, i.e. no
>         assumption can be made that, in the output, (a-x) will precede
>         (b-y) [or (b-z), or (c-y), or (c-z), ...].
>
> Assertion 2: A position in the merge output can then be represented as an
> n-tuple of 'n' individual input positions. Since each input is an ordered
> stream, the position within that input sub-stream is deterministic, and a
> combination of positions on all inputs is deterministic.
>
> It follows that (1) the set of input positions can be transferred from one
> operator to another, (2) the output will not lose events across such
> transfers, and (3) output order may change across such transfers.
>
> Note that there is no assumption or assertions about the _output_ of the
> merge operator. We are only asserting this about the input.
>
> Example
> --------------
> E.g., readers P, Q, and R are each reading the output of a different merge
> operator. The operators all process the same three event streams, one
> generated from A, one from B, and one from C.
>
> Then P's merge operator can be represented as three input stream readers
> Pa, Pb, Pc that feed into the merge operator P. The operator for P may
> produce a different output than the operator for Q (say, because input
> readers may progress at different speeds in each operator), but each input
> stream reader position is deterministic (by Assertion 1).
>
> If P has a position at [Pa(a8), Pb(b1), Pc(c3)], Q has an equivalent
> position [Qa(a8), Qb(b1), Qc(c3)] for its input readers.
>     A reader on Q can set up input readers Qa to (a8), Qb to (b1),
>     and Qc to (c3), to start feeding into Q.
>
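The merge operator and the n-tuple transfer in the example above can be sketched as follows. This is a toy model under my own assumptions, not the proposed implementation: the random choice of the next input stands in for readers progressing at different speeds, and all names (`make_input`, `merge`, `position_of`, `start_indexes`) are hypothetical.

```python
import random

EVERY = 4  # assumed ticker interval


def make_input(source, n_events):
    """Ordered input stream with a ticker after every EVERY events."""
    out = []
    for i in range(1, n_events + 1):
        out.append(("event", source, i))
        if i % EVERY == 0:
            out.append(("tick", source, i // EVERY))
    return out


def merge(inputs, start, rng):
    """Unbuffered merge: emit one entry from one input at a time."""
    cursors, out = dict(start), []
    while True:
        ready = [s for s in inputs if cursors[s] < len(inputs[s])]
        if not ready:
            return out
        s = rng.choice(ready)  # arbitrary cross-input order
        out.append(inputs[s][cursors[s]])
        cursors[s] += 1


def position_of(consumed, sources):
    """n-tuple: last ticker seen per source (0 means none seen yet)."""
    pos = {s: 0 for s in sources}
    for kind, s, value in consumed:
        if kind == "tick":
            pos[s] = value
    return pos


def start_indexes(inputs, pos):
    """Place each input reader right after its ticker from the n-tuple."""
    return {s: 0 if pos[s] == 0 else entries.index(("tick", s, pos[s])) + 1
            for s, entries in inputs.items()}


inputs = {s: make_input(s, 12) for s in "abc"}
p_out = merge(inputs, {s: 0 for s in inputs}, random.Random(1))

# A reader consumes part of P's output, then moves to operator Q.
consumed_on_p = p_out[:20]
pos = position_of(consumed_on_p, inputs)
q_out = merge(inputs, start_indexes(inputs, pos), random.Random(2))


def events(entries):
    return {e for e in entries if e[0] == "event"}


all_events = events(e for es in inputs.values() for e in es)
lost = all_events - (events(consumed_on_p) | events(q_out))
duplicated = events(consumed_on_p) & events(q_out)
print("position:", pos, "lost:", len(lost), "duplicated:", len(duplicated))
```

In this model nothing is lost across the transfer, and the duplicates are limited to the events the reader saw after the last ticker of each source. It does not address the two implementation issues listed below, since Q is re-merged from its inputs rather than read from an already written output.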
> -----------
>
> These two assertions are the invariants in my suggestion. The rest is about
> solving two implementation issues,
>
> (1) How to map the input n-tuple of the merge operator into a specific
> position in the output of the merge operator,
>     e.g. how to map [Pa(x), Pb(y), Pc(z)] ===> P(j)
>
> (2) How to resume a reader across the _outputs_ of two merge operators,
> without loss of events (and with as little duplication as possible), when
> they are fed with the same input, but at different rates,
>     e.g. if [Pa(x), Pb(y), Pc(z)] ===> P(j), then find
>     position Q(r) <=== [Pa(x), Pb(y), Pc(z)].
>
> And my thinking is that these two things can be solved similarly to the
> existing proposal.
>
> On Wed, May 1, 2019 at 4:10 PM Matteo Merli <mme...@apache.org> wrote:
>
> > On Mon, Apr 29, 2019 at 1:57 PM Joe F <joefranc...@gmail.com> wrote:
> > >
> > > I have suggestions for an alternate solution.
> > >
> > > If source message-ids were known for replicated messages, a composite
> > > cursor can be maintained for replicated subscriptions as an n-tuple.
> > Since
> > > messages are ordered from a source, it would be possible to restart from
> > a
> > > known cursor n-tuple in any cluster by  a combination of cursor
> > > positioning  _and_ filtering
> >
> > Knowing the source message id alone is not enough to establish the
> > order relationship across all the clusters. I think that would only
> > work in the 2 clusters scenario.
> >
> > In general, for each message being written in region A, both locally
> > published or replicated from other regions, we'd have to associate the
> > highest (original from the source) message id. While that could be
> > easy in the simplest case (broker maintains hashmap of the highest
> > source message id from each region), it becomes more difficult to
> > consider failure cases in which we have to reconstruct that hashmap
> > from the log.
> >
> > Also, that would require modifying each message before persisting it
> > in the target region.
> >
> > Finally, the N-tuple of message ids would have to either:
> >  * Have a mapping in the broker (local-message -> N-tuple)
> >  * Be pushed to consumers so that they will ack with the complete context
> >
> > >
> > > A simple way to approximate this is for each cluster to insert its own
> > > ticker marks into the topic. A ticker carries the message id as the
> > > message body. The ticker mark can be inserted every 't ' time interval or
> > > every 'n' messages as needed.
> > >
> > > The n-tuple of the tickers from each cluster is a well-known state  that
> > > can be re-started anywhere by proper positioning and filtering
> > >
> > > That is a simpler solution for users to understand and trouble-shoot.  It
> > > would be resilient to cluster failures, and does NOT require all clusters
> > > to be up, to determine cursor position. No cross-cluster
> > > communication/ordering is needed.
> >
> > I don't think this approach can remove the requirement of "all the
> > clusters to be up" because the information in A won't have any context
> > on what was exchanged between B and C.
> >
> > One quick example:
> >  * Message c2 was replicated to B but not A. Then C goes offline.
> >  * A tuple will be something like (a3, b4, c1)
> >  * In region B, b4 was actually written *after* c2, but c2 doesn't get
> > sent from B to A, since it was already replicated itself.
> >  * If we do a failover from A to B considering a3 ~= b4 we would be
> > missing the message c2
> >
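Matteo's counterexample can be replayed with a small Python sketch. The two region logs are made-up orderings chosen only to match the bullets above (c2 reached B but not A, and b4 was written after c2 in B), and the helper name is hypothetical. The sketch models only the naive failover that positions at b4 in B; it deliberately does not model the per-source filtering from Joe's suggestion.

```python
def highest_per_source(log):
    """Tuple of the highest message seen from each source region."""
    high = {}
    for msg in log:
        source, seq = msg[0], int(msg[1:])
        high[source] = max(high.get(source, 0), seq)
    return high


# Assumed log contents per region. C went offline before c2 reached A.
log_a = ["a1", "b1", "c1", "a2", "b2", "b3", "a3", "b4"]
log_b = ["b1", "a1", "c1", "b2", "a2", "b3", "c2", "b4", "a3"]

# The consumer in A has read everything A has.
consumed_in_a = set(log_a)
tuple_a = highest_per_source(log_a)

# Naive failover: treat a3 ~= b4 and resume in B right after b4.
resumed_in_b = log_b[log_b.index("b4") + 1:]

delivered = consumed_in_a | set(resumed_in_b)
missing = set(log_b) - delivered
print("tuple in A:", tuple_a)
print("missing after failover:", missing)
```

Here the tuple in A is (a3, b4, c1), and c2 is the one message the consumer never sees, because it sits before b4 in B's log and will never arrive in A while C is down.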
