Thanks for writing this up and comparing it to the current implementation. It's
great to see that your mockup indicates correct/expected behaviour *and* better
performance. :-)

Regarding the results for the current mechanism: does this problem affect all
window operators?

– Ufuk

On 25 Jun 2015, at 11:36, Aljoscha Krettek <aljos...@apache.org> wrote:

> I think I'll have to elaborate a bit, so I created a proof-of-concept
> implementation of my ideas and ran some throughput measurements to
> alleviate concerns about performance.
> 
> First, though, I want to highlight again why the current approach does not
> work with out-of-order elements (which, again, occur constantly due to the
> distributed nature of the system). This is the example I posted earlier:
> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like
> this:
> 
> +--+
> |  | Source
> +--+
>  |
>  +-----+
>  |     |
>  |    +--+
>  |    |  | Identity Map
>  |    +--+
>  |     |
>  +-----+
>  |
> +--+
> |  | Window
> +--+
>  |
> +--+
> |  | Sink
> +--+
> 
> So all it does is pass the elements through an identity map and then merge
> them again before the window operator. The source emits ascending integers
> and the window operator has a custom timestamp extractor that uses the
> integer itself as the timestamp and should create windows of size 4 (that
> is, elements with timestamps 0-3 form one window, the next window holds the
> elements with timestamps 4-7, and so on). Since the topology basically
> doubles the elements from the source, I would expect to get these windows:
> Window: 0, 0, 1, 1, 2, 2, 3, 3
> Window: 4, 4, 5, 5, 6, 6, 7, 7
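To make the expected result concrete: assuming, as in the example, that each integer is its own timestamp, assigning elements to windows by timestamp rather than by arrival position yields the two expected windows however the original elements and their identity-map copies interleave. This is a plain-Java sketch; `TumblingAssigner` is a hypothetical helper, not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not Flink API): assigns timestamped elements to
// tumbling event-time windows of a fixed size, independent of arrival order.
final class TumblingAssigner {

    // Start of the window containing the timestamp, e.g. size 4: 5 -> 4, 3 -> 0.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Returns window start -> elements of that window (in arrival order).
    // As in the example above, each element doubles as its own timestamp.
    static Map<Long, List<Long>> assign(List<Long> arrivals, long size) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long element : arrivals) {
            windows.computeIfAbsent(windowStart(element, size), k -> new ArrayList<>())
                   .add(element);
        }
        return windows;
    }
}
```

For a made-up arrival order such as 0, 1, 2, 3, 4, 0, 1, 2, 3, 5, 6, 7, 4, 5, 6, 7, `assign(arrivals, 4)` returns `{0=[0, 1, 2, 3, 0, 1, 2, 3], 4=[4, 5, 6, 7, 4, 5, 6, 7]}`: the same elements as the expected windows, only in arrival order.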
> 
> The output is this, however:
> Window: 0, 1, 2, 3,
> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> Window: 8, 9, 10, 11,
> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> Window: 16, 17, 18, 19,
> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> Window: 24, 25, 26, 27,
> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> 
> The reason is that the elements simply arrive out-of-order. Imagine what
> would happen if the elements actually arrived with some delay from
> different operators.
> 
> Now, on to the performance numbers. The proof-of-concept I created is
> available here:
> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic
> idea is that sources assign the current timestamp when emitting elements.
> They also periodically emit watermarks that tell us that no elements with
> an earlier timestamp will be emitted. The watermarks propagate through the
> operators. The window operator looks at the timestamp of an element and
> puts it into the buffer that corresponds to that window. When the window
> operator receives a watermark it will look at the in-flight windows
> (basically the buffers) and emit those windows where the window-end is
> before the watermark.
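The watermark-driven window operator described above can be sketched roughly as follows. This is a simplified single-key, tumbling-window illustration written from the description in this mail, not the code in the linked branch; `WatermarkWindowOperator`, `processElement` and `processWatermark` are hypothetical names. A window [start, start + size) is emitted once a watermark w >= start + size arrives, because w promises that no element with a timestamp below w will follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not Flink's implementation): buffers elements per
// tumbling event-time window and emits a window only when a watermark shows
// that no element belonging to it can still arrive.
final class WatermarkWindowOperator<T> {
    private final long size;
    // In-flight windows keyed by window start; TreeMap keeps the oldest first.
    private final TreeMap<Long, List<T>> buffers = new TreeMap<>();
    private final List<List<T>> emitted = new ArrayList<>();

    WatermarkWindowOperator(long size) { this.size = size; }

    // Puts the element into the buffer of the window its timestamp falls into.
    void processElement(T value, long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    // A watermark w promises that no element with timestamp < w will follow.
    void processWatermark(long watermark) {
        Iterator<Map.Entry<Long, List<T>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<T>> window = it.next();
            long end = window.getKey() + size; // exclusive window end
            if (end > watermark) break;        // this and later windows stay open
            emitted.add(window.getValue());
            it.remove();
        }
    }

    List<List<T>> emitted() { return emitted; }
}
```

Replaying the doubled stream from the first example into this sketch emits each window with all eight of its elements, as long as the delayed copies arrive before the watermark that closes their window.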
> 
> For measuring throughput I did the following: The source emits tuples of
> the form ("tuple", 1) in an infinite loop. The window operator sums up the
> tuples, thereby counting how many tuples the window operator can handle in
> a given time window. There are two different implementations for the
> summation: 1) simply summing up the values in a mapWindow(), where you get
> a List of all tuples and simply iterate over it; 2) using sum(1), which is
> implemented as a reduce() (that uses the pre-reducer optimisations).
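The difference between the two summation variants, and why the reduce-based one can scale with window size, can be sketched as follows. These classes are hypothetical stand-ins for the two strategies, not Flink's mapWindow()/sum(1) implementations: one buffers every element until the window fires, the other folds each element into a single running value.

```java
import java.util.ArrayList;
import java.util.List;

// mapWindow()-style: all elements are kept until the window fires, so memory
// (and garbage-collection pressure) grows with the number of elements.
final class BufferingSumWindow {
    private final List<Long> buffer = new ArrayList<>();

    void add(long value) { buffer.add(value); }

    int bufferedElements() { return buffer.size(); }

    long fire() {
        long sum = 0;
        for (long v : buffer) sum += v; // one pass over the whole buffer
        buffer.clear();
        return sum;
    }
}

// reduce()-style: each element is folded in immediately; the state is a
// single running sum regardless of how many elements the window holds.
final class ReducingSumWindow {
    private long sum = 0;

    void add(long value) { sum += value; }

    long fire() {
        long result = sum;
        sum = 0;
        return result;
    }
}
```

Both produce the same count, but only the buffering variant's state grows with the number of elements per window.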
> 
> These are the performance numbers (Current is the current implementation,
> Next is my proof-of-concept):
> 
> Tumbling (1 sec):
> - Current/Map: 1.6 mio
> - Current/Reduce: 2 mio
> - Next/Map: 2.2 mio
> - Next/Reduce: 4 mio
> 
> Sliding (5 sec, slide 1 sec):
> - Current/Map: ca 3 mio (fluctuates a lot)
> - Current/Reduce: No output
> - Next/Map: ca 4 mio (fluctuates)
> - Next/Reduce: 10 mio
> 
> The Next/Reduce variant can basically scale indefinitely with window size
> because the internal state does not depend on the number of elements (it is
> just the current sum). The current pre-reducer for sliding windows cannot
> handle this volume of tuples and produces no output. For the two Map
> variants the performance fluctuates because they always keep all the
> elements in an internal buffer before emission; this seems to tax the
> garbage collector a bit and leads to random pauses.
> 
> One thing that should be noted is that I had to disable the fake-element
> emission thread, otherwise the Current versions would deadlock.
> 
> So, I started working on this because I thought that out-of-order
> processing would be necessary for correctness. And it certainly is, but the
> proof-of-concept also shows that performance can be greatly improved.
> 
> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:
>> 
>> I agree, let's separate these topics from each other so we can reach a
>> resolution faster.
>> 
>> There is already a state discussion in the thread we started with Paris.
>> 
>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> wrote:
>> 
>>> I agree with supporting out-of-order out of the box :-), even if this
>>> means a major refactoring. This is the right time to refactor the
>>> streaming API before we pull it out of beta. I think that this is more
>>> important than new features in the streaming API, which can be
>>> prioritized once the API is out of beta (meaning that, IMO, this is the
>>> right time to stall PRs until we agree on the design).
>>>
>>> There are three sections in the document: windowing, state, and API. How
>>> intertwined are they with each other? Can we separate the discussion, or
>>> do we need to discuss them all together? I think part of the difficulty
>>> is that we are discussing three design choices at once.
>>> 
>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com>
>>> wrote:
>>> 
>>>> Out of order is ubiquitous in the real world. Typically, what happens is
>>>> that businesses will declare a maximum allowable delay for delayed
>>>> transactions and will commit to results when that delay is reached.
>>>> Transactions that arrive later than this cutoff are collected specially
>>>> as corrections, which are reported/used when possible.
>>>>
>>>> Clearly, ordering can also be violated during processing, but if the
>>>> data is originally out of order, the situation can't be repaired by any
>>>> protocol fixes that prevent transactions from becoming disordered; it
>>>> has to be handled at the data level.
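The practice Ted describes (commit after a maximum allowable delay, keep later arrivals as corrections) might look roughly like this for a single window. It is a minimal sketch under stated assumptions: `DelayedCommitWindow` is a hypothetical name, the window is represented only by its end time, the value passed to `onTime` is whatever clock the business cutoff is measured in, and the caller routes only this window's transactions to it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-window sketch: the result is committed once the maximum
// allowable delay after the window end has passed; transactions arriving after
// the commit are kept as corrections instead of being dropped.
final class DelayedCommitWindow {
    private final long windowEnd;  // exclusive end of the window
    private final long maxDelay;   // maximum allowable delay after windowEnd
    private long runningSum = 0;
    private boolean committed = false;
    private long committedSum = 0;
    private final List<Long> corrections = new ArrayList<>();

    DelayedCommitWindow(long windowEnd, long maxDelay) {
        this.windowEnd = windowEnd;
        this.maxDelay = maxDelay;
    }

    // Assumes the caller only routes transactions belonging to this window.
    void onTransaction(long amount) {
        if (committed) {
            corrections.add(amount); // too late: report/use as a correction
        } else {
            runningSum += amount;
        }
    }

    // Called as time advances; commits the result once the cutoff is reached.
    void onTime(long now) {
        if (!committed && now >= windowEnd + maxDelay) {
            committed = true;
            committedSum = runningSum;
        }
    }

    boolean isCommitted() { return committed; }
    long committedSum() { return committedSum; }
    List<Long> corrections() { return corrections; }
}
```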
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>> 
>>>>> I also don't like big changes, but sometimes they are necessary. The
>>>>> reason why I'm so adamant about out-of-order processing is that
>>>>> out-of-order elements are not some exception that occurs once in a
>>>>> while; they occur constantly in a distributed system. For example, in
>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting
>>>>> windows are completely bogus because the current windowing system
>>>>> assumes that elements arrive globally in order, which is simply not
>>>>> true. (The example has a source that generates increasing integers.
>>>>> These then pass through a map and are unioned with the original
>>>>> DataStream before a window operator.) This simulates elements arriving
>>>>> from different operators at a windowing operator. The example also runs
>>>>> with DOP=1; I imagine this gets worse with a higher DOP.
>>>>>
>>>>> What do you mean by costly? As I said, I have a proof-of-concept
>>>>> windowing operator that can handle out-of-order elements. This is an
>>>>> example using the current Flink API:
>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>> (It is an infinite source of tuples and a 5 second window operator that
>>>>> counts the tuples.) The first problem is that this code deadlocks
>>>>> because of the thread that emits fake elements. If I disable the
>>>>> fake-element code it works, but the throughput using my mockup is 4
>>>>> times higher. The gap widens dramatically as the window size increases.
>>>>>
>>>>> So, it actually increases performance (unless I'm making a mistake in
>>>>> my explorations) and can handle elements that arrive out-of-order
>>>>> (which basically always happens in real-world windowing use cases).
>>>>> 
>>>>> 
>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:
>>>>> 
>>>>>> What I like a lot about Aljoscha's proposed design is that we need no
>>>>>> different code for "system time" vs. "event time". It only differs in
>>>>>> where the timestamps are assigned.
>>>>>>
>>>>>> The OOP (out-of-order processing) approach also gives you the
>>>>>> semantics of total ordering without imposing merges on the streams.
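Stephan's point that system time and event time need no separate window code can be illustrated by isolating the difference in one place: how a timestamp is attached at the source. `TimestampAssigner`, `systemTime`, `eventTime` and `SharedWindowing` are hypothetical names for this sketch, not Flink API.

```java
import java.util.function.ToLongFunction;

// Hypothetical interface (not Flink API): the only place where "system time"
// and "event time" differ is how a timestamp is attached to an element.
interface TimestampAssigner<T> {
    long assign(T element);

    // System time: stamp each element with the wall clock when it is emitted.
    static <T> TimestampAssigner<T> systemTime() {
        return element -> System.currentTimeMillis();
    }

    // Event time: read the timestamp from the element itself.
    static <T> TimestampAssigner<T> eventTime(ToLongFunction<T> extractor) {
        return extractor::applyAsLong;
    }
}

// Downstream window logic sees only timestamps and is shared by both modes.
final class SharedWindowing {
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }
}
```

Whichever assigner is used, the window operator receives (element, timestamp) pairs and runs the same code path.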
>>>>>> 
>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>> 
>>>>>>> I agree that there should be multiple alternatives the user(!) can
>>>>>>> choose from. Partial out-of-order processing works for many/most
>>>>>>> aggregates. However, if you consider Event-Pattern-Matching, global
>>>>>>> ordering is necessary (even if the performance penalty might be high).
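For operators that really do need a global order, such as the event-pattern matching Matthias mentions, one common technique is to buffer elements and release them sorted once a watermark shows that nothing earlier can still arrive. This is a hypothetical illustration (`ReorderBuffer` is not an existing API); the buffering delay it introduces is one form of the performance penalty mentioned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical reorder buffer: holds (timestamp, value) pairs and releases them
// in timestamp order once a watermark guarantees no earlier element can follow.
final class ReorderBuffer {
    // Min-heap on the timestamp, stored at index 0 of each {timestamp, value} pair.
    private final PriorityQueue<long[]> pending =
        new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));

    void add(long timestamp, long value) {
        pending.add(new long[] {timestamp, value});
    }

    // Releases every buffered element with timestamp < watermark, oldest first.
    List<Long> onWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek()[0] < watermark) {
            released.add(pending.poll()[1]);
        }
        return released;
    }

    int pendingCount() { return pending.size(); }
}
```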
>>>>>>> 
>>>>>>> I would also keep "system-time windows" as an alternative to
>>>>>>> "source-assigned ts-windows".
>>>>>>> 
>>>>>>> It might also be interesting to consider the following paper for
>>>>>>> overlapping windows: "Resource sharing in continuous sliding-window
>>>>>>> aggregates"
>>>>>>>
>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>> Hey
>>>>>>>> 
>>>>>>>> I think we should not block PRs unnecessarily just because your
>>>>>>>> suggested changes might touch them at some point.
>>>>>>>>
>>>>>>>> Also, I still think we should not put everything in the DataStream,
>>>>>>>> because it will be a huge mess.
>>>>>>>> 
>>>>>>>> Also, we need to agree on the out-of-order processing, whether we want
>>>>>>>> it the way you proposed it (which is quite costly). Another
>>>>>>>> alternative approach, which fits in the current windowing, is to
>>>>>>>> filter out out-of-order events and apply a special handling operator
>>>>>>>> to them. This would be fairly lightweight.
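Gyula's lightweight alternative could be sketched as a splitter in front of the existing windowing. The definition of "out of order" used here (timestamp smaller than the largest one seen so far) is an assumption, and `OutOfOrderSplitter` is a hypothetical name, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical splitter (not Flink API). Assumption: an element counts as
// "out of order" if its timestamp is smaller than the largest one seen so far.
final class OutOfOrderSplitter {
    private long maxTimestamp = Long.MIN_VALUE;
    private final List<Long> inOrder = new ArrayList<>();    // -> existing windowing
    private final List<Long> outOfOrder = new ArrayList<>(); // -> special handler

    void process(long timestamp) {
        if (timestamp >= maxTimestamp) {
            maxTimestamp = timestamp;
            inOrder.add(timestamp);
        } else {
            outOfOrder.add(timestamp);
        }
    }

    List<Long> inOrder() { return inOrder; }
    List<Long> outOfOrder() { return outOfOrder; }
}
```

With an interleaving like the one in the identity-map example, most of the delayed copies land in the out-of-order branch, so how lightweight this approach stays depends on how frequently elements arrive late.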
>>>>>>>> 
>>>>>>>> My point is that we need to consider some alternative solutions, and
>>>>>>>> we should not block contributions along the way.
>>>>>>>> 
>>>>>>>> Cheers
>>>>>>>> Gyula
>>>>>>>> 
>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> The reason I posted this now is that we need to think about the API
>>>>>>>>> and windowing before proceeding with the PRs of Gabor (inverse
>>>>>>>>> reduce) and Gyula (removal of "aggregate" functions on DataStream).
>>>>>>>>>
>>>>>>>>> For the windowing, I think that the current model does not work for
>>>>>>>>> out-of-order processing. Therefore, the whole windowing
>>>>>>>>> infrastructure will basically have to be redone, which also means
>>>>>>>>> that any work on the pre-aggregators or optimizations that we do now
>>>>>>>>> becomes useless.
>>>>>>>>>
>>>>>>>>> For the API, I proposed to restructure the interactions between all
>>>>>>>>> the different *DataStream classes and grouping/windowing. (See the
>>>>>>>>> API section of the doc I posted.)
>>>>>>>>> 
>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the nice summary; this is a very good initiative.
>>>>>>>>>>
>>>>>>>>>> I added some comments to the respective sections (where I didn't
>>>>>>>>>> fully agree :)).
>>>>>>>>>> At some point I think it would be good to have a public hangout
>>>>>>>>>> session on this, which could make for a more dynamic discussion.
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Gyula
>>>>>>>>>> 
>>>>>>>>>> On Mon, 22 Jun 2015 at 21:34, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi,
>>>>>>>>>>> with people proposing changes to the streaming part, I also wanted
>>>>>>>>>>> to throw my hat into the ring. :D
>>>>>>>>>>>
>>>>>>>>>>> During the last few months, while I was getting acquainted with the
>>>>>>>>>>> streaming system, I wrote down some thoughts I had about how things
>>>>>>>>>>> could be improved. Hopefully, they are in somewhat coherent shape
>>>>>>>>>>> now, so please have a look if you are interested in this:
>>>>>>>>>>>
>>>>>>>>>>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>
>>>>>>>>>>> This mostly covers:
>>>>>>>>>>> - Timestamps assigned at sources
>>>>>>>>>>> - Out-of-order processing of elements in window operators
>>>>>>>>>>> - API design
>>>>>>>>>>> 
>>>>>>>>>>> Please let me know what you think. Comment in the document or here
>>>>>>>>>>> on the mailing list.
>>>>>>>>>>>
>>>>>>>>>>> I have a PR in the making that would introduce source timestamps
>>>>>>>>>>> and watermarks for keeping track of them. I also hacked together a
>>>>>>>>>>> proof-of-concept of a windowing system that is able to process
>>>>>>>>>>> out-of-order elements using a FlatMap operator. (It uses panes to
>>>>>>>>>>> perform efficient pre-aggregations.)
>>>>>>>>>>> 
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Aljoscha
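The general pane technique for sliding windows can be sketched as follows for a sliding sum. This is a generic illustration under stated assumptions (window size a multiple of the slide, a sum as the aggregate, hypothetical class name `PanedSlidingSum`), not the prototype's code: each element updates exactly one pane of length `slide`, and each window result combines the size/slide pane sums it covers, so elements are never stored individually.

```java
import java.util.TreeMap;

// Hypothetical sketch of pane-based pre-aggregation for a sliding sum
// (e.g. size 5 s, slide 1 s). Elements may arrive in any order.
final class PanedSlidingSum {
    private final long size;
    private final long slide;
    // Pane start -> partial sum of all elements in [start, start + slide).
    private final TreeMap<Long, Long> panes = new TreeMap<>();

    PanedSlidingSum(long size, long slide) {
        if (size % slide != 0) throw new IllegalArgumentException("size must be a multiple of slide");
        this.size = size;
        this.slide = slide;
    }

    // Each element is folded into exactly one pane.
    void add(long timestamp, long value) {
        long paneStart = timestamp - Math.floorMod(timestamp, slide);
        panes.merge(paneStart, value, Long::sum);
    }

    // Result of the window [end - size, end): combines size/slide pane sums.
    long windowSum(long end) {
        long sum = 0;
        for (long partial : panes.subMap(end - size, true, end, false).values()) sum += partial;
        return sum;
    }

    // Drops panes starting before the timestamp, e.g. after the window ending
    // at `end` fired, panes before end - size + slide are no longer needed.
    void evictBefore(long timestamp) {
        panes.headMap(timestamp, false).clear();
    }

    int paneCount() { return panes.size(); }
}
```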
>>>>>>>>>>> 
