Thanks for clarifying.

For map/flapMap it makes sense. However, I would handle filter
differently. Even if the re-grouping can be removed in an optimization
step internally -- what might also be possible for map/flatMap -- I
think it is annoying for the user to specify .byKey() twice...

-> stream.byKey(MYKEY).filter(...).byKey(MYKEY).window(...)
instead of the shorter
-> stream.byKey(MYKEY).filter(...).window(...)

Furthermore, I agree, that the use should choose global order explicitly.

However, I disagree at some point:

1) OrderedDataStream in not non-parallel from my point of view. It only
requires to receive all tuples within a Stream-Partition in order (not
sure, if there is any use-case for this though).

2) Why not supporting OrderedNonParallelStream? NonParallelStream is
already non-parallel, so adding global ordering on top if it should be
no problem (and sounds useful to me).


-Matthias


On 07/23/2015 11:37 AM, Aljoscha Krettek wrote:
> Hi,
> I'll try to answer these one at a time:
> 
> 1) After a map/flatMap the key (and partitioning) of the data is lost,
> that's why it goes back to a vanilla DataStream. I think filter also has
> this behavior just to fit in with the other ones. Also, any chain of
> filter/map/flatMap can also be expressed in a single flatMap.
> 
> 2) An OrderedDataStream would require a global ordering of the elements
> which is an inherently non-parallel operation. With the API rework we want
> to make fast (i.e. parallel, in most cases) operations more prominent while
> making it hard to specify operations that cannot run in parallel. As for
> windows, the ordering of the elements inside a window is decoupled from any
> ordering of a DataStream, the user should be able to specify whether his
> windowing policies or windowing functions require the input elements to be
> ordered by timestamp.
> 
> Now for the Discretization. I think it makes sense to keep it in the same
> document at the beginning but this has the potential to grow very large, at
> which point it should be moved to it's own document. (Exactly what Gyula
> suggested.)
> 
> On Thu, 23 Jul 2015 at 11:22 Matthias J. Sax <mj...@informatik.hu-berlin.de>
> wrote:
> 
>> I just had a look into the "Streams+and+Operations+on+Streams" document.
>>
>> The initial figure is confusing... (it makes sense after reading the
>> document but is a bumper in the beginning)
>>
>>
>> A few comments/question:
>>
>> 1) Why is a (Ordered)KeyedDataStream converted into a DataStream if
>> map/flatMap/filter is applied? (for broadcast/rebalance it makes sense
>> to me)
>>
>> 2) Why is there only an ordered KEYED-DataStream? An
>> NonParallelWindowStream might require ordered input, too. Maybe, even an
>> OrderedDataStream, ie, non-keyed, would make sense, too.
>>
>>
>> @Aljoscha: Decoupling API and window implementation is mandatory of
>> course (from my point of view).
>>
>> @Gyula: I would prefer to extend the current document. It is easier to
>> follow if all information is on a single place and not spread out.
>>
>>
>> -Matthias
>>
>> On 07/23/2015 10:57 AM, Gyula Fóra wrote:
>>> I think aside from the Discretization part we reached a consensus. I
>> think
>>> you can start with the implementation for the rest.
>>>
>>> I will do some updates to the Discretization part, and might even start a
>>> new doc if it gets too long.
>>>
>>> Gyula
>>>
>>> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015. júl.
>> 23.,
>>> Cs, 10:47):
>>>
>>>> What's the status of the discussion? What are the opinions on the
>> reworking
>>>> of the Streaming API as presented here:
>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
>>>>
>>>> If we could reach a consensus I would like to start working on this to
>> have
>>>> it done before the next release. In the process of this I would also
>> like
>>>> to decouple the current windowing implementation from the API to make it
>>>> possible to select different windowing systems per job, as outlined
>> here:
>>>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=60624830
>>>>
>>>> On Mon, 29 Jun 2015 at 10:55 Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> @Matthias:
>>>>>
>>>>> I think using the KeyedDataStream will simply result in smaller
>> programs.
>>>>> May be hard for some users to make the connection to a
>>>>> 1-element-tumbling-window, simply because they want to use state. Not
>>>>> everyone is a deep into that stuff as you are ;-)
>>>>>
>>>>>
>>>>> On Sun, Jun 28, 2015 at 1:13 AM, Matthias J. Sax <
>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>
>>>>>> Yes. But as I said, you can get the same behavior with a
>>>>>> GroupedDataStream using a tumbling 1-tuple-size window. Thus, there is
>>>>>> no conceptual advantage in using KeyedDataStream and no disadvantage
>> in
>>>>>> binding stateful operations to GroupedDataStreams.
>>>>>>
>>>>>> On 06/27/2015 06:54 PM, Márton Balassi wrote:
>>>>>>> @Matthias: Your point of working with a minimal number of clear
>>>>> concepts
>>>>>> is
>>>>>>> desirable to say the least. :)
>>>>>>>
>>>>>>> The reasoning behind the KeyedDatastream is to associate Flink
>>>>> persisted
>>>>>>> operator state with the keys of the data that produced it, so that
>>>>>> stateful
>>>>>>> computation becomes scalabe in the future. This should not be tied to
>>>>> the
>>>>>>> GroupedDataStream, especially not if we are removing the option to
>>>>> create
>>>>>>> groups without windows as proposed on the Wiki by Stephan.
>>>>>>>
>>>>>>> On Sat, Jun 27, 2015 at 4:15 PM, Matthias J. Sax <
>>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>>
>>>>>>>> This was more a conceptual point-of-view argument. From an
>>>>>>>> implementation point of view, skipping the window building step is a
>>>>>>>> good idea if a tumbling 1-tuple-size window is detected.
>>>>>>>>
>>>>>>>> I prefer to work with a minimum number of concepts (and apply
>>>> internal
>>>>>>>> optimization if possible) instead of using redundant concepts for
>>>>>>>> special cases. Of course, this is my personal point of view.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 06/27/2015 03:47 PM, Aljoscha Krettek wrote:
>>>>>>>>> What do you mean by Comment 2? Using the whole window apparatus if
>>>>> you
>>>>>>>> just
>>>>>>>>> want to have, for example, a simple partitioned filter with
>>>>> partitioned
>>>>>>>>> state seems a bit extravagant.
>>>>>>>>>
>>>>>>>>> On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <
>>>>>>>> mj...@informatik.hu-berlin.de>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Nice starting point.
>>>>>>>>>>
>>>>>>>>>> Comment 1:
>>>>>>>>>> "Each individual stream partition delivers elements strictly in
>>>>>> order."
>>>>>>>>>> (in 'Parallel Streams, Partitions, Time, and Ordering')
>>>>>>>>>>
>>>>>>>>>> I would say "FIFO" and not "strictly in order". If data is not
>>>>> emitted
>>>>>>>>>> in-order, the stream partition will not be in-order either.
>>>>>>>>>>
>>>>>>>>>> Comment 2:
>>>>>>>>>> Why do we need KeyedDataStream. You can get everything done with
>>>>>>>>>> GroupedDataStream (using a tumbling window of size = 1 tuple).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 06/26/2015 07:42 PM, Stephan Ewen wrote:
>>>>>>>>>>> Here is a first bit of what I have been writing down. Will add
>>>> more
>>>>>>>> over
>>>>>>>>>>> the next days:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se>
>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> +1 for writing this down
>>>>>>>>>>>>
>>>>>>>>>>>>> On 25 Jun 2015, at 18:11, Aljoscha Krettek <
>>>> aljos...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1 go ahead
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This thread covers many different topics. Lets break this up
>>>>> into
>>>>>>>>>>>> separate
>>>>>>>>>>>>>> discussions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Operator State is already driven by Gyula and Paris and
>>>>>> happening
>>>>>>>> on
>>>>>>>>>>>> the
>>>>>>>>>>>>>> above mentioned pull request and the followup discussions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - For windowing, this discussion has brought some results that
>>>>> we
>>>>>>>>>> should
>>>>>>>>>>>>>> sum up and clearly write down.
>>>>>>>>>>>>>>   I would like to chime in to do that based on what I learned
>>>>> from
>>>>>>>> the
>>>>>>>>>>>>>> document and this discussion. I also got some input from
>>>> Marton
>>>>>>>> about
>>>>>>>>>>>> what
>>>>>>>>>>>>>> he learned from mapping the Cloud DataFlow constructs to
>>>> Flink.
>>>>>>>>>>>>>>   I'll draft a Wiki page (with the help of Aljoscha, Marton)
>>>>> that
>>>>>>>> sums
>>>>>>>>>>>>>> this up and documents it for users (if we decide to adopt
>>>> this).
>>>>>>>>>>>>>>   Then we run this by Gyula, Matthias Sax and Kostas for
>>>>> feedback.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - API style discussions should be yet another thread. This
>>>> will
>>>>>>>>>> probably
>>>>>>>>>>>>>> be opened as people start to address that.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'll try to get a draft of the wiki version out tomorrow noon
>>>>> and
>>>>>>>> send
>>>>>>>>>>>> the
>>>>>>>>>>>>>> link around.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Greetings,
>>>>>>>>>>>>>> Stephan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <
>>>>>>>>>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sure. I picked this up. Using the current model for
>>>> "occurrence
>>>>>>>> time
>>>>>>>>>>>>>>> semantics" does not work.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I elaborated on this in the past many times (but nobody
>>>> cared).
>>>>>> It
>>>>>>>> is
>>>>>>>>>>>>>>> important to make it clear to the user what semantics are
>>>>>>>> supported.
>>>>>>>>>>>>>>> Claiming to support "sliding windows" doesn't mean anything;
>>>>>> there
>>>>>>>>>> are
>>>>>>>>>>>>>>> too many different semantics out there. :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>>> Yes, I am aware of this requirement and it would also be
>>>>>> supported
>>>>>>>>>> in
>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>> proposed model.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The problem is, that the "custom timestamp" feature gives
>>>> the
>>>>>>>>>>>>>> impression
>>>>>>>>>>>>>>>> that the elements would be windowed according to a
>>>>>> user-timestamp.
>>>>>>>>>> The
>>>>>>>>>>>>>>>> results, however, are wrong because of the assumption about
>>>>>>>> elements
>>>>>>>>>>>>>>>> arriving in order. (This is what I was trying to show with
>>>> my
>>>>>>>> fancy
>>>>>>>>>>>>>> ASCII
>>>>>>>>>>>>>>>> art and result output.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <
>>>>>>>>>>>>>>> mj...@informatik.hu-berlin.de>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I like that you are pushing in this direction. However,
>>>> IMHO
>>>>>> you
>>>>>>>>>>>>>>>>> misinterpreter the current approach. It does not assume
>>>> that
>>>>>>>> tuples
>>>>>>>>>>>>>>>>> arrive in-order; the current approach has no notion about a
>>>>>>>>>>>>>>>>> pre-defined-order (for example, the order in which the
>>>> event
>>>>>> are
>>>>>>>>>>>>>>>>> created). There is only the notion of "arrival-order" at
>>>> the
>>>>>>>>>>>> operator.
>>>>>>>>>>>>>>>>> From this "arrival-order" perspective, the result are
>>>>>> correct(!).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Windowing in the current approach means for example, "sum
>>>> up
>>>>> an
>>>>>>>>>>>>>>>>> attribute of all events you *received* in the last 5
>>>>> seconds".
>>>>>>>> That
>>>>>>>>>>>>>> is a
>>>>>>>>>>>>>>>>> different meaning that "sum up an attribute of all event
>>>> that
>>>>>>>>>>>>>> *occurred*
>>>>>>>>>>>>>>>>> in the last 5 seconds". Both queries are valid and Flink
>>>>> should
>>>>>>>>>>>>>> support
>>>>>>>>>>>>>>>>> both IMHO.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>>>>> Yes, now this also processes about 3 mio Elements (Window
>>>>>> Size 5
>>>>>>>>>>>> sec,
>>>>>>>>>>>>>>>>> Slide
>>>>>>>>>>>>>>>>>> 1 sec) but it still fluctuates a lot between 1 mio. and 5
>>>>> mio.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Performance is not my main concern, however. My concern is
>>>>>> that
>>>>>>>>>> the
>>>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>> model assumes elements to arrive in order, which is simply
>>>>> not
>>>>>>>>>> true.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In your code you have these lines for specifying the
>>>> window:
>>>>>>>>>>>>>>>>>> .window(Time.of(1l, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>>>> .every(Time.of(1l, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Although this semantically specifies a tumbling window of
>>>>>> size 1
>>>>>>>>>> sec
>>>>>>>>>>>>>>> I'm
>>>>>>>>>>>>>>>>>> afraid it uses the sliding window logic internally
>>>> (because
>>>>> of
>>>>>>>> the
>>>>>>>>>>>>>>>>>> .every()).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In my tests I only have the first line.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <
>>>> gga...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It
>>>>>> should
>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> fixed now. Can you please run it again?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I also tried to reproduce some of your performance
>>>> numbers,
>>>>>> but
>>>>>>>>>> I'm
>>>>>>>>>>>>>>>>>>> getting only less than 1/10th of yours. For example, in
>>>> the
>>>>>>>>>>>> Tumbling
>>>>>>>>>>>>>>>>>>> case, Current/Reduce produces only ~100000 for me. Do you
>>>>>> have
>>>>>>>>>> any
>>>>>>>>>>>>>>>>>>> idea what I could be doing wrong? My code:
>>>>>>>>>>>>>>>>>>> http://pastebin.com/zbEjmGhk
>>>>>>>>>>>>>>>>>>> I am running it on a 2 GHz Core i7.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>> Gabor
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <
>>>>>>>> aljos...@apache.org
>>>>>>>>>>> :
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>> I also ran the tests on top of PR 856 (inverse reducer)
>>>>> now.
>>>>>>>> The
>>>>>>>>>>>>>>>>> results
>>>>>>>>>>>>>>>>>>>> seem incorrect. When I insert a Thread.sleep(1) in the
>>>>> tuple
>>>>>>>>>>>>>> source,
>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> the previous tests reported around 3600 tuples (Size 5
>>>>> sec,
>>>>>>>>>> Slide
>>>>>>>>>>>> 1
>>>>>>>>>>>>>>>>> sec)
>>>>>>>>>>>>>>>>>>>> (Theoretically there would be 5000 tuples in 5 seconds
>>>> but
>>>>>>>> this
>>>>>>>>>> is
>>>>>>>>>>>>>>> due
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> overhead). These are the results for the inverse reduce
>>>>>>>>>>>>>> optimisation:
>>>>>>>>>>>>>>>>>>>> (Tuple 0,38)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,829)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,1625)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,2424)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,3190)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,3198)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-339368)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-1315725)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-2932932)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-5082735)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-7743256)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,75701046)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,642829470)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,2242018381)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,5190708618)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,10060360311)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-94254951)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-219806321293)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-1258895232699)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-4074432596329)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> One line is one emitted window count. This is what
>>>> happens
>>>>>>>> when
>>>>>>>>>> I
>>>>>>>>>>>>>>>>> remove
>>>>>>>>>>>>>>>>>>>> the Thread.sleep(1):
>>>>>>>>>>>>>>>>>>>> (Tuple 0,660676)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,2553733)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,3542696)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,1)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,1107035)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,2549491)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,4100387)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406583360092)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406582150743)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042713628318)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So at some point the pre-reducer seems to go haywire and
>>>>>> does
>>>>>>>>>> not
>>>>>>>>>>>>>>>>> recover
>>>>>>>>>>>>>>>>>>>> from it. The good thing is that it does produce results
>>>>> now,
>>>>>>>>>> where
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> previous Current/Reduce would simply hang and not
>>>> produce
>>>>>> any
>>>>>>>>>>>>>> output.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <
>>>>> gga...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Aljoscha, can you please try the performance test of
>>>>>>>>>>>>>> Current/Reduce
>>>>>>>>>>>>>>>>>>>>> with the InversePreReducer in PR 856? (If you just call
>>>>>> sum,
>>>>>>>> it
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>> use an InversePreReducer.) It would be an interesting
>>>>> test,
>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>>>> the inverse function optimization really depends on the
>>>>>>>> stream
>>>>>>>>>>>>>> being
>>>>>>>>>>>>>>>>>>>>> ordered, and I think it has the potential of being
>>>> faster
>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>> Next/Reduce. Especially if the window size is much
>>>> larger
>>>>>>>> than
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> slide size.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>> Gabor
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <
>>>>>>>>>> aljos...@apache.org
>>>>>>>>>>>>>>> :
>>>>>>>>>>>>>>>>>>>>>> I think I'll have to elaborate a bit so I created a
>>>>>>>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>>>>>>>>>> implementation of my Ideas and ran some throughput
>>>>>>>>>> measurements
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> alleviate concerns about performance.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> First, though, I want to highlight again why the
>>>> current
>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>> work with out-of-order elements (which, again, occur
>>>>>>>>>> constantly
>>>>>>>>>>>>>> due
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> distributed nature of the system). This is the
>>>> example I
>>>>>>>>>> posted
>>>>>>>>>>>>>>>>>>> earlier:
>>>>>>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27
>>>> .
>>>>>> The
>>>>>>>>>> plan
>>>>>>>>>>>>>>>>> looks
>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>> this:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>>>> | | Source
>>>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>>>> |
>>>>>>>>>>>>>>>>>>>>>> +-----+
>>>>>>>>>>>>>>>>>>>>>> | |
>>>>>>>>>>>>>>>>>>>>>> | +--+
>>>>>>>>>>>>>>>>>>>>>> | | | Identity Map
>>>>>>>>>>>>>>>>>>>>>> | +--+
>>>>>>>>>>>>>>>>>>>>>> | |
>>>>>>>>>>>>>>>>>>>>>> +-----+
>>>>>>>>>>>>>>>>>>>>>> |
>>>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>>>> | | Window
>>>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>>>> |
>>>>>>>>>>>>>>>>>>>>>> |
>>>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>>>> | | Sink
>>>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> So all it does is pass the elements through an
>>>> identity
>>>>>> map
>>>>>>>>>> and
>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>> merge
>>>>>>>>>>>>>>>>>>>>>> them again before the window operator. The source
>>>> emits
>>>>>>>>>>>> ascending
>>>>>>>>>>>>>>>>>>>>> integers
>>>>>>>>>>>>>>>>>>>>>> and the window operator has a custom timestamp
>>>> extractor
>>>>>>>> that
>>>>>>>>>>>>>> uses
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> integer itself as the timestamp and should create
>>>>> windows
>>>>>> of
>>>>>>>>>>>>>> size 4
>>>>>>>>>>>>>>>>>>> (that
>>>>>>>>>>>>>>>>>>>>>> is elements with timestamp 0-3 are one window, the
>>>> next
>>>>>> are
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> elements
>>>>>>>>>>>>>>>>>>>>>> with timestamp 4-8, and so on). Since the topology
>>>>>> basically
>>>>>>>>>>>>>>> doubles
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> elements form the source I would expect to get these
>>>>>>>> windows:
>>>>>>>>>>>>>>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
>>>>>>>>>>>>>>>>>>>>>> Window: 4, 4, 6, 6, 7, 7, 8, 8
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The output is this, however:
>>>>>>>>>>>>>>>>>>>>>> Window: 0, 1, 2, 3,
>>>>>>>>>>>>>>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>>>>>>>>>>>>>>>>>>>>>> Window: 8, 9, 10, 11,
>>>>>>>>>>>>>>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>>>>>>>>>>>>>>>>>>>>>> Window: 16, 17, 18, 19,
>>>>>>>>>>>>>>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21,
>>>> 22,
>>>>>> 23,
>>>>>>>>>>>>>>>>>>>>>> Window: 24, 25, 26, 27,
>>>>>>>>>>>>>>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29,
>>>> 30,
>>>>>> 31,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The reason is that the elements simply arrive
>>>>>> out-of-order.
>>>>>>>>>>>>>> Imagine
>>>>>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>>>>>> would happen if the elements actually arrived with
>>>> some
>>>>>>>> delay
>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>> different operations.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Now, on to the performance numbers. The
>>>>> proof-of-concept I
>>>>>>>>>>>>>> created
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> available here:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock
>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>>> basic
>>>>>>>>>>>>>>>>>>>>>> idea is that sources assign the current timestamp when
>>>>>>>>>> emitting
>>>>>>>>>>>>>>>>>>> elements.
>>>>>>>>>>>>>>>>>>>>>> They also periodically emit watermarks that tell us
>>>> that
>>>>>> no
>>>>>>>>>>>>>>> elements
>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>> an earlier timestamp will be emitted. The watermarks
>>>>>>>> propagate
>>>>>>>>>>>>>>>>> through
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> operators. The window operator looks at the timestamp
>>>> of
>>>>>> an
>>>>>>>>>>>>>> element
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> puts it into the buffer that corresponds to that
>>>> window.
>>>>>>>> When
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> window
>>>>>>>>>>>>>>>>>>>>>> operator receives a watermark it will look at the
>>>>>> in-flight
>>>>>>>>>>>>>> windows
>>>>>>>>>>>>>>>>>>>>>> (basically the buffers) and emit those windows where
>>>> the
>>>>>>>>>>>>>> window-end
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> before the watermark.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For measuring throughput I did the following: The
>>>> source
>>>>>>>> emits
>>>>>>>>>>>>>>> tuples
>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> the form ("tuple", 1) in an infinite loop. The window
>>>>>>>> operator
>>>>>>>>>>>>>> sums
>>>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> tuples, thereby counting how many tuples the window
>>>>>> operator
>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> a given time window. There are two different
>>>>>> implementations
>>>>>>>>>> for
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> summation: 1) simply summing up the values in a
>>>>>> mapWindow(),
>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>>>>> a List of all tuples and simple iterate over it. 2)
>>>>> using
>>>>>>>>>>>> sum(1),
>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> implemented as a reduce() (that uses the pre-reducer
>>>>>>>>>>>>>>> optimisations).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> These are the performance numbers (Current is the
>>>>> current
>>>>>>>>>>>>>>>>>>> implementation,
>>>>>>>>>>>>>>>>>>>>>> Next is my proof-of-concept):
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Tumbling (1 sec):
>>>>>>>>>>>>>>>>>>>>>> - Current/Map: 1.6 mio
>>>>>>>>>>>>>>>>>>>>>> - Current/Reduce: 2 mio
>>>>>>>>>>>>>>>>>>>>>> - Next/Map: 2.2 mio
>>>>>>>>>>>>>>>>>>>>>> - Next/Reduce: 4 mio
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Sliding (5 sec, slide 1 sec):
>>>>>>>>>>>>>>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot)
>>>>>>>>>>>>>>>>>>>>>> - Current/Reduce: No output
>>>>>>>>>>>>>>>>>>>>>> - Next/Map: ca 4 mio (fluctuates)
>>>>>>>>>>>>>>>>>>>>>> - Next/Reduce: 10 mio
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The Next/Reduce variant can basically scale
>>>> indefinitely
>>>>>>>> with
>>>>>>>>>>>>>>> window
>>>>>>>>>>>>>>>>>>> size
>>>>>>>>>>>>>>>>>>>>>> because the internal state does not rely on the number
>>>>> of
>>>>>>>>>>>>>> elements
>>>>>>>>>>>>>>>>>>> (it is
>>>>>>>>>>>>>>>>>>>>>> just the current sum). The pre-reducer for sliding
>>>>>> elements
>>>>>>>>>>>>>> cannot
>>>>>>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>>>>>>>>>> the amount of tuples, it produces no output. For the
>>>> two
>>>>>> Map
>>>>>>>>>>>>>>> variants
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> performance fluctuates because they always keep all
>>>> the
>>>>>>>>>> elements
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>> internal buffer before emission, this seems to tax the
>>>>>>>> garbage
>>>>>>>>>>>>>>>>>>> collector
>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>> bit and leads to random pauses.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> One thing that should be noted is that I had to
>>>> disable
>>>>>> the
>>>>>>>>>>>>>>>>>>> fake-element
>>>>>>>>>>>>>>>>>>>>>> emission thread, otherwise the Current versions would
>>>>>>>>>> deadlock.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> So, I started working on this because I thought that
>>>>>>>>>>>> out-of-order
>>>>>>>>>>>>>>>>>>>>>> processing would be necessary for correctness. And it
>>>> is
>>>>>>>>>>>>>> certainly,
>>>>>>>>>>>>>>>>>>> But
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> proof-of-concept also shows that performance can be
>>>>>> greatly
>>>>>>>>>>>>>>> improved.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <
>>>>>>>> gyula.f...@gmail.com
>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I agree lets separate these topics from each other so
>>>>> we
>>>>>>>> can
>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>> faster
>>>>>>>>>>>>>>>>>>>>>>> resolution.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> There is already a state discussion in the thread we
>>>>>>>> started
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>> Paris.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <
>>>>>>>>>>>>>>> ktzou...@apache.org
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I agree with supporting out-of-order out of the box
>>>>> :-),
>>>>>>>>>> even
>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>> means
>>>>>>>>>>>>>>>>>>>>>>>> a major refactoring. This is the right time to
>>>>> refactor
>>>>>>>> the
>>>>>>>>>>>>>>>>>>> streaming
>>>>>>>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>>>>>>> before we pull it out of beta. I think that this is
>>>>> more
>>>>>>>>>>>>>>> important
>>>>>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>>>> features in the streaming API, which can be
>>>>> prioritized
>>>>>>>> once
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>>>>>> of beta (meaning, that IMO this is the right time to
>>>>>> stall
>>>>>>>>>> PRs
>>>>>>>>>>>>>>>>>>> until
>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>> agree on the design).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> There are three sections in the document: windowing,
>>>>>>>> state,
>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> API.
>>>>>>>>>>>>>>>>>>>>> How
>>>>>>>>>>>>>>>>>>>>>>>> convoluted are those with each other? Can we
>>>> separate
>>>>>> the
>>>>>>>>>>>>>>>>>>> discussion
>>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>>>>>>>> we need to discuss those all together? I think part
>>>> of
>>>>>> the
>>>>>>>>>>>>>>>>>>> difficulty
>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> that we are discussing three design choices at once.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
>>>>>>>>>>>>>>>>>>> ted.dunn...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Out of order is ubiquitous in the real-world.
>>>>>> Typically,
>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>>>>>> happens is
>>>>>>>>>>>>>>>>>>>>>>>>> that businesses will declare a maximum allowable
>>>>> delay
>>>>>>>> for
>>>>>>>>>>>>>>>>>>> delayed
>>>>>>>>>>>>>>>>>>>>>>>>> transactions and will commit to results when that
>>>>> delay
>>>>>>>> is
>>>>>>>>>>>>>>>>>>> reached.
>>>>>>>>>>>>>>>>>>>>>>>>> Transactions that arrive later than this cutoff are
>>>>>>>>>> collected
>>>>>>>>>>>>>>>>>>>>>> specially
>>>>>>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>>>>> corrections which are reported/used when possible.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Clearly, ordering can also be violated during
>>>>>> processing,
>>>>>>>>>> but
>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>>>>>>>>> is originally out of order the situation can't be
>>>>>>>> repaired
>>>>>>>>>> by
>>>>>>>>>>>>>>> any
>>>>>>>>>>>>>>>>>>>>>>>> protocol
>>>>>>>>>>>>>>>>>>>>>>>>> fixes that prevent transactions from becoming
>>>>>> disordered
>>>>>>>>>> but
>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> handled
>>>>>>>>>>>>>>>>>>>>>>>>> at the data level.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
>>>>>>>>>>>>>>>>>>>>> aljos...@apache.org
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I also don't like big changes but sometimes they
>>>> are
>>>>>>>>>>>>>> necessary.
>>>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>>>>>>> reason
>>>>>>>>>>>>>>>>>>>>>>>>>> why I'm so adamant about out-of-order processing
>>>> is
>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> out-of-order
>>>>>>>>>>>>>>>>>>>>>>>>>> elements are not some exception that occurs once
>>>> in
>>>>> a
>>>>>>>>>> while;
>>>>>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>>>>> occur
>>>>>>>>>>>>>>>>>>>>>>>>>> constantly in a distributed system. For example,
>>>> in
>>>>>>>> this:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27
>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> resulting
>>>>>>>>>>>>>>>>>>>>>>>>>> windows
>>>>>>>>>>>>>>>>>>>>>>>>>> are completely bogus because the current windowing
>>>>>>>> system
>>>>>>>>>>>>>>>>>>> assumes
>>>>>>>>>>>>>>>>>>>>>>>>> elements
>>>>>>>>>>>>>>>>>>>>>>>>>> to globally arrive in order, which is simply not
>>>>> true.
>>>>>>>>>> (The
>>>>>>>>>>>>>>>>>>>>> example
>>>>>>>>>>>>>>>>>>>>>>>> has a
>>>>>>>>>>>>>>>>>>>>>>>>>> source that generates increasing integers. Then
>>>>> these
>>>>>>>> pass
>>>>>>>>>>>>>>>>>>>>> through a
>>>>>>>>>>>>>>>>>>>>>>>> map
>>>>>>>>>>>>>>>>>>>>>>>>>> and are unioned with the original DataStream
>>>> before
>>>>> a
>>>>>>>>>> window
>>>>>>>>>>>>>>>>>>>>>> operator.)
>>>>>>>>>>>>>>>>>>>>>>>>>> This simulates elements arriving from different
>>>>>>>> operators
>>>>>>>>>> at
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>> windowing
>>>>>>>>>>>>>>>>>>>>>>>>>> operator. The example is also DOP=1, I imagine
>>>> this
>>>>> to
>>>>>>>> get
>>>>>>>>>>>>>>>>>>> worse
>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>> higher DOP.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> What do you mean by costly? As I said, I have a
>>>>>>>>>>>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>>>>>>>>>>>>> windowing
>>>>>>>>>>>>>>>>>>>>>>>>>> operator that can handle out-or-order elements.
>>>> This
>>>>>> is
>>>>>>>> an
>>>>>>>>>>>>>>>>>>> example
>>>>>>>>>>>>>>>>>>>>>>>> using
>>>>>>>>>>>>>>>>>>>>>>>>>> the current Flink API:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8
>>>>>> .
>>>>>>>>>>>>>>>>>>>>>>>>>> (It is an infinite source of tuples and a 5 second
>>>>>>>> window
>>>>>>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>> counts the tuples.) The first problem is that this
>>>>>> code
>>>>>>>>>>>>>>>>>>> deadlocks
>>>>>>>>>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>>>>>>>>> of the thread that emits fake elements. If I
>>>> disable
>>>>>> the
>>>>>>>>>>>> fake
>>>>>>>>>>>>>>>>>>>>>> element
>>>>>>>>>>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>>>>>>>>>> it works, but the throughput using my mockup is 4
>>>>>> times
>>>>>>>>>>>>>> higher
>>>>>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>>>>>> gap
>>>>>>>>>>>>>>>>>>>>>>>>>> widens dramatically if the window size increases.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> So, it actually increases performance (unless I'm
>>>>>>>> making a
>>>>>>>>>>>>>>>>>>> mistake
>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>>>>>>>>> explorations) and can handle elements that arrive
>>>>>>>>>>>>>> out-of-order
>>>>>>>>>>>>>>>>>>>>>> (which
>>>>>>>>>>>>>>>>>>>>>>>>>> happens basically always in a real-world windowing
>>>>>>>>>>>>>> use-cases).
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <
>>>>>>>>>> se...@apache.org
>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed
>>>> design
>>>>> is
>>>>>>>>>> that
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> need no
>>>>>>>>>>>>>>>>>>>>>>>>>>> different code for "system time" vs. "event
>>>> time".
>>>>> It
>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>>>>> differs in
>>>>>>>>>>>>>>>>>>>>>>>>>> where
>>>>>>>>>>>>>>>>>>>>>>>>>>> the timestamps are assigned.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> The OOP approach also gives you the semantics of
>>>>>> total
>>>>>>>>>>>>>>>>>>> ordering
>>>>>>>>>>>>>>>>>>>>>>>> without
>>>>>>>>>>>>>>>>>>>>>>>>>>> imposing merges on the streams.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J.
>>>> Sax <
>>>>>>>>>>>>>>>>>>>>>>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree that there should be multiple
>>>> alternatives
>>>>>> the
>>>>>>>>>>>>>>>>>>> user(!)
>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>>>>> choose from. Partial out-of-order processing
>>>> works
>>>>>> for
>>>>>>>>>>>>>>>>>>>>> many/most
>>>>>>>>>>>>>>>>>>>>>>>>>>>> aggregates. However, if you consider
>>>>>>>>>>>>>>>>>>> Event-Pattern-Matching,
>>>>>>>>>>>>>>>>>>>>>> global
>>>>>>>>>>>>>>>>>>>>>>>>>>>> ordering in necessary (even if the performance
>>>>>> penalty
>>>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>> high).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would also keep "system-time windows" as an
>>>>>>>>>> alternative
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> "source
>>>>>>>>>>>>>>>>>>>>>>>>>>>> assigned ts-windows".
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> It might also be interesting to consider the
>>>>>> following
>>>>>>>>>>>>>>>>>>> paper
>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>> overlapping windows: "Resource sharing in
>>>>> continuous
>>>>>>>>>>>>>>>>>>>>>> sliding-window
>>>>>>>>>>>>>>>>>>>>>>>>>>>> aggregates"
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily
>>>> if
>>>>>> your
>>>>>>>>>>>>>>>>>>>>>> suggested
>>>>>>>>>>>>>>>>>>>>>>>>>> changes
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> might touch them at some point.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also I still think we should not put everything
>>>>> in
>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> Datastream
>>>>>>>>>>>>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it will be a huge mess.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also we need to agree on the out of order
>>>>>> processing,
>>>>>>>>>>>>>>>>>>>>> whether
>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the way you proposed it(which is quite costly).
>>>>>>>> Another
>>>>>>>>>>>>>>>>>>>>>>>> alternative
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> approach there which fits in the current
>>>>> windowing
>>>>>> is
>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> filter
>>>>>>>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> order events and apply a special handling
>>>>> operator
>>>>>> on
>>>>>>>>>>>>>>>>>>> them.
>>>>>>>>>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fairly lightweight.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My point is that we need to consider some
>>>>>> alternative
>>>>>>>>>>>>>>>>>>>>>> solutions.
>>>>>>>>>>>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should not block contributions along the way.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha
>>>> Krettek
>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>> aljos...@apache.org>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The reason I posted this now is that we need
>>>> to
>>>>>>>> think
>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windowing before proceeding with the PRs of
>>>>> Gabor
>>>>>>>>>>>>>>>>>>> (inverse
>>>>>>>>>>>>>>>>>>>>>>>> reduce)
>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on
>>>>>>>>>> DataStream).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the windowing, I think that the current
>>>>> model
>>>>>>>> does
>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>> work
>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> out-of-order processing. Therefore, the whole
>>>>>>>>>> windowing
>>>>>>>>>>>>>>>>>>>>>>>>>> infrastructure
>>>>>>>>>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> basically have to be redone. Meaning also that
>>>>> any
>>>>>>>>>> work
>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pre-aggregators or optimizations that we do
>>>> now
>>>>>>>>>> becomes
>>>>>>>>>>>>>>>>>>>>>> useless.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the
>>>>>>>>>> interactions
>>>>>>>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> different *DataStream classes and
>>>>>>>> grouping/windowing.
>>>>>>>>>>>>>>>>>>> (See
>>>>>>>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>>>>>>>>> section
>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the doc I posted.)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
>>>>>>>>>>>>>>>>>>>>> gyula.f...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very
>>>>> good
>>>>>>>>>>>>>>>>>>>>> initiative.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I added some comments to the respective
>>>>> sections
>>>>>>>>>>>>>>>>>>> (where I
>>>>>>>>>>>>>>>>>>>>>> didnt
>>>>>>>>>>>>>>>>>>>>>>>>>> fully
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> agree
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> :).).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> At some point I think it would be good to
>>>> have
>>>>> a
>>>>>>>>>> public
>>>>>>>>>>>>>>>>>>>>>> hangout
>>>>>>>>>>>>>>>>>>>>>>>>>>> session
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this, which could make a more dynamic
>>>>> discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> ezt
>>>>> írta
>>>>>>>>>>>>>>>>>>> (időpont:
>>>>>>>>>>>>>>>>>>>>>>>> 2015.
>>>>>>>>>>>>>>>>>>>>>>>>>> jún.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 22.,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> H, 21:34):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with people proposing changes to the
>>>> streaming
>>>>>>>> part
>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>>>>>>>>> wanted
>>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> throw
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> my hat into the ring. :D
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> During the last few months, while I was
>>>>> getting
>>>>>>>>>>>>>>>>>>>>> acquainted
>>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> streaming system, I wrote down some
>>>> thoughts I
>>>>>> had
>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>>>>>>>>> things
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat
>>>>>>>>>> coherent
>>>>>>>>>>>>>>>>>>>>> shape
>>>>>>>>>>>>>>>>>>>>>>>> now,
>>>>>>>>>>>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> please
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have a look if you are interested in this:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This mostly covers:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Timestamps assigned at sources
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Out-of-order processing of elements in
>>>>> window
>>>>>>>>>>>>>>>>>>>>> operators
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - API design
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Please let me know what you think. Comment
>>>> in
>>>>>> the
>>>>>>>>>>>>>>>>>>>>> document
>>>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>>>> here
>>>>>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mailing list.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have a PR in the makings that would
>>>>> introduce
>>>>>>>>>> source
>>>>>>>>>>>>>>>>>>>>>>>>> timestamps
>>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> watermarks for keeping track of them. I also
>>>>>>>> hacked
>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windowing system that is able to process
>>>>>>>>>> out-of-order
>>>>>>>>>>>>>>>>>>>>>> elements
>>>>>>>>>>>>>>>>>>>>>>>>>>> using a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform
>>>>>>>>>> efficient
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pre-aggregations.)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to