Hi,
I also ran the tests on top of PR 856 (inverse reducer) now. The results
seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all
the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec)
(Theoretically there would be 5000 tuples in 5 seconds but this is due to
overhead). These are the results for the inverse reduce optimisation:
(Tuple 0,38)
(Tuple 0,829)
(Tuple 0,1625)
(Tuple 0,2424)
(Tuple 0,3190)
(Tuple 0,3198)
(Tuple 0,-339368)
(Tuple 0,-1315725)
(Tuple 0,-2932932)
(Tuple 0,-5082735)
(Tuple 0,-7743256)
(Tuple 0,75701046)
(Tuple 0,642829470)
(Tuple 0,2242018381)
(Tuple 0,5190708618)
(Tuple 0,10060360311)
(Tuple 0,-94254951)
(Tuple 0,-219806321293)
(Tuple 0,-1258895232699)
(Tuple 0,-4074432596329)

One line is one emitted window count. This is what happens when I remove
the Thread.sleep(1):
(Tuple 0,660676)
(Tuple 0,2553733)
(Tuple 0,3542696)
(Tuple 0,1)
(Tuple 0,1107035)
(Tuple 0,2549491)
(Tuple 0,4100387)
(Tuple 0,-8406583360092)
(Tuple 0,-8406582150743)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,6847279255682044995)
(Tuple 0,6847279255682044995)
(Tuple 0,-5390528042713628318)
(Tuple 0,-5390528042711551780)
(Tuple 0,-5390528042711551780)

So at some point the pre-reducer seems to go haywire and does not recover
from it. The good thing is that it does produce results now, where the
previous Current/Reduce would simply hang and not produce any output.

On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:

> Hello,
>
> Aljoscha, can you please try the performance test of Current/Reduce
> with the InversePreReducer in PR 856? (If you just call sum, it will
> use an InversePreReducer.) It would be an interesting test, because
> the inverse function optimization really depends on the stream being
> ordered, and I think it has the potential of being faster then
> Next/Reduce. Especially if the window size is much larger than the
> slide size.
>
> Best regards,
> Gabor
>
>
> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
> > I think I'll have to elaborate a bit so I created a proof-of-concept
> > implementation of my Ideas and ran some throughput measurements to
> > alleviate concerns about performance.
> >
> > First, though, I want to highlight again why the current approach does
> not
> > work with out-of-order elements (which, again, occur constantly due to
> the
> > distributed nature of the system). This is the example I posted earlier:
> > https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks
> like
> > this:
> >
> > +--+
> > | | Source
> > +--+
> > |
> > +-----+
> > | |
> > | +--+
> > | | | Identity Map
> > | +--+
> > | |
> > +-----+
> > |
> > +--+
> > | | Window
> > +--+
> > |
> > |
> > +--+
> > | | Sink
> > +--+
> >
> > So all it does is pass the elements through an identity map and then
> merge
> > them again before the window operator. The source emits ascending
> integers
> > and the window operator has a custom timestamp extractor that uses the
> > integer itself as the timestamp and should create windows of size 4 (that
> > is elements with timestamp 0-3 are one window, the next are the elements
> > with timestamp 4-8, and so on). Since the topology basically doubles the
> > elements form the source I would expect to get these windows:
> > Window: 0, 0, 1, 1, 2, 2, 3, 3
> > Window: 4, 4, 6, 6, 7, 7, 8, 8
> >
> > The output is this, however:
> > Window: 0, 1, 2, 3,
> > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> > Window: 8, 9, 10, 11,
> > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> > Window: 16, 17, 18, 19,
> > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> > Window: 24, 25, 26, 27,
> > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> >
> > The reason is that the elements simply arrive out-of-order. Imagine what
> > would happen if the elements actually arrived with some delay from
> > different operations.
> >
> > Now, on to the performance numbers. The proof-of-concept I created is
> > available here:
> > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The
> basic
> > idea is that sources assign the current timestamp when emitting elements.
> > They also periodically emit watermarks that tell us that no elements with
> > an earlier timestamp will be emitted. The watermarks propagate through
> the
> > operators. The window operator looks at the timestamp of an element and
> > puts it into the buffer that corresponds to that window. When the window
> > operator receives a watermark it will look at the in-flight windows
> > (basically the buffers) and emit those windows where the window-end is
> > before the watermark.
> >
> > For measuring throughput I did the following: The source emits tuples of
> > the form ("tuple", 1) in an infinite loop. The window operator sums up
> the
> > tuples, thereby counting how many tuples the window operator can handle
> in
> > a given time window. There are two different implementations for the
> > summation: 1) simply summing up the values in a mapWindow(), there you
> get
> > a List of all tuples and simple iterate over it. 2) using sum(1), which
> is
> > implemented as a reduce() (that uses the pre-reducer optimisations).
> >
> > These are the performance numbers (Current is the current implementation,
> > Next is my proof-of-concept):
> >
> > Tumbling (1 sec):
> >  - Current/Map: 1.6 mio
> >  - Current/Reduce: 2 mio
> >  - Next/Map: 2.2 mio
> >  - Next/Reduce: 4 mio
> >
> > Sliding (5 sec, slide 1 sec):
> >  - Current/Map: ca 3 mio (fluctuates a lot)
> >  - Current/Reduce: No output
> >  - Next/Map: ca 4 mio (fluctuates)
> >  - Next/Reduce: 10 mio
> >
> > The Next/Reduce variant can basically scale indefinitely with window size
> > because the internal state does not rely on the number of elements (it is
> > just the current sum). The pre-reducer for sliding elements cannot handle
> > the amount of tuples, it produces no output. For the two Map variants the
> > performance fluctuates because they always keep all the elements in an
> > internal buffer before emission, this seems to tax the garbage collector
> a
> > bit and leads to random pauses.
> >
> > One thing that should be noted is that I had to disable the fake-element
> > emission thread, otherwise the Current versions would deadlock.
> >
> > So, I started working on this because I thought that out-of-order
> > processing would be necessary for correctness. And it is certainly, But
> the
> > proof-of-concept also shows that performance can be greatly improved.
> >
> > On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>
> >> I agree lets separate these topics from each other so we can get faster
> >> resolution.
> >>
> >> There is already a state discussion in the thread we started with Paris.
> >>
> >> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org>
> > wrote:
> >>
> >> > I agree with supporting out-of-order out of the box :-), even if this
> > means
> >> > a major refactoring. This is the right time to refactor the streaming
> > API
> >> > before we pull it out of beta. I think that this is more important
> than
> > new
> >> > features in the streaming API, which can be prioritized once the API
> is
> > out
> >> > of beta (meaning, that IMO this is the right time to stall PRs until
> we
> >> > agree on the design).
> >> >
> >> > There are three sections in the document: windowing, state, and API.
> How
> >> > convoluted are those with each other? Can we separate the discussion
> or
> > do
> >> > we need to discuss those all together? I think part of the difficulty
> is
> >> > that we are discussing three design choices at once.
> >> >
> >> > On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com>
> >> > wrote:
> >> >
> >> > > Out of order is ubiquitous in the real-world.  Typically, what
> > happens is
> >> > > that businesses will declare a maximum allowable delay for delayed
> >> > > transactions and will commit to results when that delay is reached.
> >> > > Transactions that arrive later than this cutoff are collected
> > specially
> >> > as
> >> > > corrections which are reported/used when possible.
> >> > >
> >> > > Clearly, ordering can also be violated during processing, but if the
> > data
> >> > > is originally out of order the situation can't be repaired by any
> >> > protocol
> >> > > fixes that prevent transactions from becoming disordered but has to
> >> > handled
> >> > > at the data level.
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
> aljos...@apache.org
> >>
> >> > > wrote:
> >> > >
> >> > > > I also don't like big changes but sometimes they are necessary.
> The
> >> > > reason
> >> > > > why I'm so adamant about out-of-order processing is that
> > out-of-order
> >> > > > elements are not some exception that occurs once in a while; they
> > occur
> >> > > > constantly in a distributed system. For example, in this:
> >> > > > https://gist.github.com/aljoscha/a367012646ab98208d27 the
> resulting
> >> > > > windows
> >> > > > are completely bogus because the current windowing system assumes
> >> > > elements
> >> > > > to globally arrive in order, which is simply not true. (The
> example
> >> > has a
> >> > > > source that generates increasing integers. Then these pass
> through a
> >> > map
> >> > > > and are unioned with the original DataStream before a window
> > operator.)
> >> > > > This simulates elements arriving from different operators at a
> >> > windowing
> >> > > > operator. The example is also DOP=1, I imagine this to get worse
> > with
> >> > > > higher DOP.
> >> > > >
> >> > > > What do you mean by costly? As I said, I have a proof-of-concept
> >> > > windowing
> >> > > > operator that can handle out-or-order elements. This is an example
> >> > using
> >> > > > the current Flink API:
> >> > > > https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> >> > > > (It is an infinite source of tuples and a 5 second window operator
> > that
> >> > > > counts the tuples.) The first problem is that this code deadlocks
> >> > because
> >> > > > of the thread that emits fake elements. If I disable the fake
> > element
> >> > > code
> >> > > > it works, but the throughput using my mockup is 4 times higher .
> The
> >> > gap
> >> > > > widens dramatically if the window size increases.
> >> > > >
> >> > > > So, it actually increases performance (unless I'm making a mistake
> > in
> >> > my
> >> > > > explorations) and can handle elements that arrive out-of-order
> > (which
> >> > > > happens basically always in a real-world windowing use-cases).
> >> > > >
> >> > > >
> >> > > > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org>
> wrote:
> >> > > >
> >> > > > > What I like a lot about Aljoscha's proposed design is that we
> > need no
> >> > > > > different code for "system time" vs. "event time". It only
> > differs in
> >> > > > where
> >> > > > > the timestamps are assigned.
> >> > > > >
> >> > > > > The OOP approach also gives you the semantics of total ordering
> >> > without
> >> > > > > imposing merges on the streams.
> >> > > > >
> >> > > > > On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> >> > > > > mj...@informatik.hu-berlin.de> wrote:
> >> > > > >
> >> > > > > > I agree that there should be multiple alternatives the user(!)
> > can
> >> > > > > > choose from. Partial out-of-order processing works for
> many/most
> >> > > > > > aggregates. However, if you consider Event-Pattern-Matching,
> > global
> >> > > > > > ordering in necessary (even if the performance penalty might
> be
> >> > > high).
> >> > > > > >
> >> > > > > > I would also keep "system-time windows" as an alternative to
> >> > "source
> >> > > > > > assigned ts-windows".
> >> > > > > >
> >> > > > > > It might also be interesting to consider the following paper
> for
> >> > > > > > overlapping windows: "Resource sharing in continuous
> > sliding-window
> >> > > > > > aggregates"
> >> > > > > >
> >> > > > > > > https://dl.acm.org/citation.cfm?id=1316720
> >> > > > > >
> >> > > > > > -Matthias
> >> > > > > >
> >> > > > > > On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> >> > > > > > > Hey
> >> > > > > > >
> >> > > > > > > I think we should not block PRs unnecessarily if your
> > suggested
> >> > > > changes
> >> > > > > > > might touch them at some point.
> >> > > > > > >
> >> > > > > > > Also I still think we should not put everything in the
> > Datastream
> >> > > > > because
> >> > > > > > > it will be a huge mess.
> >> > > > > > >
> >> > > > > > > Also we need to agree on the out of order processing,
> whether
> > we
> >> > > want
> >> > > > > it
> >> > > > > > > the way you proposed it(which is quite costly). Another
> >> > alternative
> >> > > > > > > approach there which fits in the current windowing is to
> > filter
> >> > out
> >> > > > if
> >> > > > > > > order events and apply a special handling operator on them.
> > This
> >> > > > would
> >> > > > > be
> >> > > > > > > fairly lightweight.
> >> > > > > > >
> >> > > > > > > My point is that we need to consider some alternative
> > solutions.
> >> > > And
> >> > > > we
> >> > > > > > > should not block contributions along the way.
> >> > > > > > >
> >> > > > > > > Cheers
> >> > > > > > > Gyula
> >> > > > > > >
> >> > > > > > > On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
> >> > > > aljos...@apache.org>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > >> The reason I posted this now is that we need to think about
> > the
> >> > > API
> >> > > > > and
> >> > > > > > >> windowing before proceeding with the PRs of Gabor (inverse
> >> > reduce)
> >> > > > and
> >> > > > > > >> Gyula (removal of "aggregate" functions on DataStream).
> >> > > > > > >>
> >> > > > > > >> For the windowing, I think that the current model does not
> > work
> >> > > for
> >> > > > > > >> out-of-order processing. Therefore, the whole windowing
> >> > > > infrastructure
> >> > > > > > will
> >> > > > > > >> basically have to be redone. Meaning also that any work on
> > the
> >> > > > > > >> pre-aggregators or optimizations that we do now becomes
> > useless.
> >> > > > > > >>
> >> > > > > > >> For the API, I proposed to restructure the interactions
> > between
> >> > > all
> >> > > > > the
> >> > > > > > >> different *DataStream classes and grouping/windowing. (See
> > API
> >> > > > section
> >> > > > > > of
> >> > > > > > >> the doc I posted.)
> >> > > > > > >>
> >> > > > > > >> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
> gyula.f...@gmail.com
> >>
> >> > > > wrote:
> >> > > > > > >>
> >> > > > > > >>> Hi Aljoscha,
> >> > > > > > >>>
> >> > > > > > >>> Thanks for the nice summary, this is a very good
> initiative.
> >> > > > > > >>>
> >> > > > > > >>> I added some comments to the respective sections (where I
> > didnt
> >> > > > fully
> >> > > > > > >> agree
> >> > > > > > >>> :).).
> >> > > > > > >>> At some point I think it would be good to have a public
> > hangout
> >> > > > > session
> >> > > > > > >> on
> >> > > > > > >>> this, which could make a more dynamic discussion.
> >> > > > > > >>>
> >> > > > > > >>> Cheers,
> >> > > > > > >>> Gyula
> >> > > > > > >>>
> >> > > > > > >>> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont:
> >> > 2015.
> >> > > > jún.
> >> > > > > > >> 22.,
> >> > > > > > >>> H, 21:34):
> >> > > > > > >>>
> >> > > > > > >>>> Hi,
> >> > > > > > >>>> with people proposing changes to the streaming part I
> also
> >> > > wanted
> >> > > > to
> >> > > > > > >>> throw
> >> > > > > > >>>> my hat into the ring. :D
> >> > > > > > >>>>
> >> > > > > > >>>> During the last few months, while I was getting
> acquainted
> >> > with
> >> > > > the
> >> > > > > > >>>> streaming system, I wrote down some thoughts I had about
> > how
> >> > > > things
> >> > > > > > >> could
> >> > > > > > >>>> be improved. Hopefully, they are in somewhat coherent
> shape
> >> > now,
> >> > > > so
> >> > > > > > >>> please
> >> > > > > > >>>> have a look if you are interested in this:
> >> > > > > > >>>>
> >> > > > > > >>>>
> >> > > > > > >>>
> >> > > > > > >>
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >
> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> >> > > > > > >>>>
> >> > > > > > >>>> This mostly covers:
> >> > > > > > >>>>  - Timestamps assigned at sources
> >> > > > > > >>>>  - Out-of-order processing of elements in window
> operators
> >> > > > > > >>>>  - API design
> >> > > > > > >>>>
> >> > > > > > >>>> Please let me know what you think. Comment in the
> document
> > or
> >> > > here
> >> > > > > in
> >> > > > > > >> the
> >> > > > > > >>>> mailing list.
> >> > > > > > >>>>
> >> > > > > > >>>> I have a PR in the makings that would introduce source
> >> > > timestamps
> >> > > > > and
> >> > > > > > >>>> watermarks for keeping track of them. I also hacked a
> >> > > > > proof-of-concept
> >> > > > > > >>> of a
> >> > > > > > >>>> windowing system that is able to process out-of-order
> > elements
> >> > > > > using a
> >> > > > > > >>>> FlatMap operator. (It uses panes to perform efficient
> >> > > > > > >> pre-aggregations.)
> >> > > > > > >>>>
> >> > > > > > >>>> Cheers,
> >> > > > > > >>>> Aljoscha
> >> > > > > > >>>>
> >> > > > > > >>>
> >> > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
>

Reply via email to