This was more of a conceptual argument. From an implementation point
of view, skipping the window-building step is a good idea if a tumbling
window of size 1 tuple is detected.
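
To illustrate the shortcut, here is a minimal sketch in plain Java. It is
not Flink code: the class CountWindowSketch and the methods windowed(),
perElement() and run() are hypothetical names made up for this example.
It only shows that a tumbling count window of size 1 yields the same
results as applying the window function to each element directly, so the
buffering step can be skipped when that case is detected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; none of these names are part of the Flink API.
final class CountWindowSketch {

    // General path: buffer elements into tumbling count windows of the
    // given size and apply the window function to each full window.
    static <T, R> List<R> windowed(List<T> input, int size, Function<List<T>, R> windowFn) {
        if (size < 1) {
            throw new IllegalArgumentException("window size must be >= 1");
        }
        List<R> out = new ArrayList<>();
        List<T> buffer = new ArrayList<>(size);
        for (T element : input) {
            buffer.add(element);
            if (buffer.size() == size) {
                out.add(windowFn.apply(new ArrayList<>(buffer)));
                buffer.clear();
            }
        }
        return out; // a trailing partial window is dropped in this sketch
    }

    // Shortcut path for size == 1: no buffer, each element is its own window.
    static <T, R> List<R> perElement(List<T> input, Function<List<T>, R> windowFn) {
        List<R> out = new ArrayList<>();
        for (T element : input) {
            out.add(windowFn.apply(Collections.singletonList(element)));
        }
        return out;
    }

    // "Internal optimization": pick the shortcut when the window is 1 tuple wide.
    static <T, R> List<R> run(List<T> input, int size, Function<List<T>, R> windowFn) {
        return size == 1 ? perElement(input, windowFn) : windowed(input, size, windowFn);
    }

    public static void main(String[] args) {
        List<Integer> input = java.util.Arrays.asList(3, 1, 4, 1, 5);
        Function<List<Integer>, Integer> sum =
                w -> w.stream().mapToInt(Integer::intValue).sum();
        System.out.println(run(input, 1, sum));      // shortcut path
        System.out.println(windowed(input, 1, sum)); // general path, same result
        System.out.println(run(input, 2, sum));      // general path, windows of 2
    }
}
```

The user-facing concept stays the same in both paths (a window function
over a list); only the internal execution differs.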

I prefer to work with a minimum number of concepts (and apply internal
optimization if possible) instead of using redundant concepts for
special cases. Of course, this is my personal point of view.

-Matthias

On 06/27/2015 03:47 PM, Aljoscha Krettek wrote:
> What do you mean by Comment 2? Using the whole window apparatus if you just
> want to have, for example, a simple partitioned filter with partitioned
> state seems a bit extravagant.
> 
> On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <mj...@informatik.hu-berlin.de>
> wrote:
> 
>> Nice starting point.
>>
>> Comment 1:
>> "Each individual stream partition delivers elements strictly in order."
>> (in 'Parallel Streams, Partitions, Time, and Ordering')
>>
>> I would say "FIFO" and not "strictly in order". If data is not emitted
>> in-order, the stream partition will not be in-order either.
>>
>> Comment 2:
>> Why do we need KeyedDataStream. You can get everything done with
>> GroupedDataStream (using a tumbling window of size = 1 tuple).
>>
>>
>> -Matthias
>>
>> On 06/26/2015 07:42 PM, Stephan Ewen wrote:
>>> Here is a first bit of what I have been writing down. Will add more over
>>> the next days:
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
>>>
>>>
>> https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering
>>>
>>>
>>>
>>> On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:
>>>
>>>> +1 for writing this down
>>>>
>>>>> On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>>>>
>>>>> +1 go ahead
>>>>>
>>>>> On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> Hey!
>>>>>>
>>>>>> This thread covers many different topics. Let's break this up into
>>>> separate
>>>>>> discussions.
>>>>>>
>>>>>> - Operator State is already driven by Gyula and Paris and happening on
>>>> the
>>>>>> above mentioned pull request and the followup discussions.
>>>>>>
>>>>>> - For windowing, this discussion has brought some results that we
>> should
>>>>>> sum up and clearly write down.
>>>>>>   I would like to chime in to do that based on what I learned from the
>>>>>> document and this discussion. I also got some input from Marton about
>>>> what
>>>>>> he learned from mapping the Cloud DataFlow constructs to Flink.
>>>>>>   I'll draft a Wiki page (with the help of Aljoscha, Marton) that sums
>>>>>> this up and documents it for users (if we decide to adopt this).
>>>>>>   Then we run this by Gyula, Matthias Sax and Kostas for feedback.
>>>>>>
>>>>>> - API style discussions should be yet another thread. This will
>> probably
>>>>>> be opened as people start to address that.
>>>>>>
>>>>>>
>>>>>> I'll try to get a draft of the wiki version out tomorrow noon and send
>>>> the
>>>>>> link around.
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <
>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>
>>>>>>> Sure. I picked this up. Using the current model for "occurrence time
>>>>>>> semantics" does not work.
>>>>>>>
>>>>>>> I elaborated on this in the past many times (but nobody cared). It is
>>>>>>> important to make it clear to the user what semantics are supported.
>>>>>>> Claiming to support "sliding windows" doesn't mean anything; there
>> are
>>>>>>> too many different semantics out there. :)
>>>>>>>
>>>>>>>
>>>>>>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
>>>>>>>> Yes, I am aware of this requirement and it would also be supported
>> in
>>>>>> my
>>>>>>>> proposed model.
>>>>>>>>
>>>>>>>> The problem is, that the "custom timestamp" feature gives the
>>>>>> impression
>>>>>>>> that the elements would be windowed according to a user-timestamp.
>> The
>>>>>>>> results, however, are wrong because of the assumption about elements
>>>>>>>> arriving in order. (This is what I was trying to show with my fancy
>>>>>> ASCII
>>>>>>>> art and result output.)
>>>>>>>>
>>>>>>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <
>>>>>>> mj...@informatik.hu-berlin.de>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Aljoscha,
>>>>>>>>>
>>>>>>>>> I like that you are pushing in this direction. However, IMHO you
>>>>>>>>> misinterpret the current approach. It does not assume that tuples
>>>>>>>>> arrive in-order; the current approach has no notion about a
>>>>>>>>> pre-defined-order (for example, the order in which the events are
>>>>>>>>> created). There is only the notion of "arrival-order" at the
>>>> operator.
>>>>>>>>> From this "arrival-order" perspective, the results are correct(!).
>>>>>>>>>
>>>>>>>>> Windowing in the current approach means for example, "sum up an
>>>>>>>>> attribute of all events you *received* in the last 5 seconds". That
>>>>>> is a
>>>>>>>>> different meaning than "sum up an attribute of all events that
>>>>>> *occurred*
>>>>>>>>> in the last 5 seconds". Both queries are valid and Flink should
>>>>>> support
>>>>>>>>> both IMHO.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
>>>>>>>>>> Yes, now this also processes about 3 mio Elements (Window Size 5
>>>> sec,
>>>>>>>>> Slide
>>>>>>>>>> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
>>>>>>>>>>
>>>>>>>>>> Performance is not my main concern, however. My concern is that
>> the
>>>>>>>>> current
>>>>>>>>>> model assumes elements to arrive in order, which is simply not
>> true.
>>>>>>>>>>
>>>>>>>>>> In your code you have these lines for specifying the window:
>>>>>>>>>> .window(Time.of(1l, TimeUnit.SECONDS))
>>>>>>>>>> .every(Time.of(1l, TimeUnit.SECONDS))
>>>>>>>>>>
>>>>>>>>>> Although this semantically specifies a tumbling window of size 1
>> sec
>>>>>>> I'm
>>>>>>>>>> afraid it uses the sliding window logic internally (because of the
>>>>>>>>>> .every()).
>>>>>>>>>>
>>>>>>>>>> In my tests I only have the first line.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com>
>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It should
>> be
>>>>>>>>>>> fixed now. Can you please run it again?
>>>>>>>>>>>
>>>>>>>>>>> I also tried to reproduce some of your performance numbers, but
>> I'm
>>>>>>>>>>> getting only less than 1/10th of yours. For example, in the
>>>> Tumbling
>>>>>>>>>>> case, Current/Reduce produces only ~100000 for me. Do you have
>> any
>>>>>>>>>>> idea what I could be doing wrong? My code:
>>>>>>>>>>> http://pastebin.com/zbEjmGhk
>>>>>>>>>>> I am running it on a 2 GHz Core i7.
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Gabor
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org
>>> :
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The
>>>>>>>>> results
>>>>>>>>>>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple
>>>>>> source,
>>>>>>>>> all
>>>>>>>>>>>> the previous tests reported around 3600 tuples (Size 5 sec,
>> Slide
>>>> 1
>>>>>>>>> sec)
>>>>>>>>>>>> (Theoretically there would be 5000 tuples in 5 seconds but this
>> is
>>>>>>> due
>>>>>>>>> to
>>>>>>>>>>>> overhead). These are the results for the inverse reduce
>>>>>> optimisation:
>>>>>>>>>>>> (Tuple 0,38)
>>>>>>>>>>>> (Tuple 0,829)
>>>>>>>>>>>> (Tuple 0,1625)
>>>>>>>>>>>> (Tuple 0,2424)
>>>>>>>>>>>> (Tuple 0,3190)
>>>>>>>>>>>> (Tuple 0,3198)
>>>>>>>>>>>> (Tuple 0,-339368)
>>>>>>>>>>>> (Tuple 0,-1315725)
>>>>>>>>>>>> (Tuple 0,-2932932)
>>>>>>>>>>>> (Tuple 0,-5082735)
>>>>>>>>>>>> (Tuple 0,-7743256)
>>>>>>>>>>>> (Tuple 0,75701046)
>>>>>>>>>>>> (Tuple 0,642829470)
>>>>>>>>>>>> (Tuple 0,2242018381)
>>>>>>>>>>>> (Tuple 0,5190708618)
>>>>>>>>>>>> (Tuple 0,10060360311)
>>>>>>>>>>>> (Tuple 0,-94254951)
>>>>>>>>>>>> (Tuple 0,-219806321293)
>>>>>>>>>>>> (Tuple 0,-1258895232699)
>>>>>>>>>>>> (Tuple 0,-4074432596329)
>>>>>>>>>>>>
>>>>>>>>>>>> One line is one emitted window count. This is what happens when
>> I
>>>>>>>>> remove
>>>>>>>>>>>> the Thread.sleep(1):
>>>>>>>>>>>> (Tuple 0,660676)
>>>>>>>>>>>> (Tuple 0,2553733)
>>>>>>>>>>>> (Tuple 0,3542696)
>>>>>>>>>>>> (Tuple 0,1)
>>>>>>>>>>>> (Tuple 0,1107035)
>>>>>>>>>>>> (Tuple 0,2549491)
>>>>>>>>>>>> (Tuple 0,4100387)
>>>>>>>>>>>> (Tuple 0,-8406583360092)
>>>>>>>>>>>> (Tuple 0,-8406582150743)
>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>>>> (Tuple 0,-5390528042713628318)
>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>>>>
>>>>>>>>>>>> So at some point the pre-reducer seems to go haywire and does
>> not
>>>>>>>>> recover
>>>>>>>>>>>> from it. The good thing is that it does produce results now,
>> where
>>>>>>> the
>>>>>>>>>>>> previous Current/Reduce would simply hang and not produce any
>>>>>> output.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com>
>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Aljoscha, can you please try the performance test of
>>>>>> Current/Reduce
>>>>>>>>>>>>> with the InversePreReducer in PR 856? (If you just call sum, it
>>>>>> will
>>>>>>>>>>>>> use an InversePreReducer.) It would be an interesting test,
>>>>>> because
>>>>>>>>>>>>> the inverse function optimization really depends on the stream
>>>>>> being
>>>>>>>>>>>>> ordered, and I think it has the potential of being faster than
>>>>>>>>>>>>> Next/Reduce. Especially if the window size is much larger than
>>>> the
>>>>>>>>>>>>> slide size.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Gabor
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <
>> aljos...@apache.org
>>>>>>> :
>>>>>>>>>>>>>> I think I'll have to elaborate a bit so I created a
>>>>>>> proof-of-concept
>>>>>>>>>>>>>> implementation of my Ideas and ran some throughput
>> measurements
>>>>>> to
>>>>>>>>>>>>>> alleviate concerns about performance.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> First, though, I want to highlight again why the current
>>>> approach
>>>>>>>>> does
>>>>>>>>>>>>> not
>>>>>>>>>>>>>> work with out-of-order elements (which, again, occur
>> constantly
>>>>>> due
>>>>>>>>> to
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> distributed nature of the system). This is the example I
>> posted
>>>>>>>>>>> earlier:
>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The
>> plan
>>>>>>>>> looks
>>>>>>>>>>>>> like
>>>>>>>>>>>>>> this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>> | | Source
>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>> |
>>>>>>>>>>>>>> +-----+
>>>>>>>>>>>>>> | |
>>>>>>>>>>>>>> | +--+
>>>>>>>>>>>>>> | | | Identity Map
>>>>>>>>>>>>>> | +--+
>>>>>>>>>>>>>> | |
>>>>>>>>>>>>>> +-----+
>>>>>>>>>>>>>> |
>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>> | | Window
>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>> |
>>>>>>>>>>>>>> |
>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>> | | Sink
>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So all it does is pass the elements through an identity map
>> and
>>>>>>> then
>>>>>>>>>>>>> merge
>>>>>>>>>>>>>> them again before the window operator. The source emits
>>>> ascending
>>>>>>>>>>>>> integers
>>>>>>>>>>>>>> and the window operator has a custom timestamp extractor that
>>>>>> uses
>>>>>>>>> the
>>>>>>>>>>>>>> integer itself as the timestamp and should create windows of
>>>>>> size 4
>>>>>>>>>>> (that
>>>>>>>>>>>>>> is elements with timestamp 0-3 are one window, the next are
>> the
>>>>>>>>>>> elements
>>>>>>>>>>>>>> with timestamp 4-7, and so on). Since the topology basically
>>>>>>> doubles
>>>>>>>>>>> the
>>>>>>>>>>>>>> elements from the source I would expect to get these windows:
>>>>>>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
>>>>>>>>>>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The output is this, however:
>>>>>>>>>>>>>> Window: 0, 1, 2, 3,
>>>>>>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>>>>>>>>>>>>>> Window: 8, 9, 10, 11,
>>>>>>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>>>>>>>>>>>>>> Window: 16, 17, 18, 19,
>>>>>>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>>>>>>>>>>>>>> Window: 24, 25, 26, 27,
>>>>>>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The reason is that the elements simply arrive out-of-order.
>>>>>> Imagine
>>>>>>>>>>> what
>>>>>>>>>>>>>> would happen if the elements actually arrived with some delay
>>>>>> from
>>>>>>>>>>>>>> different operations.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now, on to the performance numbers. The proof-of-concept I
>>>>>> created
>>>>>>> is
>>>>>>>>>>>>>> available here:
>>>>>>>>>>>>>>
>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock
>>>>>> .
>>>>>>>>> The
>>>>>>>>>>>>> basic
>>>>>>>>>>>>>> idea is that sources assign the current timestamp when
>> emitting
>>>>>>>>>>> elements.
>>>>>>>>>>>>>> They also periodically emit watermarks that tell us that no
>>>>>>> elements
>>>>>>>>>>> with
>>>>>>>>>>>>>> an earlier timestamp will be emitted. The watermarks propagate
>>>>>>>>> through
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> operators. The window operator looks at the timestamp of an
>>>>>> element
>>>>>>>>>>> and
>>>>>>>>>>>>>> puts it into the buffer that corresponds to that window. When
>>>> the
>>>>>>>>>>> window
>>>>>>>>>>>>>> operator receives a watermark it will look at the in-flight
>>>>>> windows
>>>>>>>>>>>>>> (basically the buffers) and emit those windows where the
>>>>>> window-end
>>>>>>>>> is
>>>>>>>>>>>>>> before the watermark.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For measuring throughput I did the following: The source emits
>>>>>>> tuples
>>>>>>>>>>> of
>>>>>>>>>>>>>> the form ("tuple", 1) in an infinite loop. The window operator
>>>>>> sums
>>>>>>>>> up
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> tuples, thereby counting how many tuples the window operator
>> can
>>>>>>>>>>> handle
>>>>>>>>>>>>> in
>>>>>>>>>>>>>> a given time window. There are two different implementations
>> for
>>>>>>> the
>>>>>>>>>>>>>> summation: 1) simply summing up the values in a mapWindow(),
>>>>>> there
>>>>>>>>> you
>>>>>>>>>>>>> get
>>>>>>>>>>>>>> a List of all tuples and simply iterate over it. 2) using
>>>> sum(1),
>>>>>>>>>>> which
>>>>>>>>>>>>> is
>>>>>>>>>>>>>> implemented as a reduce() (that uses the pre-reducer
>>>>>>> optimisations).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> These are the performance numbers (Current is the current
>>>>>>>>>>> implementation,
>>>>>>>>>>>>>> Next is my proof-of-concept):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Tumbling (1 sec):
>>>>>>>>>>>>>> - Current/Map: 1.6 mio
>>>>>>>>>>>>>> - Current/Reduce: 2 mio
>>>>>>>>>>>>>> - Next/Map: 2.2 mio
>>>>>>>>>>>>>> - Next/Reduce: 4 mio
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sliding (5 sec, slide 1 sec):
>>>>>>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot)
>>>>>>>>>>>>>> - Current/Reduce: No output
>>>>>>>>>>>>>> - Next/Map: ca 4 mio (fluctuates)
>>>>>>>>>>>>>> - Next/Reduce: 10 mio
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The Next/Reduce variant can basically scale indefinitely with
>>>>>>> window
>>>>>>>>>>> size
>>>>>>>>>>>>>> because the internal state does not rely on the number of
>>>>>> elements
>>>>>>>>>>> (it is
>>>>>>>>>>>>>> just the current sum). The pre-reducer for sliding elements
>>>>>> cannot
>>>>>>>>>>> handle
>>>>>>>>>>>>>> the amount of tuples, it produces no output. For the two Map
>>>>>>> variants
>>>>>>>>>>> the
>>>>>>>>>>>>>> performance fluctuates because they always keep all the
>> elements
>>>>>> in
>>>>>>>>> an
>>>>>>>>>>>>>> internal buffer before emission, this seems to tax the garbage
>>>>>>>>>>> collector
>>>>>>>>>>>>> a
>>>>>>>>>>>>>> bit and leads to random pauses.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One thing that should be noted is that I had to disable the
>>>>>>>>>>> fake-element
>>>>>>>>>>>>>> emission thread, otherwise the Current versions would
>> deadlock.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So, I started working on this because I thought that
>>>> out-of-order
>>>>>>>>>>>>>> processing would be necessary for correctness. And it is
>>>>>> certainly.
>>>>>>>>>>> But
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> proof-of-concept also shows that performance can be greatly
>>>>>>> improved.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com
>>>
>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I agree, let's separate these topics from each other so we can
>>>> get
>>>>>>>>>>> faster
>>>>>>>>>>>>>>> resolution.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There is already a state discussion in the thread we started
>>>>>> with
>>>>>>>>>>> Paris.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <
>>>>>>> ktzou...@apache.org
>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I agree with supporting out-of-order out of the box :-),
>> even
>>>>>> if
>>>>>>>>>>> this
>>>>>>>>>>>>>> means
>>>>>>>>>>>>>>>> a major refactoring. This is the right time to refactor the
>>>>>>>>>>> streaming
>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>> before we pull it out of beta. I think that this is more
>>>>>>> important
>>>>>>>>>>>>> than
>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>> features in the streaming API, which can be prioritized once
>>>>>> the
>>>>>>>>>>> API
>>>>>>>>>>>>> is
>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>> of beta (meaning, that IMO this is the right time to stall
>> PRs
>>>>>>>>>>> until
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> agree on the design).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> There are three sections in the document: windowing, state,
>>>> and
>>>>>>>>>>> API.
>>>>>>>>>>>>> How
>>>>>>>>>>>>>>>> convoluted are those with each other? Can we separate the
>>>>>>>>>>> discussion
>>>>>>>>>>>>> or
>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>> we need to discuss those all together? I think part of the
>>>>>>>>>>> difficulty
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> that we are discussing three design choices at once.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
>>>>>>>>>>> ted.dunn...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Out of order is ubiquitous in the real-world.  Typically,
>>>> what
>>>>>>>>>>>>>> happens is
>>>>>>>>>>>>>>>>> that businesses will declare a maximum allowable delay for
>>>>>>>>>>> delayed
>>>>>>>>>>>>>>>>> transactions and will commit to results when that delay is
>>>>>>>>>>> reached.
>>>>>>>>>>>>>>>>> Transactions that arrive later than this cutoff are
>> collected
>>>>>>>>>>>>>> specially
>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>> corrections which are reported/used when possible.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Clearly, ordering can also be violated during processing,
>> but
>>>>>> if
>>>>>>>>>>> the
>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>> is originally out of order the situation can't be repaired
>> by
>>>>>>> any
>>>>>>>>>>>>>>>> protocol
>>>>>>>>>>>>>>>>> fixes that prevent transactions from becoming disordered
>> but
>>>>>> has
>>>>>>>>>>> to
>>>>>>>>>>>>>>>> be handled
>>>>>>>>>>>>>>>>> at the data level.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
>>>>>>>>>>>>> aljos...@apache.org
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I also don't like big changes but sometimes they are
>>>>>> necessary.
>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>> reason
>>>>>>>>>>>>>>>>>> why I'm so adamant about out-of-order processing is that
>>>>>>>>>>>>>> out-of-order
>>>>>>>>>>>>>>>>>> elements are not some exception that occurs once in a
>> while;
>>>>>>>>>>> they
>>>>>>>>>>>>>> occur
>>>>>>>>>>>>>>>>>> constantly in a distributed system. For example, in this:
>>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the
>>>>>>>>>>>>> resulting
>>>>>>>>>>>>>>>>>> windows
>>>>>>>>>>>>>>>>>> are completely bogus because the current windowing system
>>>>>>>>>>> assumes
>>>>>>>>>>>>>>>>> elements
>>>>>>>>>>>>>>>>>> to globally arrive in order, which is simply not true.
>> (The
>>>>>>>>>>>>> example
>>>>>>>>>>>>>>>> has a
>>>>>>>>>>>>>>>>>> source that generates increasing integers. Then these pass
>>>>>>>>>>>>> through a
>>>>>>>>>>>>>>>> map
>>>>>>>>>>>>>>>>>> and are unioned with the original DataStream before a
>> window
>>>>>>>>>>>>>> operator.)
>>>>>>>>>>>>>>>>>> This simulates elements arriving from different operators
>> at
>>>>>> a
>>>>>>>>>>>>>>>> windowing
>>>>>>>>>>>>>>>>>> operator. The example is also DOP=1, I imagine this to get
>>>>>>>>>>> worse
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>> higher DOP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What do you mean by costly? As I said, I have a
>>>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>>>>> windowing
>>>>>>>>>>>>>>>>>> operator that can handle out-of-order elements. This is an
>>>>>>>>>>> example
>>>>>>>>>>>>>>>> using
>>>>>>>>>>>>>>>>>> the current Flink API:
>>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>>>>>>>>>>>>>>> (It is an infinite source of tuples and a 5 second window
>>>>>>>>>>> operator
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> counts the tuples.) The first problem is that this code
>>>>>>>>>>> deadlocks
>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>> of the thread that emits fake elements. If I disable the
>>>> fake
>>>>>>>>>>>>>> element
>>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>> it works, but the throughput using my mockup is 4 times
>>>>>> higher
>>>>>>>>>>> .
>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>> gap
>>>>>>>>>>>>>>>>>> widens dramatically if the window size increases.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So, it actually increases performance (unless I'm making a
>>>>>>>>>>> mistake
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>> explorations) and can handle elements that arrive
>>>>>> out-of-order
>>>>>>>>>>>>>> (which
>>>>>>>>>>>>>>>>>> happens basically always in real-world windowing
>>>>>> use cases).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <
>> se...@apache.org
>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed design is
>> that
>>>>>> we
>>>>>>>>>>>>>> need no
>>>>>>>>>>>>>>>>>>> different code for "system time" vs. "event time". It
>> only
>>>>>>>>>>>>>> differs in
>>>>>>>>>>>>>>>>>> where
>>>>>>>>>>>>>>>>>>> the timestamps are assigned.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The OOP approach also gives you the semantics of total
>>>>>>>>>>> ordering
>>>>>>>>>>>>>>>> without
>>>>>>>>>>>>>>>>>>> imposing merges on the streams.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
>>>>>>>>>>>>>>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I agree that there should be multiple alternatives the
>>>>>>>>>>> user(!)
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>> choose from. Partial out-of-order processing works for
>>>>>>>>>>>>> many/most
>>>>>>>>>>>>>>>>>>>> aggregates. However, if you consider
>>>>>>>>>>> Event-Pattern-Matching,
>>>>>>>>>>>>>> global
>>>>>>>>>>>>>>>>>>>> ordering is necessary (even if the performance penalty
>>>>>>>>>>> might
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> high).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I would also keep "system-time windows" as an
>> alternative
>>>>>>>>>>> to
>>>>>>>>>>>>>>>> "source
>>>>>>>>>>>>>>>>>>>> assigned ts-windows".
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> It might also be interesting to consider the following
>>>>>>>>>>> paper
>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> overlapping windows: "Resource sharing in continuous
>>>>>>>>>>>>>> sliding-window
>>>>>>>>>>>>>>>>>>>> aggregates"
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>>>>>>>>>>>>>>> Hey
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily if your
>>>>>>>>>>>>>> suggested
>>>>>>>>>>>>>>>>>> changes
>>>>>>>>>>>>>>>>>>>>> might touch them at some point.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Also I still think we should not put everything in the
>>>>>>>>>>>>>> Datastream
>>>>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>>>> it will be a huge mess.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Also we need to agree on the out of order processing,
>>>>>>>>>>>>> whether
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>> the way you proposed it(which is quite costly). Another
>>>>>>>>>>>>>>>> alternative
>>>>>>>>>>>>>>>>>>>>> approach there which fits in the current windowing is
>> to
>>>>>>>>>>>>>> filter
>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>> order events and apply a special handling operator on
>>>>>>>>>>> them.
>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>> fairly lightweight.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> My point is that we need to consider some alternative
>>>>>>>>>>>>>> solutions.
>>>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>> should not block contributions along the way.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
>>>>>>>>>>>>>>>>>> aljos...@apache.org>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The reason I posted this now is that we need to think
>>>>>>>>>>> about
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor
>>>>>>>>>>> (inverse
>>>>>>>>>>>>>>>> reduce)
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on
>> DataStream).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For the windowing, I think that the current model does
>>>>>>>>>>> not
>>>>>>>>>>>>>> work
>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> out-of-order processing. Therefore, the whole
>> windowing
>>>>>>>>>>>>>>>>>> infrastructure
>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>> basically have to be redone. Meaning also that any
>> work
>>>>>>>>>>> on
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> pre-aggregators or optimizations that we do now
>> becomes
>>>>>>>>>>>>>> useless.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the
>> interactions
>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> different *DataStream classes and grouping/windowing.
>>>>>>>>>>> (See
>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>> section
>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> the doc I posted.)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
>>>>>>>>>>>>> gyula.f...@gmail.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good
>>>>>>>>>>>>> initiative.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I added some comments to the respective sections
>>>>>>>>>>> (where I
>>>>>>>>>>>>>> didn't
>>>>>>>>>>>>>>>>>> fully
>>>>>>>>>>>>>>>>>>>>>> agree
>>>>>>>>>>>>>>>>>>>>>>> :).).
>>>>>>>>>>>>>>>>>>>>>>> At some point I think it would be good to have a
>> public
>>>>>>>>>>>>>> hangout
>>>>>>>>>>>>>>>>>>> session
>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>>> this, which could make a more dynamic discussion.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> ezt írta
>>>>>>>>>>> (időpont:
>>>>>>>>>>>>>>>> 2015.
>>>>>>>>>>>>>>>>>> jún.
>>>>>>>>>>>>>>>>>>>>>> 22.,
>>>>>>>>>>>>>>>>>>>>>>> H, 21:34):
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>> with people proposing changes to the streaming part
>> I
>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>> wanted
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> throw
>>>>>>>>>>>>>>>>>>>>>>>> my hat into the ring. :D
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> During the last few months, while I was getting
>>>>>>>>>>>>> acquainted
>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had
>>>>>>>>>>> about
>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>> things
>>>>>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat
>> coherent
>>>>>>>>>>>>> shape
>>>>>>>>>>>>>>>> now,
>>>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>>>>>> please
>>>>>>>>>>>>>>>>>>>>>>>> have a look if you are interested in this:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> This mostly covers:
>>>>>>>>>>>>>>>>>>>>>>>> - Timestamps assigned at sources
>>>>>>>>>>>>>>>>>>>>>>>> - Out-of-order processing of elements in window
>>>>>>>>>>>>> operators
>>>>>>>>>>>>>>>>>>>>>>>> - API design
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Please let me know what you think. Comment in the
>>>>>>>>>>>>> document
>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>> here
>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> mailing list.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I have a PR in the making that would introduce
>> source
>>>>>>>>>>>>>>>>> timestamps
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>> watermarks for keeping track of them. I also hacked
>> a
>>>>>>>>>>>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>>>>>>>>>>> of a
>>>>>>>>>>>>>>>>>>>>>>>> windowing system that is able to process
>> out-of-order
>>>>>>>>>>>>>> elements
>>>>>>>>>>>>>>>>>>> using a
>>>>>>>>>>>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform
>> efficient
>>>>>>>>>>>>>>>>>>>>>> pre-aggregations.)
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>
>>
> 

