I'm very sorry, I had a bug in the InversePreReducer. It should be
fixed now. Can you please run it again?

I also tried to reproduce some of your performance numbers, but I'm
getting less than 1/10th of yours. For example, in the Tumbling
case, Current/Reduce produces only ~100000 for me. Do you have any
idea what I could be doing wrong? My code:
http://pastebin.com/zbEjmGhk
I am running it on a 2 GHz Core i7.

Best regards,
Gabor


2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> I also ran the tests on top of PR 856 (inverse reducer) now. The results
> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all
> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec).
> (Theoretically there would be 5000 tuples in 5 seconds; the shortfall is
> due to overhead.) These are the results for the inverse reduce optimisation:
> (Tuple 0,38)
> (Tuple 0,829)
> (Tuple 0,1625)
> (Tuple 0,2424)
> (Tuple 0,3190)
> (Tuple 0,3198)
> (Tuple 0,-339368)
> (Tuple 0,-1315725)
> (Tuple 0,-2932932)
> (Tuple 0,-5082735)
> (Tuple 0,-7743256)
> (Tuple 0,75701046)
> (Tuple 0,642829470)
> (Tuple 0,2242018381)
> (Tuple 0,5190708618)
> (Tuple 0,10060360311)
> (Tuple 0,-94254951)
> (Tuple 0,-219806321293)
> (Tuple 0,-1258895232699)
> (Tuple 0,-4074432596329)
>
> One line is one emitted window count. This is what happens when I remove
> the Thread.sleep(1):
> (Tuple 0,660676)
> (Tuple 0,2553733)
> (Tuple 0,3542696)
> (Tuple 0,1)
> (Tuple 0,1107035)
> (Tuple 0,2549491)
> (Tuple 0,4100387)
> (Tuple 0,-8406583360092)
> (Tuple 0,-8406582150743)
> (Tuple 0,-8406580427190)
> (Tuple 0,-8406580427190)
> (Tuple 0,-8406580427190)
> (Tuple 0,6847279255682044995)
> (Tuple 0,6847279255682044995)
> (Tuple 0,-5390528042713628318)
> (Tuple 0,-5390528042711551780)
> (Tuple 0,-5390528042711551780)
>
> So at some point the pre-reducer seems to go haywire and does not recover
> from it. The good thing is that it does produce results now, where the
> previous Current/Reduce would simply hang and not produce any output.
>
> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:
>
>> Hello,
>>
>> Aljoscha, can you please try the performance test of Current/Reduce
>> with the InversePreReducer in PR 856? (If you just call sum, it will
>> use an InversePreReducer.) It would be an interesting test, because
>> the inverse function optimization really depends on the stream being
>> ordered, and I think it has the potential of being faster than
>> Next/Reduce. Especially if the window size is much larger than the
>> slide size.
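
To illustrate the inverse function idea described above, here is a minimal sketch. It is not the code from PR 856: the function name `sliding_sums` is hypothetical, and it uses count-based windows over an in-order list rather than time-based windows on a stream. It keeps one running total, adds each arriving value, and subtracts each evicted value instead of re-summing the window.

```python
from collections import deque


def sliding_sums(values, size, slide):
    """Sum count-based sliding windows with an inverse function (subtraction).

    Assumes the values arrive in order; out-of-order input would break
    the eviction logic, which is the dependency mentioned in the thread.
    """
    window = deque()
    total = 0
    results = []
    for i, value in enumerate(values, start=1):
        window.append(value)
        total += value
        if len(window) > size:
            # Inverse step: remove the contribution of the evicted element.
            total -= window.popleft()
        if i >= size and (i - size) % slide == 0:
            results.append(total)
    return results


print(sliding_sums([1, 2, 3, 4, 5, 6], size=3, slide=1))  # [6, 9, 12, 15]
```

The work per element stays constant regardless of the window size, which is why the benefit should grow when the window is much larger than the slide.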
>>
>> Best regards,
>> Gabor
>>
>>
>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>> > I think I'll have to elaborate a bit so I created a proof-of-concept
>> > implementation of my ideas and ran some throughput measurements to
>> > alleviate concerns about performance.
>> >
>> > First, though, I want to highlight again why the current approach does not
>> > work with out-of-order elements (which, again, occur constantly due to the
>> > distributed nature of the system). This is the example I posted earlier:
>> > https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like
>> > this:
>> > +--+
>> > | | Source
>> > +--+
>> > |
>> > +-----+
>> > | |
>> > | +--+
>> > | | | Identity Map
>> > | +--+
>> > | |
>> > +-----+
>> > |
>> > +--+
>> > | | Window
>> > +--+
>> > |
>> > |
>> > +--+
>> > | | Sink
>> > +--+
>> >
>> > So all it does is pass the elements through an identity map and then merge
>> > them again before the window operator. The source emits ascending integers
>> > and the window operator has a custom timestamp extractor that uses the
>> > integer itself as the timestamp and should create windows of size 4 (that
>> > is, elements with timestamps 0-3 are one window, the next are the elements
>> > with timestamps 4-7, and so on). Since the topology basically doubles the
>> > elements from the source I would expect to get these windows:
>> > Window: 0, 0, 1, 1, 2, 2, 3, 3
>> > Window: 4, 4, 5, 5, 6, 6, 7, 7
>> >
>> > The output is this, however:
>> > Window: 0, 1, 2, 3,
>> > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>> > Window: 8, 9, 10, 11,
>> > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>> > Window: 16, 17, 18, 19,
>> > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>> > Window: 24, 25, 26, 27,
>> > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>> >
>> > The reason is that the elements simply arrive out-of-order. Imagine what
>> > would happen if the elements actually arrived with some delay from
>> > different operations.
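
As a rough sketch of the expected behaviour, the following assigns each element to a tumbling window purely by its timestamp, so the arrival order does not matter. The function name `assign_to_windows` and the sample arrival order are assumptions made for illustration; this is not the code from the gist.

```python
from collections import defaultdict


def assign_to_windows(timestamps, size):
    """Group elements into tumbling windows keyed by window start.

    The window is derived from the element's own timestamp, not from
    the moment it arrives at the operator.
    """
    windows = defaultdict(list)
    for ts in timestamps:
        start = ts - ts % size
        windows[start].append(ts)
    return {start: sorted(elems) for start, elems in sorted(windows.items())}


# Two interleaved copies of the stream 0..7 (hypothetical arrival order).
arrivals = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6, 7, 5, 6, 7]
for start, elems in assign_to_windows(arrivals, size=4).items():
    print("Window:", elems)
```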
>> >
>> > Now, on to the performance numbers. The proof-of-concept I created is
>> > available here:
>> > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic
>> > idea is that sources assign the current timestamp when emitting elements.
>> > They also periodically emit watermarks that tell us that no elements with
>> > an earlier timestamp will be emitted. The watermarks propagate through the
>> > operators. The window operator looks at the timestamp of an element and
>> > puts it into the buffer that corresponds to that window. When the window
>> > operator receives a watermark it will look at the in-flight windows
>> > (basically the buffers) and emit those windows where the window-end is
>> > before the watermark.
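
The mechanism described above could be sketched roughly as follows. The class `WatermarkWindowOperator` and its method names are hypothetical and do not come from the proof-of-concept branch; the sketch assumes tumbling windows that cover `[start, start + size)` and a watermark `w` meaning that no element with a timestamp below `w` will arrive.

```python
from collections import defaultdict


class WatermarkWindowOperator:
    """Buffers elements per window and emits windows once a watermark passes them."""

    def __init__(self, size):
        self.size = size
        self.buffers = defaultdict(list)  # window start -> buffered values

    def on_element(self, timestamp, value):
        # Put the element into the buffer of the window its timestamp belongs to.
        start = timestamp - timestamp % self.size
        self.buffers[start].append(value)

    def on_watermark(self, watermark):
        # A window is complete when its end is not after the watermark.
        ready = sorted(s for s in self.buffers if s + self.size <= watermark)
        return [(s, self.buffers.pop(s)) for s in ready]


op = WatermarkWindowOperator(size=4)
for ts in [0, 1, 5, 2, 3, 4]:      # element 5 arrives before 2 and 3
    op.on_element(ts, ts)
print(op.on_watermark(4))          # [(0, [0, 1, 2, 3])]
print(op.on_watermark(8))          # [(4, [5, 4])]
```

Elements within a window keep their arrival order here; only the assignment to windows is driven by timestamps.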
>> >
>> > For measuring throughput I did the following: The source emits tuples of
>> > the form ("tuple", 1) in an infinite loop. The window operator sums up the
>> > tuples, thereby counting how many tuples the window operator can handle in
>> > a given time window. There are two different implementations for the
>> > summation: 1) simply summing up the values in a mapWindow(), where you get
>> > a List of all tuples and simply iterate over it. 2) using sum(1), which is
>> > implemented as a reduce() (that uses the pre-reducer optimisations).
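
The difference between the two summation variants can be sketched in plain Python. This is only an analogy, not the Flink API: `sum_in_map_window`, `add_counts` and `sum_with_reduce` are hypothetical names. The first variant needs the whole window materialised as a list, the second only ever holds one accumulated tuple.

```python
from functools import reduce


def sum_in_map_window(window):
    """Variant 1: receive the full list of tuples and iterate over it."""
    total = 0
    for _name, count in window:
        total += count
    return ("tuple", total)


def add_counts(a, b):
    """Reduce function: combine two tuples by adding their counts."""
    return (a[0], a[1] + b[1])


def sum_with_reduce(stream):
    """Variant 2: fold the elements one by one, keeping a single accumulator."""
    return reduce(add_counts, stream)


window = [("tuple", 1)] * 1000
print(sum_in_map_window(window))       # ('tuple', 1000)
print(sum_with_reduce(iter(window)))   # ('tuple', 1000)
```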
>> >
>> > These are the performance numbers (Current is the current implementation,
>> > Next is my proof-of-concept):
>> >
>> > Tumbling (1 sec):
>> >  - Current/Map: 1.6 mio
>> >  - Current/Reduce: 2 mio
>> >  - Next/Map: 2.2 mio
>> >  - Next/Reduce: 4 mio
>> >
>> > Sliding (5 sec, slide 1 sec):
>> >  - Current/Map: ca 3 mio (fluctuates a lot)
>> >  - Current/Reduce: No output
>> >  - Next/Map: ca 4 mio (fluctuates)
>> >  - Next/Reduce: 10 mio
>> >
>> > The Next/Reduce variant can basically scale indefinitely with window size
>> > because the internal state does not depend on the number of elements (it is
>> > just the current sum). The pre-reducer for sliding windows cannot handle
>> > the number of tuples; it produces no output. For the two Map variants the
>> > performance fluctuates because they always keep all the elements in an
>> > internal buffer before emission. This seems to tax the garbage collector a
>> > bit and leads to random pauses.
>> >
>> > One thing that should be noted is that I had to disable the fake-element
>> > emission thread, otherwise the Current versions would deadlock.
>> >
>> > So, I started working on this because I thought that out-of-order
>> > processing would be necessary for correctness. And it certainly is. But the
>> > proof-of-concept also shows that performance can be greatly improved.
>> >
>> > On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >>
>> >> I agree, let's separate these topics from each other so we can get faster
>> >> resolution.
>> >>
>> >> There is already a state discussion in the thread we started with Paris.
>> >>
>> >> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org>
>> > wrote:
>> >>
>> >> > I agree with supporting out-of-order out of the box :-), even if this means
>> >> > a major refactoring. This is the right time to refactor the streaming API
>> >> > before we pull it out of beta. I think that this is more important than new
>> >> > features in the streaming API, which can be prioritized once the API is out
>> >> > of beta (meaning, that IMO this is the right time to stall PRs until we
>> >> > agree on the design).
>> >> >
>> >> > There are three sections in the document: windowing, state, and API. How
>> >> > intertwined are those with each other? Can we separate the discussion or do
>> >> > we need to discuss those all together? I think part of the difficulty is
>> >> > that we are discussing three design choices at once.
>> >> >
>> >> > On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Out of order is ubiquitous in the real world.  Typically, what happens is
>> >> > > that businesses will declare a maximum allowable delay for delayed
>> >> > > transactions and will commit to results when that delay is reached.
>> >> > > Transactions that arrive later than this cutoff are collected specially as
>> >> > > corrections which are reported/used when possible.
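
A minimal sketch of such a cutoff policy, under the assumption that every event carries both an event time and an arrival time. The function name `split_by_cutoff` and the tuple layout are hypothetical, chosen only for illustration.

```python
def split_by_cutoff(events, window_end, max_delay):
    """Split the events of one window into committed results and late corrections.

    events: iterable of (event_time, arrival_time, value).
    Events with event_time < window_end belong to the window. Those that
    arrive by window_end + max_delay are committed; later ones are
    collected as corrections.
    """
    cutoff = window_end + max_delay
    committed, corrections = [], []
    for event_time, arrival_time, value in events:
        if event_time >= window_end:
            continue  # belongs to a later window
        if arrival_time <= cutoff:
            committed.append(value)
        else:
            corrections.append(value)
    return committed, corrections


events = [(1, 1, "a"), (2, 12, "b"), (3, 20, "c"), (11, 11, "d")]
print(split_by_cutoff(events, window_end=10, max_delay=5))  # (['a', 'b'], ['c'])
```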
>> >> > >
>> >> > > Clearly, ordering can also be violated during processing, but if the data
>> >> > > is originally out of order the situation can't be repaired by any protocol
>> >> > > fixes that prevent transactions from becoming disordered but has to be
>> >> > > handled at the data level.
>> >> > >
>> >> > >
>> >> > >
>> >> > >
>> >> > > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljos...@apache.org>
>> >> > > wrote:
>> >> > >
>> >> > > > I also don't like big changes but sometimes they are necessary. The reason
>> >> > > > why I'm so adamant about out-of-order processing is that out-of-order
>> >> > > > elements are not some exception that occurs once in a while; they occur
>> >> > > > constantly in a distributed system. For example, in this:
>> >> > > > https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting
>> >> > > > windows are completely bogus because the current windowing system assumes
>> >> > > > elements to globally arrive in order, which is simply not true. (The example
>> >> > > > has a source that generates increasing integers. Then these pass through a
>> >> > > > map and are unioned with the original DataStream before a window operator.)
>> >> > > > This simulates elements arriving from different operators at a windowing
>> >> > > > operator. The example is also DOP=1, I imagine this to get worse with
>> >> > > > higher DOP.
>> >> > > >
>> >> > > > What do you mean by costly? As I said, I have a proof-of-concept windowing
>> >> > > > operator that can handle out-of-order elements. This is an example using
>> >> > > > the current Flink API:
>> >> > > > https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>> >> > > > (It is an infinite source of tuples and a 5 second window operator that
>> >> > > > counts the tuples.) The first problem is that this code deadlocks because
>> >> > > > of the thread that emits fake elements. If I disable the fake element code
>> >> > > > it works, but the throughput using my mockup is 4 times higher. The gap
>> >> > > > widens dramatically if the window size increases.
>> >> > > >
>> >> > > > So, it actually increases performance (unless I'm making a mistake in my
>> >> > > > explorations) and can handle elements that arrive out-of-order (which
>> >> > > > happens basically always in real-world windowing use cases).
>> >> > > >
>> >> > > >
>> >> > > > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org>
>> wrote:
>> >> > > >
>> >> > > > > What I like a lot about Aljoscha's proposed design is that we need no
>> >> > > > > different code for "system time" vs. "event time". It only differs in where
>> >> > > > > the timestamps are assigned.
>> >> > > > >
>> >> > > > > The OOP approach also gives you the semantics of total ordering without
>> >> > > > > imposing merges on the streams.
>> >> > > > >
>> >> > > > > On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
>> >> > > > > mj...@informatik.hu-berlin.de> wrote:
>> >> > > > >
>> >> > > > > > I agree that there should be multiple alternatives the user(!) can
>> >> > > > > > choose from. Partial out-of-order processing works for many/most
>> >> > > > > > aggregates. However, if you consider Event-Pattern-Matching, global
>> >> > > > > > ordering is necessary (even if the performance penalty might be high).
>> >> > > > > >
>> >> > > > > > I would also keep "system-time windows" as an alternative to "source
>> >> > > > > > assigned ts-windows".
>> >> > > > > >
>> >> > > > > > It might also be interesting to consider the following paper for
>> >> > > > > > overlapping windows: "Resource sharing in continuous sliding-window
>> >> > > > > > aggregates"
>> >> > > > > >
>> >> > > > > > > https://dl.acm.org/citation.cfm?id=1316720
>> >> > > > > >
>> >> > > > > > -Matthias
>> >> > > > > >
>> >> > > > > > On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>> >> > > > > > > Hey
>> >> > > > > > >
>> >> > > > > > > I think we should not block PRs unnecessarily if your suggested changes
>> >> > > > > > > might touch them at some point.
>> >> > > > > > >
>> >> > > > > > > Also I still think we should not put everything in the DataStream because
>> >> > > > > > > it will be a huge mess.
>> >> > > > > > >
>> >> > > > > > > Also we need to agree on the out of order processing, whether we want it
>> >> > > > > > > the way you proposed it (which is quite costly). Another alternative
>> >> > > > > > > approach there which fits in the current windowing is to filter out
>> >> > > > > > > out-of-order events and apply a special handling operator on them. This
>> >> > > > > > > would be fairly lightweight.
>> >> > > > > > >
>> >> > > > > > > My point is that we need to consider some alternative solutions. And we
>> >> > > > > > > should not block contributions along the way.
>> >> > > > > > >
>> >> > > > > > > Cheers
>> >> > > > > > > Gyula
>> >> > > > > > >
>> >> > > > > > > On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org>
>> >> > > > > > > wrote:
>> >> > > > > > >
>> >> > > > > > >> The reason I posted this now is that we need to think about the API and
>> >> > > > > > >> windowing before proceeding with the PRs of Gabor (inverse reduce) and
>> >> > > > > > >> Gyula (removal of "aggregate" functions on DataStream).
>> >> > > > > > >>
>> >> > > > > > >> For the windowing, I think that the current model does not work for
>> >> > > > > > >> out-of-order processing. Therefore, the whole windowing infrastructure will
>> >> > > > > > >> basically have to be redone. Meaning also that any work on the
>> >> > > > > > >> pre-aggregators or optimizations that we do now becomes useless.
>> >> > > > > > >>
>> >> > > > > > >> For the API, I proposed to restructure the interactions between all the
>> >> > > > > > >> different *DataStream classes and grouping/windowing. (See API section of
>> >> > > > > > >> the doc I posted.)
>> >> > > > > > >>
>> >> > > > > > >> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >> > > > > > >>
>> >> > > > > > >>> Hi Aljoscha,
>> >> > > > > > >>>
>> >> > > > > > >>> Thanks for the nice summary, this is a very good initiative.
>> >> > > > > > >>>
>> >> > > > > > >>> I added some comments to the respective sections (where I didn't fully
>> >> > > > > > >>> agree :).).
>> >> > > > > > >>> At some point I think it would be good to have a public hangout session on
>> >> > > > > > >>> this, which could make a more dynamic discussion.
>> >> > > > > > >>>
>> >> > > > > > >>> Cheers,
>> >> > > > > > >>> Gyula
>> >> > > > > > >>>
>> >> > > > > > >>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 22 Jun 2015 at 21:34):
>> >> > > > > > >>>
>> >> > > > > > >>>> Hi,
>> >> > > > > > >>>> with people proposing changes to the streaming part I also wanted to
>> >> > > > > > >>>> throw my hat into the ring. :D
>> >> > > > > > >>>>
>> >> > > > > > >>>> During the last few months, while I was getting acquainted with the
>> >> > > > > > >>>> streaming system, I wrote down some thoughts I had about how things could
>> >> > > > > > >>>> be improved. Hopefully, they are in somewhat coherent shape now, so
>> >> > > > > > >>>> please have a look if you are interested in this:
>> >> > > > > > >>>>
>> >> > > > > > >>>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>> >> > > > > > >>>>
>> >> > > > > > >>>> This mostly covers:
>> >> > > > > > >>>>  - Timestamps assigned at sources
>> >> > > > > > >>>>  - Out-of-order processing of elements in window operators
>> >> > > > > > >>>>  - API design
>> >> > > > > > >>>>
>> >> > > > > > >>>> Please let me know what you think. Comment in the document or here in the
>> >> > > > > > >>>> mailing list.
>> >> > > > > > >>>>
>> >> > > > > > >>>> I have a PR in the making that would introduce source timestamps and
>> >> > > > > > >>>> watermarks for keeping track of them. I also hacked a proof-of-concept of a
>> >> > > > > > >>>> windowing system that is able to process out-of-order elements using a
>> >> > > > > > >>>> FlatMap operator. (It uses panes to perform efficient pre-aggregations.)
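
For readers unfamiliar with panes, here is a rough sketch of the general technique. It is not the proof-of-concept mentioned above: the function name `sliding_sums_with_panes` is hypothetical, and it assumes integer timestamps and windows that start at multiples of the slide. Each element is pre-aggregated into a pane of length gcd(size, slide), and every sliding window is then assembled from its panes.

```python
from collections import defaultdict
from math import gcd


def sliding_sums_with_panes(elements, size, slide):
    """Sum sliding windows by combining per-pane partial sums.

    elements: iterable of (timestamp, value); arrival order is irrelevant
    because each element is placed into a pane by its timestamp.
    Returns {window_start: sum} for windows [start, start + size).
    """
    pane = gcd(size, slide)
    pane_sums = defaultdict(int)
    for ts, value in elements:
        pane_sums[ts - ts % pane] += value
    if not pane_sums:
        return {}
    results = {}
    start = 0
    while start <= max(pane_sums):
        panes = range(start, start + size, pane)
        results[start] = sum(pane_sums.get(p, 0) for p in panes)
        start += slide
    return results


elements = [(t, 1) for t in [3, 0, 5, 1, 4, 2]]  # out-of-order arrival
print(sliding_sums_with_panes(elements, size=4, slide=2))  # {0: 4, 2: 4, 4: 2}
```

Each element is touched once, and overlapping windows share the pane sums instead of each keeping its own copy of the elements.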
>> >> > > > > > >>>>
>> >> > > > > > >>>> Cheers,
>> >> > > > > > >>>> Aljoscha
>> >> > > > > > >>>>
>> >> > > > > > >>>
>> >> > > > > > >>
>> >> > > > > > >
>> >> > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>>
