Nice starting point.

Comment 1:
"Each individual stream partition delivers elements strictly in order."
(in 'Parallel Streams, Partitions, Time, and Ordering')

I would say "FIFO" and not "strictly in order". If data is not emitted
in-order, the stream partition will not be in-order either.

Comment 2:
Why do we need KeyedDataStream? You can get everything done with
GroupedDataStream (using a tumbling window of size = 1 tuple).


-Matthias

On 06/26/2015 07:42 PM, Stephan Ewen wrote:
> Here is a first bit of what I have been writing down. Will add more over
> the next days:
> 
> 
> https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
> 
> https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering
> 
> 
> 
> On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:
> 
>> +1 for writing this down
>>
>>> On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> +1 go ahead
>>>
>>> On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hey!
>>>>
>>>> This thread covers many different topics. Let's break this up into
>> separate
>>>> discussions.
>>>>
>>>> - Operator State is already driven by Gyula and Paris and happening on
>> the
>>>> above-mentioned pull request and the follow-up discussions.
>>>>
>>>> - For windowing, this discussion has brought some results that we should
>>>> sum up and clearly write down.
>>>>   I would like to chime in to do that based on what I learned from the
>>>> document and this discussion. I also got some input from Marton about
>> what
>>>> he learned from mapping the Cloud DataFlow constructs to Flink.
>>>>   I'll draft a Wiki page (with the help of Aljoscha, Marton) that sums
>>>> this up and documents it for users (if we decide to adopt this).
>>>>   Then we run this by Gyula, Matthias Sax and Kostas for feedback.
>>>>
>>>> - API style discussions should be yet another thread. This will probably
>>>> be opened as people start to address that.
>>>>
>>>>
>>>> I'll try to get a draft of the wiki version out tomorrow noon and send
>> the
>>>> link around.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>>
>>>> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <
>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>
>>>>> Sure. I picked this up. Using the current model for "occurrence time
>>>>> semantics" does not work.
>>>>>
>>>>> I elaborated on this in the past many times (but nobody cared). It is
>>>>> important to make it clear to the user what semantics are supported.
>>>>> Claiming to support "sliding windows" doesn't mean anything; there are
>>>>> too many different semantics out there. :)
>>>>>
>>>>>
>>>>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
>>>>>> Yes, I am aware of this requirement and it would also be supported in
>>>> my
>>>>>> proposed model.
>>>>>>
>>>>>> The problem is that the "custom timestamp" feature gives the
>>>> impression
>>>>>> that the elements would be windowed according to a user-timestamp. The
>>>>>> results, however, are wrong because of the assumption about elements
>>>>>> arriving in order. (This is what I was trying to show with my fancy
>>>> ASCII
>>>>>> art and result output.)
>>>>>>
>>>>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <
>>>>> mj...@informatik.hu-berlin.de>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> I like that you are pushing in this direction. However, IMHO you
>>>>>>> misinterpret the current approach. It does not assume that tuples
>>>>>>> arrive in-order; the current approach has no notion about a
>>>>>>> pre-defined order (for example, the order in which the events are
>>>>>>> created). There is only the notion of "arrival-order" at the
>> operator.
>>>>>>> From this "arrival-order" perspective, the results are correct(!).
>>>>>>>
>>>>>>> Windowing in the current approach means for example, "sum up an
>>>>>>> attribute of all events you *received* in the last 5 seconds". That
>>>> is a
>>>>>>> different meaning than "sum up an attribute of all events that
>>>> *occurred*
>>>>>>> in the last 5 seconds". Both queries are valid and Flink should
>>>> support
>>>>>>> both IMHO.
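The difference between the two queries can be shown with a toy example in plain Java (the event fields and helper names below are mine, purely for illustration, not Flink API): one event occurs early but arrives late, so the "received in [0,5)" sum and the "occurred in [0,5)" sum differ.

```java
public class ArrivalVsOccurrence {
    // An event carries both the time it occurred and the time it arrived
    // at the operator; the two can differ in a distributed system.
    static final class Event {
        final long occurredAt, arrivedAt, value;
        Event(long occurredAt, long arrivedAt, long value) {
            this.occurredAt = occurredAt;
            this.arrivedAt = arrivedAt;
            this.value = value;
        }
    }

    // "sum up an attribute of all events you *received* in the window"
    static long sumReceived(Event[] events, long start, long end) {
        long s = 0;
        for (Event e : events) if (e.arrivedAt >= start && e.arrivedAt < end) s += e.value;
        return s;
    }

    // "sum up an attribute of all events that *occurred* in the window"
    static long sumOccurred(Event[] events, long start, long end) {
        long s = 0;
        for (Event e : events) if (e.occurredAt >= start && e.occurredAt < end) s += e.value;
        return s;
    }

    public static void main(String[] args) {
        Event[] events = {
            new Event(1, 1, 10),
            new Event(2, 2, 20),
            new Event(3, 9, 30),  // occurred early, arrived late
            new Event(7, 7, 40),
        };
        System.out.println(sumReceived(events, 0, 5)); // 30
        System.out.println(sumOccurred(events, 0, 5)); // 60
    }
}
```

Both sums are valid answers; they just answer different questions about the window [0, 5).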
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
>>>>>>>> Yes, now this also processes about 3 mio Elements (Window Size 5
>> sec,
>>>>>>> Slide
>>>>>>>> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
>>>>>>>>
>>>>>>>> Performance is not my main concern, however. My concern is that the
>>>>>>> current
>>>>>>>> model assumes elements to arrive in order, which is simply not true.
>>>>>>>>
>>>>>>>> In your code you have these lines for specifying the window:
>>>>>>>> .window(Time.of(1l, TimeUnit.SECONDS))
>>>>>>>> .every(Time.of(1l, TimeUnit.SECONDS))
>>>>>>>>
>>>>>>>> Although this semantically specifies a tumbling window of size 1 sec
>>>>> I'm
>>>>>>>> afraid it uses the sliding window logic internally (because of the
>>>>>>>> .every()).
>>>>>>>>
>>>>>>>> In my tests I only have the first line.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It should be
>>>>>>>>> fixed now. Can you please run it again?
>>>>>>>>>
>>>>>>>>> I also tried to reproduce some of your performance numbers, but I'm
>>>>>>>>> getting only less than 1/10th of yours. For example, in the
>> Tumbling
>>>>>>>>> case, Current/Reduce produces only ~100000 for me. Do you have any
>>>>>>>>> idea what I could be doing wrong? My code:
>>>>>>>>> http://pastebin.com/zbEjmGhk
>>>>>>>>> I am running it on a 2 GHz Core i7.
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> Gabor
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>>>>>>> Hi,
>>>>>>>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The
>>>>>>> results
>>>>>>>>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple
>>>> source,
>>>>>>> all
>>>>>>>>>> the previous tests reported around 3600 tuples (Size 5 sec, Slide
>> 1
>>>>>>> sec)
>>>>>>>>>> (Theoretically there would be 5000 tuples in 5 seconds; the shortfall is due
>>>>>>>>>> to
>>>>>>>>>> overhead). These are the results for the inverse reduce
>>>> optimisation:
>>>>>>>>>> (Tuple 0,38)
>>>>>>>>>> (Tuple 0,829)
>>>>>>>>>> (Tuple 0,1625)
>>>>>>>>>> (Tuple 0,2424)
>>>>>>>>>> (Tuple 0,3190)
>>>>>>>>>> (Tuple 0,3198)
>>>>>>>>>> (Tuple 0,-339368)
>>>>>>>>>> (Tuple 0,-1315725)
>>>>>>>>>> (Tuple 0,-2932932)
>>>>>>>>>> (Tuple 0,-5082735)
>>>>>>>>>> (Tuple 0,-7743256)
>>>>>>>>>> (Tuple 0,75701046)
>>>>>>>>>> (Tuple 0,642829470)
>>>>>>>>>> (Tuple 0,2242018381)
>>>>>>>>>> (Tuple 0,5190708618)
>>>>>>>>>> (Tuple 0,10060360311)
>>>>>>>>>> (Tuple 0,-94254951)
>>>>>>>>>> (Tuple 0,-219806321293)
>>>>>>>>>> (Tuple 0,-1258895232699)
>>>>>>>>>> (Tuple 0,-4074432596329)
>>>>>>>>>>
>>>>>>>>>> One line is one emitted window count. This is what happens when I
>>>>>>> remove
>>>>>>>>>> the Thread.sleep(1):
>>>>>>>>>> (Tuple 0,660676)
>>>>>>>>>> (Tuple 0,2553733)
>>>>>>>>>> (Tuple 0,3542696)
>>>>>>>>>> (Tuple 0,1)
>>>>>>>>>> (Tuple 0,1107035)
>>>>>>>>>> (Tuple 0,2549491)
>>>>>>>>>> (Tuple 0,4100387)
>>>>>>>>>> (Tuple 0,-8406583360092)
>>>>>>>>>> (Tuple 0,-8406582150743)
>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>> (Tuple 0,-5390528042713628318)
>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>>
>>>>>>>>>> So at some point the pre-reducer seems to go haywire and does not
>>>>>>> recover
>>>>>>>>>> from it. The good thing is that it does produce results now, where
>>>>> the
>>>>>>>>>> previous Current/Reduce would simply hang and not produce any
>>>> output.
>>>>>>>>>>
>>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com>
>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> Aljoscha, can you please try the performance test of
>>>> Current/Reduce
>>>>>>>>>>> with the InversePreReducer in PR 856? (If you just call sum, it
>>>> will
>>>>>>>>>>> use an InversePreReducer.) It would be an interesting test,
>>>> because
>>>>>>>>>>> the inverse function optimization really depends on the stream
>>>> being
>>>>>>>>>>> ordered, and I think it has the potential of being faster than
>>>>>>>>>>> Next/Reduce. Especially if the window size is much larger than
>> the
>>>>>>>>>>> slide size.
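The inverse-function idea being discussed can be sketched outside Flink as follows. This is an illustrative simplification (a count-based rather than time-based sliding window, and not the actual InversePreReducer from PR 856): each arriving value is added to the running sum, and each value that slides out of the window is subtracted again via the inverse of the reduce function, which is only correct when elements arrive in order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding sum with the inverse-reduce optimization: instead of re-summing
// the whole window on every slide, add each arriving value (reduce step)
// and subtract each value that leaves the window (inverse-reduce step).
public class InverseSlidingSum {
    private final int windowSize;            // window size in elements, for simplicity
    private final Deque<Long> window = new ArrayDeque<>();
    private long sum = 0;

    InverseSlidingSum(int windowSize) { this.windowSize = windowSize; }

    // Returns the sum over the last windowSize elements after adding value.
    long add(long value) {
        window.addLast(value);
        sum += value;                        // reduce
        if (window.size() > windowSize) {
            sum -= window.removeFirst();     // inverse reduce
        }
        return sum;
    }

    public static void main(String[] args) {
        InverseSlidingSum s = new InverseSlidingSum(3);
        for (long v = 1; v <= 5; v++) {
            System.out.println(s.add(v));    // 1, 3, 6, 9, 12
        }
    }
}
```

The update cost per element is O(1) regardless of window size, which is why this approach can pay off when the window is much larger than the slide. The correctness caveat: subtracting the "oldest" buffered element only removes the right value if elements entered the window in order.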
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Gabor
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org
>>>>> :
>>>>>>>>>>>> I think I'll have to elaborate a bit so I created a
>>>>> proof-of-concept
>>>>>>>>>>>> implementation of my ideas and ran some throughput measurements
>>>> to
>>>>>>>>>>>> alleviate concerns about performance.
>>>>>>>>>>>>
>>>>>>>>>>>> First, though, I want to highlight again why the current
>> approach
>>>>>>> does
>>>>>>>>>>> not
>>>>>>>>>>>> work with out-of-order elements (which, again, occur constantly
>>>> due
>>>>>>> to
>>>>>>>>>>> the
>>>>>>>>>>>> distributed nature of the system). This is the example I posted
>>>>>>>>> earlier:
>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan
>>>>>>> looks
>>>>>>>>>>> like
>>>>>>>>>>>> this:
>>>>>>>>>>>>
>>>>>>>>>>>> +--+
>>>>>>>>>>>> | | Source
>>>>>>>>>>>> +--+
>>>>>>>>>>>> |
>>>>>>>>>>>> +-----+
>>>>>>>>>>>> | |
>>>>>>>>>>>> | +--+
>>>>>>>>>>>> | | | Identity Map
>>>>>>>>>>>> | +--+
>>>>>>>>>>>> | |
>>>>>>>>>>>> +-----+
>>>>>>>>>>>> |
>>>>>>>>>>>> +--+
>>>>>>>>>>>> | | Window
>>>>>>>>>>>> +--+
>>>>>>>>>>>> |
>>>>>>>>>>>> |
>>>>>>>>>>>> +--+
>>>>>>>>>>>> | | Sink
>>>>>>>>>>>> +--+
>>>>>>>>>>>>
>>>>>>>>>>>> So all it does is pass the elements through an identity map and
>>>>> then
>>>>>>>>>>> merge
>>>>>>>>>>>> them again before the window operator. The source emits
>> ascending
>>>>>>>>>>> integers
>>>>>>>>>>>> and the window operator has a custom timestamp extractor that
>>>> uses
>>>>>>> the
>>>>>>>>>>>> integer itself as the timestamp and should create windows of
>>>> size 4
>>>>>>>>> (that
>>>>>>>>>>>> is elements with timestamp 0-3 are one window, the next are the
>>>>>>>>> elements
>>>>>>>>>>>> with timestamp 4-7, and so on). Since the topology basically
>>>>> doubles
>>>>>>>>> the
>>>>>>>>>>>> elements from the source I would expect to get these windows:
>>>>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
>>>>>>>>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
>>>>>>>>>>>>
>>>>>>>>>>>> The output is this, however:
>>>>>>>>>>>> Window: 0, 1, 2, 3,
>>>>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>>>>>>>>>>>> Window: 8, 9, 10, 11,
>>>>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>>>>>>>>>>>> Window: 16, 17, 18, 19,
>>>>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>>>>>>>>>>>> Window: 24, 25, 26, 27,
>>>>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>>>>>>>>>>>>
>>>>>>>>>>>> The reason is that the elements simply arrive out-of-order.
>>>> Imagine
>>>>>>>>> what
>>>>>>>>>>>> would happen if the elements actually arrived with some delay
>>>> from
>>>>>>>>>>>> different operations.
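The expected grouping from the example above can be reproduced with a few lines of plain Java (the class and method names are mine, not from the proof-of-concept): assigning each element to the tumbling window that covers its timestamp yields the same window contents no matter in which order the two copies of each integer arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowAssignment {
    // Assigns each element to the tumbling window covering its timestamp.
    // With windowSize = 4: timestamps 0-3 -> window starting at 0,
    // timestamps 4-7 -> window starting at 4, and so on.
    static Map<Long, List<Integer>> assign(int[] elements, int windowSize) {
        Map<Long, List<Integer>> windows = new TreeMap<>();
        for (int e : elements) {
            long windowStart = (e / windowSize) * (long) windowSize;
            windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(e);
        }
        return windows;
    }

    public static void main(String[] args) {
        // The union of the direct stream and the mapped stream delivers each
        // integer twice, possibly interleaved out of order at the operator.
        int[] arrived = {0, 1, 2, 3, 0, 1, 4, 2, 5, 3, 4, 6, 5, 7, 6, 7};
        Map<Long, List<Integer>> windows = assign(arrived, 4);
        // Window [0,4) contains each of 0..3 twice and window [4,8) each of
        // 4..7 twice, regardless of arrival order.
        System.out.println(windows);
    }
}
```

This is exactly what assigning by timestamp instead of arrival order buys: the arrival interleaving affects only the order within a window, never which window an element lands in.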
>>>>>>>>>>>>
>>>>>>>>>>>> Now, on to the performance numbers. The proof-of-concept I
>>>> created
>>>>> is
>>>>>>>>>>>> available here:
>>>>>>>>>>>>
>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock
>>>> .
>>>>>>> The
>>>>>>>>>>> basic
>>>>>>>>>>>> idea is that sources assign the current timestamp when emitting
>>>>>>>>> elements.
>>>>>>>>>>>> They also periodically emit watermarks that tell us that no
>>>>> elements
>>>>>>>>> with
>>>>>>>>>>>> an earlier timestamp will be emitted. The watermarks propagate
>>>>>>> through
>>>>>>>>>>> the
>>>>>>>>>>>> operators. The window operator looks at the timestamp of an
>>>> element
>>>>>>>>> and
>>>>>>>>>>>> puts it into the buffer that corresponds to that window. When
>> the
>>>>>>>>> window
>>>>>>>>>>>> operator receives a watermark it will look at the in-flight
>>>> windows
>>>>>>>>>>>> (basically the buffers) and emit those windows where the
>>>> window-end
>>>>>>> is
>>>>>>>>>>>> before the watermark.
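The buffering-plus-watermark logic described above can be sketched in a few lines (class and method names are illustrative only, not the actual proof-of-concept code): elements are buffered per window by timestamp, and a watermark flushes every in-flight window whose end is at or before it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch of a watermark-driven tumbling-window operator.
public class WatermarkWindowOperator {
    private final long windowSize;
    // window start -> buffered element timestamps for that in-flight window
    private final Map<Long, List<Long>> inFlight = new TreeMap<>();
    private final List<List<Long>> emitted = new ArrayList<>();

    WatermarkWindowOperator(long windowSize) { this.windowSize = windowSize; }

    // Buffer the element into the window covering its timestamp; elements
    // may arrive in any order.
    void processElement(long timestamp) {
        long windowStart = (timestamp / windowSize) * windowSize;
        inFlight.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
    }

    // A watermark promises that no element with an earlier timestamp will
    // arrive; emit every window whose end is at or before the watermark.
    void processWatermark(long watermark) {
        Iterator<Map.Entry<Long, List<Long>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + windowSize <= watermark) {
                emitted.add(e.getValue());
                it.remove();
            }
        }
    }

    List<List<Long>> emitted() { return emitted; }

    public static void main(String[] args) {
        WatermarkWindowOperator op = new WatermarkWindowOperator(4);
        long[] timestamps = {0, 2, 1, 5, 3, 4}; // out of order
        for (long ts : timestamps) op.processElement(ts);
        op.processWatermark(4); // closes window [0,4): elements 0, 2, 1, 3
        System.out.println(op.emitted());
    }
}
```

Note how the window operator itself never needs the stream to be ordered; the watermark alone decides when a window is complete.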
>>>>>>>>>>>>
>>>>>>>>>>>> For measuring throughput I did the following: The source emits
>>>>> tuples
>>>>>>>>> of
>>>>>>>>>>>> the form ("tuple", 1) in an infinite loop. The window operator
>>>> sums
>>>>>>> up
>>>>>>>>>>> the
>>>>>>>>>>>> tuples, thereby counting how many tuples the window operator can
>>>>>>>>> handle
>>>>>>>>>>> in
>>>>>>>>>>>> a given time window. There are two different implementations for
>>>>> the
>>>>>>>>>>>> summation: 1) simply summing up the values in a mapWindow(),
>>>> there
>>>>>>> you
>>>>>>>>>>> get
>>>>>>>>>>>> a List of all tuples and simply iterate over it. 2) using
>> sum(1),
>>>>>>>>> which
>>>>>>>>>>> is
>>>>>>>>>>>> implemented as a reduce() (that uses the pre-reducer
>>>>> optimisations).
>>>>>>>>>>>>
>>>>>>>>>>>> These are the performance numbers (Current is the current
>>>>>>>>> implementation,
>>>>>>>>>>>> Next is my proof-of-concept):
>>>>>>>>>>>>
>>>>>>>>>>>> Tumbling (1 sec):
>>>>>>>>>>>> - Current/Map: 1.6 mio
>>>>>>>>>>>> - Current/Reduce: 2 mio
>>>>>>>>>>>> - Next/Map: 2.2 mio
>>>>>>>>>>>> - Next/Reduce: 4 mio
>>>>>>>>>>>>
>>>>>>>>>>>> Sliding (5 sec, slide 1 sec):
>>>>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot)
>>>>>>>>>>>> - Current/Reduce: No output
>>>>>>>>>>>> - Next/Map: ca 4 mio (fluctuates)
>>>>>>>>>>>> - Next/Reduce: 10 mio
>>>>>>>>>>>>
>>>>>>>>>>>> The Next/Reduce variant can basically scale indefinitely with
>>>>> window
>>>>>>>>> size
>>>>>>>>>>>> because the internal state does not rely on the number of
>>>> elements
>>>>>>>>> (it is
>>>>>>>>>>>> just the current sum). The pre-reducer for sliding windows
>>>> cannot
>>>>>>>>> handle
>>>>>>>>>>>> the amount of tuples, it produces no output. For the two Map
>>>>> variants
>>>>>>>>> the
>>>>>>>>>>>> performance fluctuates because they always keep all the elements
>>>> in
>>>>>>> an
>>>>>>>>>>>> internal buffer before emission; this seems to tax the garbage
>>>>>>>>> collector
>>>>>>>>>>> a
>>>>>>>>>>>> bit and leads to random pauses.
>>>>>>>>>>>>
>>>>>>>>>>>> One thing that should be noted is that I had to disable the
>>>>>>>>> fake-element
>>>>>>>>>>>> emission thread, otherwise the Current versions would deadlock.
>>>>>>>>>>>>
>>>>>>>>>>>> So, I started working on this because I thought that
>> out-of-order
>>>>>>>>>>>> processing would be necessary for correctness. And it certainly
>>>>>>>>>>>> is. But the
>>>>>>>>>>>> proof-of-concept also shows that performance can be greatly
>>>>> improved.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com>
>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I agree lets separate these topics from each other so we can
>> get
>>>>>>>>> faster
>>>>>>>>>>>>> resolution.
>>>>>>>>>>>>>
>>>>>>>>>>>>> There is already a state discussion in the thread we started
>>>> with
>>>>>>>>> Paris.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <
>>>>> ktzou...@apache.org
>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree with supporting out-of-order out of the box :-), even
>>>> if
>>>>>>>>> this
>>>>>>>>>>>> means
>>>>>>>>>>>>>> a major refactoring. This is the right time to refactor the
>>>>>>>>> streaming
>>>>>>>>>>>> API
>>>>>>>>>>>>>> before we pull it out of beta. I think that this is more
>>>>> important
>>>>>>>>>>> than
>>>>>>>>>>>> new
>>>>>>>>>>>>>> features in the streaming API, which can be prioritized once
>>>> the
>>>>>>>>> API
>>>>>>>>>>> is
>>>>>>>>>>>> out
>>>>>>>>>>>>>> of beta (meaning, that IMO this is the right time to stall PRs
>>>>>>>>> until
>>>>>>>>>>> we
>>>>>>>>>>>>>> agree on the design).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There are three sections in the document: windowing, state,
>> and
>>>>>>>>> API.
>>>>>>>>>>> How
>>>>>>>>>>>>>> convoluted are those with each other? Can we separate the
>>>>>>>>> discussion
>>>>>>>>>>> or
>>>>>>>>>>>> do
>>>>>>>>>>>>>> we need to discuss those all together? I think part of the
>>>>>>>>> difficulty
>>>>>>>>>>> is
>>>>>>>>>>>>>> that we are discussing three design choices at once.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
>>>>>>>>> ted.dunn...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Out of order is ubiquitous in the real-world.  Typically,
>> what
>>>>>>>>>>>> happens is
>>>>>>>>>>>>>>> that businesses will declare a maximum allowable delay for
>>>>>>>>> delayed
>>>>>>>>>>>>>>> transactions and will commit to results when that delay is
>>>>>>>>> reached.
>>>>>>>>>>>>>>> Transactions that arrive later than this cutoff are collected
>>>>>>>>>>>> specially
>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>> corrections which are reported/used when possible.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Clearly, ordering can also be violated during processing, but
>>>> if
>>>>>>>>> the
>>>>>>>>>>>> data
>>>>>>>>>>>>>>> is originally out of order the situation can't be repaired by
>>>>> any
>>>>>>>>>>>>>> protocol
>>>>>>>>>>>>>>> fixes that prevent transactions from becoming disordered; it
>>>>>>>>>>>>>>> has to be handled at the data level.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
>>>>>>>>>>> aljos...@apache.org
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I also don't like big changes but sometimes they are
>>>> necessary.
>>>>>>>>>>> The
>>>>>>>>>>>>>>> reason
>>>>>>>>>>>>>>>> why I'm so adamant about out-of-order processing is that
>>>>>>>>>>>> out-of-order
>>>>>>>>>>>>>>>> elements are not some exception that occurs once in a while;
>>>>>>>>> they
>>>>>>>>>>>> occur
>>>>>>>>>>>>>>>> constantly in a distributed system. For example, in this:
>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the
>>>>>>>>>>> resulting
>>>>>>>>>>>>>>>> windows
>>>>>>>>>>>>>>>> are completely bogus because the current windowing system
>>>>>>>>> assumes
>>>>>>>>>>>>>>> elements
>>>>>>>>>>>>>>>> to globally arrive in order, which is simply not true. (The
>>>>>>>>>>> example
>>>>>>>>>>>>>> has a
>>>>>>>>>>>>>>>> source that generates increasing integers. Then these pass
>>>>>>>>>>> through a
>>>>>>>>>>>>>> map
>>>>>>>>>>>>>>>> and are unioned with the original DataStream before a window
>>>>>>>>>>>> operator.)
>>>>>>>>>>>>>>>> This simulates elements arriving from different operators at
>>>> a
>>>>>>>>>>>>>> windowing
>>>>>>>>>>>>>>>> operator. The example is also DOP=1; I imagine this gets
>>>>>>>>> worse
>>>>>>>>>>>> with
>>>>>>>>>>>>>>>> higher DOP.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What do you mean by costly? As I said, I have a
>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>>> windowing
>>>>>>>>>>>>>>>> operator that can handle out-or-order elements. This is an
>>>>>>>>> example
>>>>>>>>>>>>>> using
>>>>>>>>>>>>>>>> the current Flink API:
>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>>>>>>>>>>>>> (It is an infinite source of tuples and a 5 second window
>>>>>>>>> operator
>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> counts the tuples.) The first problem is that this code
>>>>>>>>> deadlocks
>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>> of the thread that emits fake elements. If I disable the
>> fake
>>>>>>>>>>>> element
>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>> it works, but the throughput using my mockup is 4 times
>>>>>>>>>>>>>>>> higher. The
>>>>>>>>>>>>>> gap
>>>>>>>>>>>>>>>> widens dramatically if the window size increases.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So, it actually increases performance (unless I'm making a
>>>>>>>>> mistake
>>>>>>>>>>>> in
>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>> explorations) and can handle elements that arrive
>>>> out-of-order
>>>>>>>>>>>> (which
>>>>>>>>>>>>>>>> happens basically always in a real-world windowing
>>>> use-cases).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org
>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed design is that
>>>> we
>>>>>>>>>>>> need no
>>>>>>>>>>>>>>>>> different code for "system time" vs. "event time". It only
>>>>>>>>>>>> differs in
>>>>>>>>>>>>>>>> where
>>>>>>>>>>>>>>>>> the timestamps are assigned.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The OOP approach also gives you the semantics of total
>>>>>>>>> ordering
>>>>>>>>>>>>>> without
>>>>>>>>>>>>>>>>> imposing merges on the streams.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
>>>>>>>>>>>>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I agree that there should be multiple alternatives the
>>>>>>>>> user(!)
>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>> choose from. Partial out-of-order processing works for
>>>>>>>>>>> many/most
>>>>>>>>>>>>>>>>>> aggregates. However, if you consider
>>>>>>>>> Event-Pattern-Matching,
>>>>>>>>>>>> global
>>>>>>>>>>>>>>>>>> ordering is necessary (even if the performance penalty
>>>>>>>>> might
>>>>>>>>>>> be
>>>>>>>>>>>>>>> high).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I would also keep "system-time windows" as an alternative
>>>>>>>>> to
>>>>>>>>>>>>>> "source
>>>>>>>>>>>>>>>>>> assigned ts-windows".
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It might also be interesting to consider the following
>>>>>>>>> paper
>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>> overlapping windows: "Resource sharing in continuous
>>>>>>>>>>>> sliding-window
>>>>>>>>>>>>>>>>>> aggregates"
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>>>>>>>>>>>>> Hey
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily if your
>>>>>>>>>>>> suggested
>>>>>>>>>>>>>>>> changes
>>>>>>>>>>>>>>>>>>> might touch them at some point.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Also I still think we should not put everything in the
>>>>>>>>>>>> Datastream
>>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>> it will be a huge mess.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Also we need to agree on the out of order processing,
>>>>>>>>>>> whether
>>>>>>>>>>>> we
>>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>> the way you proposed it (which is quite costly). Another
>>>>>>>>>>>>>> alternative
>>>>>>>>>>>>>>>>>>> approach there which fits in the current windowing is to
>>>>>>>>>>>>>>>>>>> filter out out-of-order events and apply a special handling operator on
>>>>>>>>> them.
>>>>>>>>>>>> This
>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> fairly lightweight.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My point is that we need to consider some alternative
>>>>>>>>>>>> solutions.
>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>> should not block contributions along the way.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
>>>>>>>>>>>>>>>> aljos...@apache.org>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The reason I posted this now is that we need to think
>>>>>>>>> about
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor
>>>>>>>>> (inverse
>>>>>>>>>>>>>> reduce)
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on DataStream).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For the windowing, I think that the current model does
>>>>>>>>> not
>>>>>>>>>>>> work
>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> out-of-order processing. Therefore, the whole windowing
>>>>>>>>>>>>>>>> infrastructure
>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>> basically have to be redone. Meaning also that any work
>>>>>>>>> on
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> pre-aggregators or optimizations that we do now becomes
>>>>>>>>>>>> useless.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the interactions
>>>>>>>>>>>> between
>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> different *DataStream classes and grouping/windowing.
>>>>>>>>> (See
>>>>>>>>>>>> API
>>>>>>>>>>>>>>>> section
>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> the doc I posted.)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
>>>>>>>>>>> gyula.f...@gmail.com
>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good
>>>>>>>>>>> initiative.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I added some comments to the respective sections
>>>>>>>>> (where I
>>>>>>>>>>>> didn't
>>>>>>>>>>>>>>>> fully
>>>>>>>>>>>>>>>>>>>> agree
>>>>>>>>>>>>>>>>>>>>> :).).
>>>>>>>>>>>>>>>>>>>>> At some point I think it would be good to have a public
>>>>>>>>>>>> hangout
>>>>>>>>>>>>>>>>> session
>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>> this, which could make a more dynamic discussion.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon,
>>>>>>>>>>>>>>>>>>>>> 22 Jun 2015, 21:34):
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>> with people proposing changes to the streaming part I
>>>>>>>>>>> also
>>>>>>>>>>>>>>> wanted
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> throw
>>>>>>>>>>>>>>>>>>>>>> my hat into the ring. :D
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> During the last few months, while I was getting
>>>>>>>>>>> acquainted
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had
>>>>>>>>> about
>>>>>>>>>>>> how
>>>>>>>>>>>>>>>> things
>>>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat coherent
>>>>>>>>>>> shape
>>>>>>>>>>>>>> now,
>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>>>> please
>>>>>>>>>>>>>>>>>>>>>> have a look if you are interested in this:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> This mostly covers:
>>>>>>>>>>>>>>>>>>>>>> - Timestamps assigned at sources
>>>>>>>>>>>>>>>>>>>>>> - Out-of-order processing of elements in window
>>>>>>>>>>> operators
>>>>>>>>>>>>>>>>>>>>>> - API design
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Please let me know what you think. Comment in the
>>>>>>>>>>> document
>>>>>>>>>>>> or
>>>>>>>>>>>>>>> here
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> mailing list.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I have a PR in the makings that would introduce source
>>>>>>>>>>>>>>> timestamps
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> watermarks for keeping track of them. I also hacked a
>>>>>>>>>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>>>>>>>>> of a
>>>>>>>>>>>>>>>>>>>>>> windowing system that is able to process out-of-order
>>>>>>>>>>>> elements
>>>>>>>>>>>>>>>>> using a
>>>>>>>>>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform efficient
>>>>>>>>>>>>>>>>>>>> pre-aggregations.)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>>>>>>
