@Matthias: Working with a minimal number of clear concepts, as you
suggest, is desirable to say the least. :)

The reasoning behind the KeyedDataStream is to associate Flink's persisted
operator state with the keys of the data that produced it, so that stateful
computation becomes scalable in the future. This should not be tied to the
GroupedDataStream, especially not if we are removing the option to create
groups without windows as proposed on the Wiki by Stephan.
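
To make the idea concrete, here is a minimal sketch in plain Java. It is not the Flink API: the class name `KeyedCounter`, its methods, and the hash-based assignment of keys to partitions are all assumptions made up for illustration. It only shows why tying state to keys helps: each partition owns a disjoint slice of the state, so the slices could in principle be moved or rescaled independently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not a Flink class: operator state is kept per
// key, and keys are assigned to partitions by hash.
final class KeyedCounter {
    // One state map per parallel partition.
    private final List<Map<String, Long>> partitions = new ArrayList<>();

    KeyedCounter(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new HashMap<>());
        }
    }

    // Pick the partition that owns the state for this key.
    int partitionOf(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // Stateful update: only the state of the element's own key is touched.
    long process(String key) {
        return partitions.get(partitionOf(key)).merge(key, 1L, Long::sum);
    }

    // Read the current state for a key (0 if the key was never seen).
    long stateOf(String key) {
        return partitions.get(partitionOf(key)).getOrDefault(key, 0L);
    }
}
```

Note that no window is involved anywhere: a partitioned filter or counter only needs the key-to-state association.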

On Sat, Jun 27, 2015 at 4:15 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> This was more a conceptual point-of-view argument. From an
> implementation point of view, skipping the window building step is a
> good idea if a tumbling 1-tuple-size window is detected.
>
> I prefer to work with a minimum number of concepts (and apply internal
> optimization if possible) instead of using redundant concepts for
> special cases. Of course, this is my personal point of view.
>
> -Matthias
>
> On 06/27/2015 03:47 PM, Aljoscha Krettek wrote:
> > What do you mean by Comment 2? Using the whole window apparatus if you
> > just want to have, for example, a simple partitioned filter with
> > partitioned state seems a bit extravagant.
> >
> > On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <
> mj...@informatik.hu-berlin.de>
> > wrote:
> >
> >> Nice starting point.
> >>
> >> Comment 1:
> >> "Each individual stream partition delivers elements strictly in order."
> >> (in 'Parallel Streams, Partitions, Time, and Ordering')
> >>
> >> I would say "FIFO" and not "strictly in order". If data is not emitted
> >> in-order, the stream partition will not be in-order either.
> >>
> >> Comment 2:
> >> Why do we need KeyedDataStream? You can get everything done with
> >> GroupedDataStream (using a tumbling window of size = 1 tuple).
> >>
> >>
> >> -Matthias
> >>
> >> On 06/26/2015 07:42 PM, Stephan Ewen wrote:
> >>> Here is a first bit of what I have been writing down. Will add more
> over
> >>> the next days:
> >>>
> >>>
> >>> https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
> >>>
> >>>
> >>> https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering
> >>>
> >>>
> >>>
> >>> On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:
> >>>
> >>>> +1 for writing this down
> >>>>
> >>>>> On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org>
> >> wrote:
> >>>>>
> >>>>> +1 go ahead
> >>>>>
> >>>>> On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:
> >>>>>
> >>>>>> Hey!
> >>>>>>
> >>>>>> This thread covers many different topics. Let's break this up into
> >>>>>> separate discussions.
> >>>>>>
> >>>>>> - Operator State is already driven by Gyula and Paris and happening
> >>>>>> on the above mentioned pull request and the followup discussions.
> >>>>>>
> >>>>>> - For windowing, this discussion has brought some results that we
> >>>>>> should sum up and clearly write down.
> >>>>>>   I would like to chime in to do that based on what I learned from
> >>>>>> the document and this discussion. I also got some input from Marton
> >>>>>> about what he learned from mapping the Cloud DataFlow constructs to
> >>>>>> Flink.
> >>>>>>   I'll draft a Wiki page (with the help of Aljoscha, Marton) that
> >>>>>> sums this up and documents it for users (if we decide to adopt this).
> >>>>>>   Then we run this by Gyula, Matthias Sax and Kostas for feedback.
> >>>>>>
> >>>>>> - API style discussions should be yet another thread. This will
> >>>>>> probably be opened as people start to address that.
> >>>>>>
> >>>>>>
> >>>>>> I'll try to get a draft of the wiki version out tomorrow noon and
> >>>>>> send the link around.
> >>>>>>
> >>>>>> Greetings,
> >>>>>> Stephan
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <
> >>>>>> mj...@informatik.hu-berlin.de> wrote:
> >>>>>>
> >>>>>>> Sure. I picked this up. Using the current model for "occurrence
> time
> >>>>>>> semantics" does not work.
> >>>>>>>
> >>>>>>> I elaborated on this in the past many times (but nobody cared). It
> is
> >>>>>>> important to make it clear to the user what semantics are
> supported.
> >>>>>>> Claiming to support "sliding windows" doesn't mean anything; there
> >> are
> >>>>>>> too many different semantics out there. :)
> >>>>>>>
> >>>>>>>
> >>>>>>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
> >>>>>>>> Yes, I am aware of this requirement and it would also be supported
> >> in
> >>>>>> my
> >>>>>>>> proposed model.
> >>>>>>>>
> >>>>>>>> The problem is that the "custom timestamp" feature gives the
> >>>>>>>> impression that the elements would be windowed according to a
> >>>>>>>> user timestamp. The results, however, are wrong because of the
> >>>>>>>> assumption about elements arriving in order. (This is what I was
> >>>>>>>> trying to show with my fancy ASCII art and result output.)
> >>>>>>>>
> >>>>>>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <
> >>>>>>> mj...@informatik.hu-berlin.de>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Aljoscha,
> >>>>>>>>>
> >>>>>>>>> I like that you are pushing in this direction. However, IMHO you
> >>>>>>>>> misinterpret the current approach. It does not assume that tuples
> >>>>>>>>> arrive in-order; the current approach has no notion of a
> >>>>>>>>> pre-defined order (for example, the order in which the events are
> >>>>>>>>> created). There is only the notion of "arrival-order" at the
> >>>>>>>>> operator. From this "arrival-order" perspective, the results are
> >>>>>>>>> correct(!).
> >>>>>>>>>
> >>>>>>>>> Windowing in the current approach means, for example, "sum up an
> >>>>>>>>> attribute of all events you *received* in the last 5 seconds".
> >>>>>>>>> That has a different meaning than "sum up an attribute of all
> >>>>>>>>> events that *occurred* in the last 5 seconds". Both queries are
> >>>>>>>>> valid and Flink should support both IMHO.
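
The two readings can be contrasted in a small sketch. This is plain Java rather than Flink code; the `Event` class, its fields, and both method names are made up for illustration, and "the last 5 seconds" is assumed to mean the half-open interval (now - range, now] in milliseconds.

```java
import java.util.List;

// Hypothetical illustration in plain Java; none of these names are Flink API.
final class TimeSemantics {
    // An event with the time it occurred, the time the operator received it,
    // and the attribute to sum (times in milliseconds).
    static final class Event {
        final long occurredAt;
        final long receivedAt;
        final long value;

        Event(long occurredAt, long receivedAt, long value) {
            this.occurredAt = occurredAt;
            this.receivedAt = receivedAt;
            this.value = value;
        }
    }

    // True if t lies in the interval (now - range, now].
    private static boolean inLast(long t, long now, long range) {
        return t > now - range && t <= now;
    }

    // "Sum of all events *received* in the last `range` milliseconds."
    static long sumByArrival(List<Event> events, long now, long range) {
        long sum = 0;
        for (Event e : events) {
            if (inLast(e.receivedAt, now, range)) {
                sum += e.value;
            }
        }
        return sum;
    }

    // "Sum of all events that *occurred* in the last `range` milliseconds."
    static long sumByOccurrence(List<Event> events, long now, long range) {
        long sum = 0;
        for (Event e : events) {
            if (inLast(e.occurredAt, now, range)) {
                sum += e.value;
            }
        }
        return sum;
    }
}
```

An event that occurred at 1 s but was received at 7 s counts toward the arrival-based sum at time 10 s with a 5 s range, but not toward the occurrence-based sum, so the two queries legitimately give different answers on the same input.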
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
> >>>>>>>>>> Yes, now this also processes about 3 mio Elements (Window Size 5
> >>>> sec,
> >>>>>>>>> Slide
> >>>>>>>>>> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
> >>>>>>>>>>
> >>>>>>>>>> Performance is not my main concern, however. My concern is that
> >> the
> >>>>>>>>> current
> >>>>>>>>>> model assumes elements to arrive in order, which is simply not
> >> true.
> >>>>>>>>>>
> >>>>>>>>>> In your code you have these lines for specifying the window:
> >>>>>>>>>> .window(Time.of(1l, TimeUnit.SECONDS))
> >>>>>>>>>> .every(Time.of(1l, TimeUnit.SECONDS))
> >>>>>>>>>>
> >>>>>>>>>> Although this semantically specifies a tumbling window of size 1
> >> sec
> >>>>>>> I'm
> >>>>>>>>>> afraid it uses the sliding window logic internally (because of
> the
> >>>>>>>>>> .every()).
> >>>>>>>>>>
> >>>>>>>>>> In my tests I only have the first line.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com>
> >> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It should
> >> be
> >>>>>>>>>>> fixed now. Can you please run it again?
> >>>>>>>>>>>
> >>>>>>>>>>> I also tried to reproduce some of your performance numbers, but
> >> I'm
> >>>>>>>>>>> getting only less than 1/10th of yours. For example, in the
> >>>> Tumbling
> >>>>>>>>>>> case, Current/Reduce produces only ~100000 for me. Do you have
> >> any
> >>>>>>>>>>> idea what I could be doing wrong? My code:
> >>>>>>>>>>> http://pastebin.com/zbEjmGhk
> >>>>>>>>>>> I am running it on a 2 GHz Core i7.
> >>>>>>>>>>>
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Gabor
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <
> aljos...@apache.org
> >>> :
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>> I also ran the tests on top of PR 856 (inverse reducer) now.
> The
> >>>>>>>>> results
> >>>>>>>>>>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple
> >>>>>> source,
> >>>>>>>>> all
> >>>>>>>>>>>> the previous tests reported around 3600 tuples (Size 5 sec,
> >> Slide
> >>>> 1
> >>>>>>>>> sec)
> >>>>>>>>>>>> (Theoretically there would be 5000 tuples in 5 seconds but
> this
> >> is
> >>>>>>> due
> >>>>>>>>> to
> >>>>>>>>>>>> overhead). These are the results for the inverse reduce
> >>>>>> optimisation:
> >>>>>>>>>>>> (Tuple 0,38)
> >>>>>>>>>>>> (Tuple 0,829)
> >>>>>>>>>>>> (Tuple 0,1625)
> >>>>>>>>>>>> (Tuple 0,2424)
> >>>>>>>>>>>> (Tuple 0,3190)
> >>>>>>>>>>>> (Tuple 0,3198)
> >>>>>>>>>>>> (Tuple 0,-339368)
> >>>>>>>>>>>> (Tuple 0,-1315725)
> >>>>>>>>>>>> (Tuple 0,-2932932)
> >>>>>>>>>>>> (Tuple 0,-5082735)
> >>>>>>>>>>>> (Tuple 0,-7743256)
> >>>>>>>>>>>> (Tuple 0,75701046)
> >>>>>>>>>>>> (Tuple 0,642829470)
> >>>>>>>>>>>> (Tuple 0,2242018381)
> >>>>>>>>>>>> (Tuple 0,5190708618)
> >>>>>>>>>>>> (Tuple 0,10060360311)
> >>>>>>>>>>>> (Tuple 0,-94254951)
> >>>>>>>>>>>> (Tuple 0,-219806321293)
> >>>>>>>>>>>> (Tuple 0,-1258895232699)
> >>>>>>>>>>>> (Tuple 0,-4074432596329)
> >>>>>>>>>>>>
> >>>>>>>>>>>> One line is one emitted window count. This is what happens
> when
> >> I
> >>>>>>>>> remove
> >>>>>>>>>>>> the Thread.sleep(1):
> >>>>>>>>>>>> (Tuple 0,660676)
> >>>>>>>>>>>> (Tuple 0,2553733)
> >>>>>>>>>>>> (Tuple 0,3542696)
> >>>>>>>>>>>> (Tuple 0,1)
> >>>>>>>>>>>> (Tuple 0,1107035)
> >>>>>>>>>>>> (Tuple 0,2549491)
> >>>>>>>>>>>> (Tuple 0,4100387)
> >>>>>>>>>>>> (Tuple 0,-8406583360092)
> >>>>>>>>>>>> (Tuple 0,-8406582150743)
> >>>>>>>>>>>> (Tuple 0,-8406580427190)
> >>>>>>>>>>>> (Tuple 0,-8406580427190)
> >>>>>>>>>>>> (Tuple 0,-8406580427190)
> >>>>>>>>>>>> (Tuple 0,6847279255682044995)
> >>>>>>>>>>>> (Tuple 0,6847279255682044995)
> >>>>>>>>>>>> (Tuple 0,-5390528042713628318)
> >>>>>>>>>>>> (Tuple 0,-5390528042711551780)
> >>>>>>>>>>>> (Tuple 0,-5390528042711551780)
> >>>>>>>>>>>>
> >>>>>>>>>>>> So at some point the pre-reducer seems to go haywire and does
> >> not
> >>>>>>>>> recover
> >>>>>>>>>>>> from it. The good thing is that it does produce results now,
> >> where
> >>>>>>> the
> >>>>>>>>>>>> previous Current/Reduce would simply hang and not produce any
> >>>>>> output.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com>
> >>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Aljoscha, can you please try the performance test of
> >>>>>> Current/Reduce
> >>>>>>>>>>>>> with the InversePreReducer in PR 856? (If you just call sum,
> it
> >>>>>> will
> >>>>>>>>>>>>> use an InversePreReducer.) It would be an interesting test,
> >>>>>> because
> >>>>>>>>>>>>> the inverse function optimization really depends on the
> stream
> >>>>>> being
> >>>>>>>>>>>>> ordered, and I think it has the potential of being faster
> >>>>>>>>>>>>> than Next/Reduce, especially if the window size is much
> >>>>>>>>>>>>> larger than the slide size.
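
For readers unfamiliar with the inverse-function idea, a minimal sketch follows. It is not the code from PR 856: the class `SlidingSum` is hypothetical, and it assumes a count-based window that slides by one element instead of the time-based windows discussed here. The point is only that the sum is updated by adding the new element and subtracting the evicted one, so the cost per element does not grow with the window size.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of the inverse-function optimization for sums;
// not the InversePreReducer from the pull request.
final class SlidingSum {
    private final int windowSize;                 // window size in elements
    private final Deque<Long> buffer = new ArrayDeque<>();
    private long sum = 0;

    SlidingSum(int windowSize) {
        this.windowSize = windowSize;
    }

    // Add a new element and return the sum over the current window.
    long add(long value) {
        buffer.addLast(value);
        sum += value;
        if (buffer.size() > windowSize) {
            // Inverse of addition: subtract the element that left the window.
            // This assumes elements leave in the order they arrived.
            sum -= buffer.removeFirst();
        }
        return sum;
    }
}
```

The eviction step is where the dependence on ordering shows up: if the oldest buffered element is not actually the one that should leave the window, the running sum drifts and never recovers.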
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best regards,
> >>>>>>>>>>>>> Gabor
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <
> >> aljos...@apache.org
> >>>>>>> :
> >>>>>>>>>>>>>> I think I'll have to elaborate a bit, so I created a
> >>>>>>>>>>>>>> proof-of-concept implementation of my ideas and ran some
> >>>>>>>>>>>>>> throughput measurements to alleviate concerns about performance.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> First, though, I want to highlight again why the current
> >>>> approach
> >>>>>>>>> does
> >>>>>>>>>>>>> not
> >>>>>>>>>>>>>> work with out-of-order elements (which, again, occur
> >> constantly
> >>>>>> due
> >>>>>>>>> to
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> distributed nature of the system). This is the example I
> >> posted
> >>>>>>>>>>> earlier:
> >>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The
> >> plan
> >>>>>>>>> looks
> >>>>>>>>>>>>> like
> >>>>>>>>>>>>>> this:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>> |  | Source
> >>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>>  |
> >>>>>>>>>>>>>>  +-----+
> >>>>>>>>>>>>>>  |     |
> >>>>>>>>>>>>>>  |    +--+
> >>>>>>>>>>>>>>  |    |  | Identity Map
> >>>>>>>>>>>>>>  |    +--+
> >>>>>>>>>>>>>>  |     |
> >>>>>>>>>>>>>>  +-----+
> >>>>>>>>>>>>>>  |
> >>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>> |  | Window
> >>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>>  |
> >>>>>>>>>>>>>>  |
> >>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>> |  | Sink
> >>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So all it does is pass the elements through an identity map
> >>>>>>>>>>>>>> and then merge them again before the window operator. The
> >>>>>>>>>>>>>> source emits ascending integers and the window operator has a
> >>>>>>>>>>>>>> custom timestamp extractor that uses the integer itself as
> >>>>>>>>>>>>>> the timestamp and should create windows of size 4 (that is,
> >>>>>>>>>>>>>> elements with timestamp 0-3 are one window, the next are the
> >>>>>>>>>>>>>> elements with timestamp 4-7, and so on). Since the topology
> >>>>>>>>>>>>>> basically doubles the elements from the source I would expect
> >>>>>>>>>>>>>> to get these windows:
> >>>>>>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
> >>>>>>>>>>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The output is this, however:
> >>>>>>>>>>>>>> Window: 0, 1, 2, 3,
> >>>>>>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> >>>>>>>>>>>>>> Window: 8, 9, 10, 11,
> >>>>>>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> >>>>>>>>>>>>>> Window: 16, 17, 18, 19,
> >>>>>>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> >>>>>>>>>>>>>> Window: 24, 25, 26, 27,
> >>>>>>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The reason is that the elements simply arrive out-of-order.
> >>>>>> Imagine
> >>>>>>>>>>> what
> >>>>>>>>>>>>>> would happen if the elements actually arrived with some
> delay
> >>>>>> from
> >>>>>>>>>>>>>> different operations.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Now, on to the performance numbers. The proof-of-concept I
> >>>>>>>>>>>>>> created is available here:
> >>>>>>>>>>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock
> >>>>>>>>>>>>>> The basic idea is that sources assign the current timestamp
> >>>>>>>>>>>>>> when emitting elements. They also periodically emit watermarks
> >>>>>>>>>>>>>> that tell us that no elements with an earlier timestamp will
> >>>>>>>>>>>>>> be emitted. The watermarks propagate through the operators.
> >>>>>>>>>>>>>> The window operator looks at the timestamp of an element and
> >>>>>>>>>>>>>> puts it into the buffer that corresponds to that window. When
> >>>>>>>>>>>>>> the window operator receives a watermark it will look at the
> >>>>>>>>>>>>>> in-flight windows (basically the buffers) and emit those
> >>>>>>>>>>>>>> windows where the window-end is before the watermark.
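
The buffering and emission logic described above can be sketched as follows. This is a simplified stand-in written in plain Java, not the code in the linked branch: the class `EventTimeWindowOperator` and its two methods are hypothetical, windows are assumed to be tumbling and to cover [start, start + size), and a window is treated as complete once its exclusive end is less than or equal to the watermark (equivalently, its last timestamp is before the watermark).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of a watermark-driven tumbling window operator.
final class EventTimeWindowOperator {
    private final long windowSize;
    // In-flight windows keyed by window start, sorted so that the earliest
    // window is always emitted first.
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    EventTimeWindowOperator(long windowSize) {
        this.windowSize = windowSize;
    }

    // Put the element into the buffer of the window its timestamp falls
    // into, regardless of the order in which elements arrive.
    void onElement(long timestamp, long value) {
        long windowStart = Math.floorDiv(timestamp, windowSize) * windowSize;
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    // A watermark promises that no element with an earlier timestamp will
    // follow, so every window ending at or before it can be emitted.
    List<List<Long>> onWatermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.firstKey() + windowSize <= watermark) {
            emitted.add(buffers.pollFirstEntry().getValue());
        }
        return emitted;
    }
}
```

Fed with the doubled integer stream from the example (window size 4, value equal to timestamp), the elements 0, 1, 2, 3, 4, 0, 1, 2, 3 followed by a watermark of 4 yield exactly one window containing both copies of 0-3, while the early 4 stays buffered for the next window.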
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For measuring throughput I did the following: The source emits
> >>>>>>>>>>>>>> tuples of the form ("tuple", 1) in an infinite loop. The
> >>>>>>>>>>>>>> window operator sums up the tuples, thereby counting how many
> >>>>>>>>>>>>>> tuples the window operator can handle in a given time window.
> >>>>>>>>>>>>>> There are two different implementations for the summation:
> >>>>>>>>>>>>>> 1) simply summing up the values in a mapWindow(), where you
> >>>>>>>>>>>>>> get a List of all tuples and simply iterate over it. 2) using
> >>>>>>>>>>>>>> sum(1), which is implemented as a reduce() (that uses the
> >>>>>>>>>>>>>> pre-reducer optimisations).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> These are the performance numbers (Current is the current
> >>>>>>>>>>> implementation,
> >>>>>>>>>>>>>> Next is my proof-of-concept):
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Tumbling (1 sec):
> >>>>>>>>>>>>>> - Current/Map: 1.6 mio
> >>>>>>>>>>>>>> - Current/Reduce: 2 mio
> >>>>>>>>>>>>>> - Next/Map: 2.2 mio
> >>>>>>>>>>>>>> - Next/Reduce: 4 mio
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Sliding (5 sec, slide 1 sec):
> >>>>>>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot)
> >>>>>>>>>>>>>> - Current/Reduce: No output
> >>>>>>>>>>>>>> - Next/Map: ca 4 mio (fluctuates)
> >>>>>>>>>>>>>> - Next/Reduce: 10 mio
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The Next/Reduce variant can basically scale indefinitely with
> >>>>>>>>>>>>>> window size because the internal state does not rely on the
> >>>>>>>>>>>>>> number of elements (it is just the current sum). The
> >>>>>>>>>>>>>> pre-reducer for sliding elements cannot handle the amount of
> >>>>>>>>>>>>>> tuples; it produces no output. For the two Map variants the
> >>>>>>>>>>>>>> performance fluctuates because they always keep all the
> >>>>>>>>>>>>>> elements in an internal buffer before emission; this seems to
> >>>>>>>>>>>>>> tax the garbage collector a bit and leads to random pauses.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> One thing that should be noted is that I had to disable the
> >>>>>>>>>>> fake-element
> >>>>>>>>>>>>>> emission thread, otherwise the Current versions would
> >> deadlock.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So, I started working on this because I thought that
> >>>>>>>>>>>>>> out-of-order processing would be necessary for correctness.
> >>>>>>>>>>>>>> And it certainly is. But the proof-of-concept also shows that
> >>>>>>>>>>>>>> performance can be greatly improved.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <
> gyula.f...@gmail.com
> >>>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I agree, let's separate these topics from each other so we
> >>>>>>>>>>>>>>> can get faster resolution.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> There is already a state discussion in the thread we
> started
> >>>>>> with
> >>>>>>>>>>> Paris.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <
> >>>>>>> ktzou...@apache.org
> >>>>>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I agree with supporting out-of-order out of the box :-),
> >> even
> >>>>>> if
> >>>>>>>>>>> this
> >>>>>>>>>>>>>> means
> >>>>>>>>>>>>>>>> a major refactoring. This is the right time to refactor
> the
> >>>>>>>>>>> streaming
> >>>>>>>>>>>>>> API
> >>>>>>>>>>>>>>>> before we pull it out of beta. I think that this is more
> >>>>>>> important
> >>>>>>>>>>>>> than
> >>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>> features in the streaming API, which can be prioritized
> once
> >>>>>> the
> >>>>>>>>>>> API
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>> of beta (meaning, that IMO this is the right time to stall
> >> PRs
> >>>>>>>>>>> until
> >>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>> agree on the design).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> There are three sections in the document: windowing,
> state,
> >>>> and
> >>>>>>>>>>> API.
> >>>>>>>>>>>>> How
> >>>>>>>>>>>>>>>> convoluted are those with each other? Can we separate the
> >>>>>>>>>>> discussion
> >>>>>>>>>>>>> or
> >>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>> we need to discuss those all together? I think part of the
> >>>>>>>>>>> difficulty
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>> that we are discussing three design choices at once.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
> >>>>>>>>>>> ted.dunn...@gmail.com>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Out of order is ubiquitous in the real-world.  Typically,
> >>>> what
> >>>>>>>>>>>>>> happens is
> >>>>>>>>>>>>>>>>> that businesses will declare a maximum allowable delay
> for
> >>>>>>>>>>> delayed
> >>>>>>>>>>>>>>>>> transactions and will commit to results when that delay
> is
> >>>>>>>>>>> reached.
> >>>>>>>>>>>>>>>>> Transactions that arrive later than this cutoff are
> >> collected
> >>>>>>>>>>>>>> specially
> >>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> corrections which are reported/used when possible.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Clearly, ordering can also be violated during processing,
> >>>>>>>>>>>>>>>>> but if the data is originally out of order the situation
> >>>>>>>>>>>>>>>>> can't be repaired by any protocol fixes that prevent
> >>>>>>>>>>>>>>>>> transactions from becoming disordered but has to be handled
> >>>>>>>>>>>>>>>>> at the data level.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
> >>>>>>>>>>>>> aljos...@apache.org
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I also don't like big changes but sometimes they are
> >>>>>> necessary.
> >>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>> reason
> >>>>>>>>>>>>>>>>>> why I'm so adamant about out-of-order processing is that
> >>>>>>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>>> elements are not some exception that occurs once in a
> >> while;
> >>>>>>>>>>> they
> >>>>>>>>>>>>>> occur
> >>>>>>>>>>>>>>>>>> constantly in a distributed system. For example, in
> this:
> >>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27
> the
> >>>>>>>>>>>>> resulting
> >>>>>>>>>>>>>>>>>> windows
> >>>>>>>>>>>>>>>>>> are completely bogus because the current windowing
> system
> >>>>>>>>>>> assumes
> >>>>>>>>>>>>>>>>> elements
> >>>>>>>>>>>>>>>>>> to globally arrive in order, which is simply not true.
> >> (The
> >>>>>>>>>>>>> example
> >>>>>>>>>>>>>>>> has a
> >>>>>>>>>>>>>>>>>> source that generates increasing integers. Then these
> pass
> >>>>>>>>>>>>> through a
> >>>>>>>>>>>>>>>> map
> >>>>>>>>>>>>>>>>>> and are unioned with the original DataStream before a
> >> window
> >>>>>>>>>>>>>> operator.)
> >>>>>>>>>>>>>>>>>> This simulates elements arriving from different
> operators
> >> at
> >>>>>> a
> >>>>>>>>>>>>>>>> windowing
> >>>>>>>>>>>>>>>>>> operator. The example is also DOP=1, I imagine this to
> get
> >>>>>>>>>>> worse
> >>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>> higher DOP.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> What do you mean by costly? As I said, I have a
> >>>>>>>>>>>>>>>>>> proof-of-concept windowing operator that can handle
> >>>>>>>>>>>>>>>>>> out-of-order elements. This is an example using the current
> >>>>>>>>>>>>>>>>>> Flink API:
> >>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> >>>>>>>>>>>>>>>>>> (It is an infinite source of tuples and a 5 second window
> >>>>>>>>>>>>>>>>>> operator that counts the tuples.) The first problem is that
> >>>>>>>>>>>>>>>>>> this code deadlocks because of the thread that emits fake
> >>>>>>>>>>>>>>>>>> elements. If I disable the fake element code it works, but
> >>>>>>>>>>>>>>>>>> the throughput using my mockup is 4 times higher. The gap
> >>>>>>>>>>>>>>>>>> widens dramatically if the window size increases.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> So, it actually increases performance (unless I'm making a
> >>>>>>>>>>>>>>>>>> mistake in my explorations) and can handle elements that
> >>>>>>>>>>>>>>>>>> arrive out-of-order (which happens basically always in
> >>>>>>>>>>>>>>>>>> real-world windowing use cases).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <
> >> se...@apache.org
> >>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed design is
> >> that
> >>>>>> we
> >>>>>>>>>>>>>> need no
> >>>>>>>>>>>>>>>>>>> different code for "system time" vs. "event time". It
> >> only
> >>>>>>>>>>>>>> differs in
> >>>>>>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>>>>> the timestamps are assigned.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The out-of-order processing approach also gives you the
> >>>>>>>>>>>>>>>>>>> semantics of total ordering without imposing merges on the
> >>>>>>>>>>>>>>>>>>> streams.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> >>>>>>>>>>>>>>>>>>> mj...@informatik.hu-berlin.de> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I agree that there should be multiple alternatives the
> >>>>>>>>>>>>>>>>>>>> user(!) can choose from. Partial out-of-order processing
> >>>>>>>>>>>>>>>>>>>> works for many/most aggregates. However, if you consider
> >>>>>>>>>>>>>>>>>>>> Event-Pattern-Matching, global ordering is necessary (even
> >>>>>>>>>>>>>>>>>>>> if the performance penalty might be high).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I would also keep "system-time windows" as an
> >> alternative
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>>> "source
> >>>>>>>>>>>>>>>>>>>> assigned ts-windows".
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> It might also be interesting to consider the following
> >>>>>>>>>>> paper
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>> overlapping windows: "Resource sharing in continuous
> >>>>>>>>>>>>>> sliding-window
> >>>>>>>>>>>>>>>>>>>> aggregates"
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> >>>>>>>>>>>>>>>>>>>>> Hey
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily if your
> >>>>>>>>>>>>>> suggested
> >>>>>>>>>>>>>>>>>> changes
> >>>>>>>>>>>>>>>>>>>>> might touch them at some point.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Also I still think we should not put everything in the
> >>>>>>>>>>>>>>>>>>>>> DataStream because it will be a huge mess.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Also we need to agree on the out-of-order processing,
> >>>>>>>>>>>>>>>>>>>>> whether we want it the way you proposed it (which is quite
> >>>>>>>>>>>>>>>>>>>>> costly). Another alternative approach there, which fits in
> >>>>>>>>>>>>>>>>>>>>> the current windowing, is to filter out-of-order events and
> >>>>>>>>>>>>>>>>>>>>> apply a special handling operator on them. This would be
> >>>>>>>>>>>>>>>>>>>>> fairly lightweight.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> My point is that we need to consider some alternative
> >>>>>>>>>>>>>> solutions.
> >>>>>>>>>>>>>>>>> And
> >>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> should not block contributions along the way.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Cheers
> >>>>>>>>>>>>>>>>>>>>> Gyula
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
> >>>>>>>>>>>>>>>>>> aljos...@apache.org>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The reason I posted this now is that we need to
> think
> >>>>>>>>>>> about
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> API
> >>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor
> >>>>>>>>>>> (inverse
> >>>>>>>>>>>>>>>> reduce)
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on
> >> DataStream).
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For the windowing, I think that the current model
> does
> >>>>>>>>>>> not
> >>>>>>>>>>>>>> work
> >>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> out-of-order processing. Therefore, the whole
> >> windowing
> >>>>>>>>>>>>>>>>>> infrastructure
> >>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>> basically have to be redone. Meaning also that any
> >> work
> >>>>>>>>>>> on
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> pre-aggregators or optimizations that we do now
> >> becomes
> >>>>>>>>>>>>>> useless.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the
> >> interactions
> >>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> different *DataStream classes and
> grouping/windowing.
> >>>>>>>>>>> (See
> >>>>>>>>>>>>>> API
> >>>>>>>>>>>>>>>>>> section
> >>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> the doc I posted.)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
> >>>>>>>>>>>>> gyula.f...@gmail.com
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good
> >>>>>>>>>>>>> initiative.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I added some comments to the respective sections (where I
> >>>>>>>>>>>>>>>>>>>>>>> didn't fully agree :)).
> >>>>>>>>>>>>>>>>>>>>>>> At some point I think it would be good to have a
> >> public
> >>>>>>>>>>>>>> hangout
> >>>>>>>>>>>>>>>>>>> session
> >>>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>> this, which could make a more dynamic discussion.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>> Gyula
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon,
> >>>>>>>>>>>>>>>>>>>>>>> 22 Jun 2015, 21:34):
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>> with people proposing changes to the streaming
> part
> >> I
> >>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>> wanted
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> throw
> >>>>>>>>>>>>>>>>>>>>>>>> my hat into the ring. :D
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> During the last few months, while I was getting
> >>>>>>>>>>>>> acquainted
> >>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had
> >>>>>>>>>>> about
> >>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>> things
> >>>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat
> >> coherent
> >>>>>>>>>>>>> shape
> >>>>>>>>>>>>>>>> now,
> >>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>>> please
> >>>>>>>>>>>>>>>>>>>>>>>> have a look if you are interested in this:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> This mostly covers:
> >>>>>>>>>>>>>>>>>>>>>>>> - Timestamps assigned at sources
> >>>>>>>>>>>>>>>>>>>>>>>> - Out-of-order processing of elements in window
> >>>>>>>>>>>>> operators
> >>>>>>>>>>>>>>>>>>>>>>>> - API design
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Please let me know what you think. Comment in the
> >>>>>>>>>>>>> document
> >>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>> here
> >>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> mailing list.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I have a PR in the making that would introduce source
> >>>>>>>>>>>>>>>>>>>>>>>> timestamps and watermarks for keeping track of them. I
> >>>>>>>>>>>>>>>>>>>>>>>> also hacked a proof-of-concept of a windowing system that
> >>>>>>>>>>>>>>>>>>>>>>>> is able to process out-of-order elements using a FlatMap
> >>>>>>>>>>>>>>>>>>>>>>>> operator. (It uses panes to perform efficient
> >>>>>>>>>>>>>>>>>>>>>>>> pre-aggregations.)
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>>>>>>>>>>>>
