What do you mean by Comment 2? Using the whole window apparatus if you just
want to have, for example, a simple partitioned filter with partitioned
state seems a bit extravagant.
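
For illustration, a per-key filter with per-key state can be sketched in a few lines of plain Java, without any windowing machinery. This is only a sketch under assumptions: the class name, the String key, and the "pass only new per-key maxima" predicate are made up here and are not part of the Flink API or of the KeyedDataStream proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not a Flink class.
class PartitionedFilterSketch {
    // Partitioned state: one entry per key, holding the largest value seen so far.
    private final Map<String, Long> maxPerKey = new HashMap<>();

    /** Passes an element only if it exceeds every earlier value with the same key. */
    boolean filter(String key, long value) {
        Long previousMax = maxPerKey.get(key);
        if (previousMax != null && value <= previousMax) {
            return false; // not a new maximum for this key, drop it
        }
        maxPerKey.put(key, value);
        return true;
    }

    public static void main(String[] args) {
        PartitionedFilterSketch f = new PartitionedFilterSketch();
        System.out.println(f.filter("a", 1));
        System.out.println(f.filter("a", 1));
        System.out.println(f.filter("b", 5));
    }
}
```

The state lookup is keyed by the partition key only, so no window, trigger, or eviction logic is involved.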

On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <mj...@informatik.hu-berlin.de>
wrote:

> Nice starting point.
>
> Comment 1:
> "Each individual stream partition delivers elements strictly in order."
> (in 'Parallel Streams, Partitions, Time, and Ordering')
>
> I would say "FIFO" and not "strictly in order". If data is not emitted
> in-order, the stream partition will not be in-order either.
>
> Comment 2:
> Why do we need KeyedDataStream? You can get everything done with
> GroupedDataStream (using a tumbling window of size = 1 tuple).
>
>
> -Matthias
>
> On 06/26/2015 07:42 PM, Stephan Ewen wrote:
> > Here is a first bit of what I have been writing down. Will add more over
> > the next days:
> >
> >
> > https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
> >
> >
> > https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering
> >
> >
> >
> > On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:
> >
> >> +1 for writing this down
> >>
> >>> On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> >>>
> >>> +1 go ahead
> >>>
> >>> On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:
> >>>
> >>>> Hey!
> >>>>
> >>>> This thread covers many different topics. Let's break this up into
> >>>> separate discussions.
> >>>>
> >>>> - Operator State is already driven by Gyula and Paris and happening on
> >> the
> >>>> above mentioned pull request and the followup discussions.
> >>>>
> >>>> - For windowing, this discussion has brought some results that we
> should
> >>>> sum up and clearly write down.
> >>>>   I would like to chime in to do that based on what I learned from the
> >>>> document and this discussion. I also got some input from Marton about
> >> what
> >>>> he learned from mapping the Cloud DataFlow constructs to Flink.
> >>>>   I'll draft a Wiki page (with the help of Aljoscha, Marton) that sums
> >>>> this up and documents it for users (if we decide to adopt this).
> >>>>   Then we run this by Gyula, Matthias Sax and Kostas for feedback.
> >>>>
> >>>> - API style discussions should be yet another thread. This will
> probably
> >>>> be opened as people start to address that.
> >>>>
> >>>>
> >>>> I'll try to get a draft of the wiki version out tomorrow noon and send
> >> the
> >>>> link around.
> >>>>
> >>>> Greetings,
> >>>> Stephan
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <
> >>>> mj...@informatik.hu-berlin.de> wrote:
> >>>>
> >>>>> Sure. I picked this up. Using the current model for "occurrence time
> >>>>> semantics" does not work.
> >>>>>
> >>>>> I elaborated on this in the past many times (but nobody cared). It is
> >>>>> important to make it clear to the user what semantics are supported.
> >>>>> Claiming to support "sliding windows" doesn't mean anything; there are
> >>>>> too many different semantics out there. :)
> >>>>>
> >>>>>
> >>>>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
> >>>>>> Yes, I am aware of this requirement and it would also be supported in
> >>>>>> my proposed model.
> >>>>>>
> >>>>>> The problem is that the "custom timestamp" feature gives the impression
> >>>>>> that the elements would be windowed according to a user timestamp. The
> >>>>>> results, however, are wrong because of the assumption about elements
> >>>>>> arriving in order. (This is what I was trying to show with my fancy
> >>>>>> ASCII art and result output.)
> >>>>>>
> >>>>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <
> >>>>> mj...@informatik.hu-berlin.de>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Aljoscha,
> >>>>>>>
> >>>>>>> I like that you are pushing in this direction. However, IMHO you
> >>>>>>> misinterpret the current approach. It does not assume that tuples
> >>>>>>> arrive in order; the current approach has no notion of a
> >>>>>>> pre-defined order (for example, the order in which the events are
> >>>>>>> created). There is only the notion of "arrival-order" at the operator.
> >>>>>>> From this "arrival-order" perspective, the results are correct(!).
> >>>>>>>
> >>>>>>> Windowing in the current approach means, for example, "sum up an
> >>>>>>> attribute of all events you *received* in the last 5 seconds". That
> >>>>>>> is a different meaning than "sum up an attribute of all events that
> >>>>>>> *occurred* in the last 5 seconds". Both queries are valid and Flink
> >>>>>>> should support both IMHO.
> >>>>>>>
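To make the difference between the two queries concrete, here is a minimal sketch in plain Java (not the Flink API). It assumes tumbling rather than sliding windows to keep it short, and the class name, the row layout {occurredAt, receivedAt, value}, and the method name are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not a Flink class.
class TimeSemanticsSketch {
    /**
     * Sums the value (column 2) of each event per tumbling window.
     * timeColumn selects the timestamp to window by:
     * 0 = time the event occurred, 1 = time the event was received.
     */
    static Map<Long, Long> sumPerWindow(long[][] events, int timeColumn, long windowSize) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] event : events) {
            long ts = event[timeColumn];
            long windowStart = ts - Math.floorMod(ts, windowSize);
            sums.merge(windowStart, event[2], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // {occurredAt, receivedAt, value}; the second event is delayed in transit.
        long[][] events = {{1, 2, 10}, {4, 6, 20}, {6, 7, 30}};
        System.out.println("by occurrence: " + sumPerWindow(events, 0, 5));
        System.out.println("by arrival:    " + sumPerWindow(events, 1, 5));
    }
}
```

The same events produce different window sums depending on which timestamp is used, which is why it matters to state the supported semantics explicitly.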
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
> >>>>>>>> Yes, now this also processes about 3 mio Elements (Window Size 5 sec,
> >>>>>>>> Slide 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
> >>>>>>>>
> >>>>>>>> Performance is not my main concern, however. My concern is that the
> >>>>>>>> current model assumes elements to arrive in order, which is simply not
> >>>>>>>> true.
> >>>>>>>>
> >>>>>>>> In your code you have these lines for specifying the window:
> >>>>>>>> .window(Time.of(1l, TimeUnit.SECONDS))
> >>>>>>>> .every(Time.of(1l, TimeUnit.SECONDS))
> >>>>>>>>
> >>>>>>>> Although this semantically specifies a tumbling window of size 1 sec,
> >>>>>>>> I'm afraid it uses the sliding window logic internally (because of the
> >>>>>>>> .every()).
> >>>>>>>>
> >>>>>>>> In my tests I only have the first line.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com>
> wrote:
> >>>>>>>>
> >>>>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It should be
> >>>>>>>>> fixed now. Can you please run it again?
> >>>>>>>>>
> >>>>>>>>> I also tried to reproduce some of your performance numbers, but I'm
> >>>>>>>>> getting less than 1/10th of yours. For example, in the Tumbling
> >>>>>>>>> case, Current/Reduce produces only ~100000 for me. Do you have any
> >>>>>>>>> idea what I could be doing wrong? My code:
> >>>>>>>>> http://pastebin.com/zbEjmGhk
> >>>>>>>>> I am running it on a 2 GHz Core i7.
> >>>>>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>> Gabor
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org
> >:
> >>>>>>>>>> Hi,
> >>>>>>>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The
> >>>>>>>>>> results seem incorrect. When I insert a Thread.sleep(1) in the tuple
> >>>>>>>>>> source, all the previous tests reported around 3600 tuples (Size 5
> >>>>>>>>>> sec, Slide 1 sec). (Theoretically there would be 5000 tuples in 5
> >>>>>>>>>> seconds but this is due to overhead.) These are the results for the
> >>>>>>>>>> inverse reduce optimisation:
> >>>>>>>>>> (Tuple 0,38)
> >>>>>>>>>> (Tuple 0,829)
> >>>>>>>>>> (Tuple 0,1625)
> >>>>>>>>>> (Tuple 0,2424)
> >>>>>>>>>> (Tuple 0,3190)
> >>>>>>>>>> (Tuple 0,3198)
> >>>>>>>>>> (Tuple 0,-339368)
> >>>>>>>>>> (Tuple 0,-1315725)
> >>>>>>>>>> (Tuple 0,-2932932)
> >>>>>>>>>> (Tuple 0,-5082735)
> >>>>>>>>>> (Tuple 0,-7743256)
> >>>>>>>>>> (Tuple 0,75701046)
> >>>>>>>>>> (Tuple 0,642829470)
> >>>>>>>>>> (Tuple 0,2242018381)
> >>>>>>>>>> (Tuple 0,5190708618)
> >>>>>>>>>> (Tuple 0,10060360311)
> >>>>>>>>>> (Tuple 0,-94254951)
> >>>>>>>>>> (Tuple 0,-219806321293)
> >>>>>>>>>> (Tuple 0,-1258895232699)
> >>>>>>>>>> (Tuple 0,-4074432596329)
> >>>>>>>>>>
> >>>>>>>>>> One line is one emitted window count. This is what happens when I
> >>>>>>>>>> remove the Thread.sleep(1):
> >>>>>>>>>> (Tuple 0,660676)
> >>>>>>>>>> (Tuple 0,2553733)
> >>>>>>>>>> (Tuple 0,3542696)
> >>>>>>>>>> (Tuple 0,1)
> >>>>>>>>>> (Tuple 0,1107035)
> >>>>>>>>>> (Tuple 0,2549491)
> >>>>>>>>>> (Tuple 0,4100387)
> >>>>>>>>>> (Tuple 0,-8406583360092)
> >>>>>>>>>> (Tuple 0,-8406582150743)
> >>>>>>>>>> (Tuple 0,-8406580427190)
> >>>>>>>>>> (Tuple 0,-8406580427190)
> >>>>>>>>>> (Tuple 0,-8406580427190)
> >>>>>>>>>> (Tuple 0,6847279255682044995)
> >>>>>>>>>> (Tuple 0,6847279255682044995)
> >>>>>>>>>> (Tuple 0,-5390528042713628318)
> >>>>>>>>>> (Tuple 0,-5390528042711551780)
> >>>>>>>>>> (Tuple 0,-5390528042711551780)
> >>>>>>>>>>
> >>>>>>>>>> So at some point the pre-reducer seems to go haywire and does not
> >>>>>>>>>> recover from it. The good thing is that it does produce results now,
> >>>>>>>>>> where the previous Current/Reduce would simply hang and not produce
> >>>>>>>>>> any output.
> >>>>>>>>>>
> >>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com>
> >> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello,
> >>>>>>>>>>>
> >>>>>>>>>>> Aljoscha, can you please try the performance test of
> >>>>>>>>>>> Current/Reduce with the InversePreReducer in PR 856? (If you just
> >>>>>>>>>>> call sum, it will use an InversePreReducer.) It would be an
> >>>>>>>>>>> interesting test, because the inverse function optimization really
> >>>>>>>>>>> depends on the stream being ordered, and I think it has the
> >>>>>>>>>>> potential of being faster than Next/Reduce, especially if the
> >>>>>>>>>>> window size is much larger than the slide size.
> >>>>>>>>>>>
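The general idea behind an inverse-function optimization can be sketched as follows. This is not the InversePreReducer from PR 856; it is a simplified, count-based stand-in with made-up names, assuming that elements leave the window in exactly the order in which they arrived.

```java
import java.util.ArrayDeque;

// Hypothetical illustration only; not the code from PR 856.
class InverseSlidingSumSketch {
    private final int windowSize;
    private final ArrayDeque<Long> window = new ArrayDeque<>();
    private long sum = 0;

    InverseSlidingSumSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds a value and returns the sum over the last windowSize values. */
    long add(long value) {
        window.addLast(value);
        sum += value; // apply the reduce function (addition)
        if (window.size() > windowSize) {
            sum -= window.removeFirst(); // apply its inverse (subtraction) to the oldest value
        }
        return sum;
    }

    public static void main(String[] args) {
        InverseSlidingSumSketch s = new InverseSlidingSumSketch(3);
        for (long v = 1; v <= 5; v++) {
            System.out.println(s.add(v));
        }
    }
}
```

Each update costs one addition and at most one subtraction regardless of window size, but the eviction step is only correct if the oldest buffered element really is the one that leaves the window, which is where the dependence on ordering comes from.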
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Gabor
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <
> aljos...@apache.org
> >>>>> :
> >>>>>>>>>>>> I think I'll have to elaborate a bit, so I created a proof-of-concept
> >>>>>>>>>>>> implementation of my ideas and ran some throughput measurements to
> >>>>>>>>>>>> alleviate concerns about performance.
> >>>>>>>>>>>>
> >>>>>>>>>>>> First, though, I want to highlight again why the current approach
> >>>>>>>>>>>> does not work with out-of-order elements (which, again, occur
> >>>>>>>>>>>> constantly due to the distributed nature of the system). This is the
> >>>>>>>>>>>> example I posted earlier:
> >>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks
> >>>>>>>>>>>> like this:
> >>>>>>>>>>>>
> >>>>>>>>>>>> +--+
> >>>>>>>>>>>> | | Source
> >>>>>>>>>>>> +--+
> >>>>>>>>>>>> |
> >>>>>>>>>>>> +-----+
> >>>>>>>>>>>> | |
> >>>>>>>>>>>> | +--+
> >>>>>>>>>>>> | | | Identity Map
> >>>>>>>>>>>> | +--+
> >>>>>>>>>>>> | |
> >>>>>>>>>>>> +-----+
> >>>>>>>>>>>> |
> >>>>>>>>>>>> +--+
> >>>>>>>>>>>> | | Window
> >>>>>>>>>>>> +--+
> >>>>>>>>>>>> |
> >>>>>>>>>>>> |
> >>>>>>>>>>>> +--+
> >>>>>>>>>>>> | | Sink
> >>>>>>>>>>>> +--+
> >>>>>>>>>>>>
> >>>>>>>>>>>> So all it does is pass the elements through an identity map and then
> >>>>>>>>>>>> merge them again before the window operator. The source emits
> >>>>>>>>>>>> ascending integers and the window operator has a custom timestamp
> >>>>>>>>>>>> extractor that uses the integer itself as the timestamp and should
> >>>>>>>>>>>> create windows of size 4 (that is, elements with timestamp 0-3 are
> >>>>>>>>>>>> one window, the next are the elements with timestamp 4-7, and so on).
> >>>>>>>>>>>> Since the topology basically doubles the elements from the source, I
> >>>>>>>>>>>> would expect to get these windows:
> >>>>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
> >>>>>>>>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
> >>>>>>>>>>>>
> >>>>>>>>>>>> The output is this, however:
> >>>>>>>>>>>> Window: 0, 1, 2, 3,
> >>>>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> >>>>>>>>>>>> Window: 8, 9, 10, 11,
> >>>>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> >>>>>>>>>>>> Window: 16, 17, 18, 19,
> >>>>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> >>>>>>>>>>>> Window: 24, 25, 26, 27,
> >>>>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> >>>>>>>>>>>>
> >>>>>>>>>>>> The reason is that the elements simply arrive out-of-order. Imagine
> >>>>>>>>>>>> what would happen if the elements actually arrived with some delay
> >>>>>>>>>>>> from different operations.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Now, on to the performance numbers. The proof-of-concept I created is
> >>>>>>>>>>>> available here:
> >>>>>>>>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock.
> >>>>>>>>>>>> The basic idea is that sources assign the current timestamp when
> >>>>>>>>>>>> emitting elements. They also periodically emit watermarks that tell
> >>>>>>>>>>>> us that no elements with an earlier timestamp will be emitted. The
> >>>>>>>>>>>> watermarks propagate through the operators. The window operator looks
> >>>>>>>>>>>> at the timestamp of an element and puts it into the buffer that
> >>>>>>>>>>>> corresponds to that window. When the window operator receives a
> >>>>>>>>>>>> watermark it will look at the in-flight windows (basically the
> >>>>>>>>>>>> buffers) and emit those windows where the window-end is before the
> >>>>>>>>>>>> watermark.
> >>>>>>>>>>>>
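The buffering and watermark logic described above can be sketched roughly like this. It is a simplified stand-in in plain Java, not the proof-of-concept from the linked branch: the class and method names are made up, windows are tumbling with an exclusive end, and a window is treated as complete once its end is less than or equal to the watermark (an assumption about what "before the watermark" means).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical illustration only; not the code from the event-time-window-fn-mock branch.
class WatermarkWindowSketch {
    private final long windowSize;
    // In-flight windows: window start -> elements buffered for that window.
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    WatermarkWindowSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Puts an element into the buffer of the window its timestamp belongs to. */
    void onElement(long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
    }

    /** Emits, in window order, every buffered window that ends at or before the watermark. */
    List<List<Long>> onWatermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.firstKey() + windowSize <= watermark) {
            emitted.add(buffers.pollFirstEntry().getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkWindowSketch op = new WatermarkWindowSketch(4);
        // Interleaved arrival, similar to the doubled stream in the example plan.
        for (long ts : new long[] {0, 1, 2, 3, 4, 0, 1, 2, 3}) {
            op.onElement(ts);
        }
        System.out.println(op.onWatermark(4));
        System.out.println(op.onWatermark(8));
    }
}
```

Because elements are assigned to a window by timestamp rather than by arrival position, the late copies of 0 to 3 still land in the first window even though element 4 arrived before them.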
> >>>>>>>>>>>> For measuring throughput I did the following: The source emits tuples
> >>>>>>>>>>>> of the form ("tuple", 1) in an infinite loop. The window operator
> >>>>>>>>>>>> sums up the tuples, thereby counting how many tuples the window
> >>>>>>>>>>>> operator can handle in a given time window. There are two different
> >>>>>>>>>>>> implementations for the summation: 1) simply summing up the values in
> >>>>>>>>>>>> a mapWindow(), where you get a List of all tuples and simply iterate
> >>>>>>>>>>>> over it; 2) using sum(1), which is implemented as a reduce() (that
> >>>>>>>>>>>> uses the pre-reducer optimisations).
> >>>>>>>>>>>>
> >>>>>>>>>>>> These are the performance numbers (Current is the current
> >>>>>>>>> implementation,
> >>>>>>>>>>>> Next is my proof-of-concept):
> >>>>>>>>>>>>
> >>>>>>>>>>>> Tumbling (1 sec):
> >>>>>>>>>>>> - Current/Map: 1.6 mio
> >>>>>>>>>>>> - Current/Reduce: 2 mio
> >>>>>>>>>>>> - Next/Map: 2.2 mio
> >>>>>>>>>>>> - Next/Reduce: 4 mio
> >>>>>>>>>>>>
> >>>>>>>>>>>> Sliding (5 sec, slide 1 sec):
> >>>>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot)
> >>>>>>>>>>>> - Current/Reduce: No output
> >>>>>>>>>>>> - Next/Map: ca 4 mio (fluctuates)
> >>>>>>>>>>>> - Next/Reduce: 10 mio
> >>>>>>>>>>>>
> >>>>>>>>>>>> The Next/Reduce variant can basically scale indefinitely with window
> >>>>>>>>>>>> size because the internal state does not depend on the number of
> >>>>>>>>>>>> elements (it is just the current sum). The pre-reducer for sliding
> >>>>>>>>>>>> windows cannot handle the amount of tuples; it produces no output.
> >>>>>>>>>>>> For the two Map variants the performance fluctuates because they
> >>>>>>>>>>>> always keep all the elements in an internal buffer before emission;
> >>>>>>>>>>>> this seems to tax the garbage collector a bit and leads to random
> >>>>>>>>>>>> pauses.
> >>>>>>>>>>>>
> >>>>>>>>>>>> One thing that should be noted is that I had to disable the
> >>>>>>>>>>>> fake-element emission thread, otherwise the Current versions would
> >>>>>>>>>>>> deadlock.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So, I started working on this because I thought that out-of-order
> >>>>>>>>>>>> processing would be necessary for correctness. And it certainly is.
> >>>>>>>>>>>> But the proof-of-concept also shows that performance can be greatly
> >>>>>>>>>>>> improved.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com
> >
> >>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I agree, let's separate these topics from each other so we can get
> >>>>>>>>>>>>> faster resolution.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> There is already a state discussion in the thread we started with
> >>>>>>>>>>>>> Paris.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <
> >>>>> ktzou...@apache.org
> >>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> I agree with supporting out-of-order out of the box :-), even if
> >>>>>>>>>>>>>> this means a major refactoring. This is the right time to refactor
> >>>>>>>>>>>>>> the streaming API before we pull it out of beta. I think that this
> >>>>>>>>>>>>>> is more important than new features in the streaming API, which can
> >>>>>>>>>>>>>> be prioritized once the API is out of beta (meaning that IMO this
> >>>>>>>>>>>>>> is the right time to stall PRs until we agree on the design).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> There are three sections in the document: windowing, state, and
> >>>>>>>>>>>>>> API. How convoluted are those with each other? Can we separate the
> >>>>>>>>>>>>>> discussion or do we need to discuss those all together? I think
> >>>>>>>>>>>>>> part of the difficulty is that we are discussing three design
> >>>>>>>>>>>>>> choices at once.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
> >>>>>>>>> ted.dunn...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Out of order is ubiquitous in the real world. Typically, what
> >>>>>>>>>>>>>>> happens is that businesses will declare a maximum allowable delay
> >>>>>>>>>>>>>>> for delayed transactions and will commit to results when that
> >>>>>>>>>>>>>>> delay is reached. Transactions that arrive later than this cutoff
> >>>>>>>>>>>>>>> are collected specially as corrections which are reported/used
> >>>>>>>>>>>>>>> when possible.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Clearly, ordering can also be violated during processing, but if
> >>>>>>>>>>>>>>> the data is originally out of order the situation can't be
> >>>>>>>>>>>>>>> repaired by any protocol fixes that prevent transactions from
> >>>>>>>>>>>>>>> becoming disordered but has to be handled at the data level.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
> >>>>>>>>>>> aljos...@apache.org
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I also don't like big changes but sometimes they are necessary.
> >>>>>>>>>>>>>>>> The reason why I'm so adamant about out-of-order processing is
> >>>>>>>>>>>>>>>> that out-of-order elements are not some exception that occurs
> >>>>>>>>>>>>>>>> once in a while; they occur constantly in a distributed system.
> >>>>>>>>>>>>>>>> For example, in this:
> >>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the
> >>>>>>>>>>>>>>>> resulting windows are completely bogus because the current
> >>>>>>>>>>>>>>>> windowing system assumes elements to globally arrive in order,
> >>>>>>>>>>>>>>>> which is simply not true. (The example has a source that
> >>>>>>>>>>>>>>>> generates increasing integers. Then these pass through a map and
> >>>>>>>>>>>>>>>> are unioned with the original DataStream before a window
> >>>>>>>>>>>>>>>> operator.) This simulates elements arriving from different
> >>>>>>>>>>>>>>>> operators at a windowing operator. The example is also DOP=1; I
> >>>>>>>>>>>>>>>> imagine this to get worse with higher DOP.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> What do you mean by costly? As I said, I have a proof-of-concept
> >>>>>>>>>>>>>>>> windowing operator that can handle out-of-order elements. This is
> >>>>>>>>>>>>>>>> an example using the current Flink API:
> >>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> >>>>>>>>>>>>>>>> (It is an infinite source of tuples and a 5 second window
> >>>>>>>>>>>>>>>> operator that counts the tuples.) The first problem is that this
> >>>>>>>>>>>>>>>> code deadlocks because of the thread that emits fake elements. If
> >>>>>>>>>>>>>>>> I disable the fake element code it works, but the throughput
> >>>>>>>>>>>>>>>> using my mockup is 4 times higher. The gap widens dramatically if
> >>>>>>>>>>>>>>>> the window size increases.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So, it actually increases performance (unless I'm making a
> >>>>>>>>>>>>>>>> mistake in my explorations) and can handle elements that arrive
> >>>>>>>>>>>>>>>> out-of-order (which happens basically always in real-world
> >>>>>>>>>>>>>>>> windowing use-cases).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <
> se...@apache.org
> >>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed design is that we
> >>>>>>>>>>>>>>>>> need no different code for "system time" vs. "event time". It
> >>>>>>>>>>>>>>>>> only differs in where the timestamps are assigned.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The OOP approach also gives you the semantics of total ordering
> >>>>>>>>>>>>>>>>> without imposing merges on the streams.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> >>>>>>>>>>>>>>>>> mj...@informatik.hu-berlin.de> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I agree that there should be multiple alternatives the user(!)
> >>>>>>>>>>>>>>>>>> can choose from. Partial out-of-order processing works for
> >>>>>>>>>>>>>>>>>> many/most aggregates. However, if you consider
> >>>>>>>>>>>>>>>>>> Event-Pattern-Matching, global ordering is necessary (even if
> >>>>>>>>>>>>>>>>>> the performance penalty might be high).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I would also keep "system-time windows" as an alternative to
> >>>>>>>>>>>>>>>>>> "source assigned ts-windows".
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> It might also be interesting to consider the following paper
> >>>>>>>>>>>>>>>>>> for overlapping windows: "Resource sharing in continuous
> >>>>>>>>>>>>>>>>>> sliding-window aggregates"
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> >>>>>>>>>>>>>>>>>>> Hey
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily if your suggested
> >>>>>>>>>>>>>>>>>>> changes might touch them at some point.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Also I still think we should not put everything in the
> >>>>>>>>>>>>>>>>>>> Datastream because it will be a huge mess.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Also we need to agree on the out-of-order processing, whether we
> >>>>>>>>>>>>>>>>>>> want it the way you proposed it (which is quite costly). Another
> >>>>>>>>>>>>>>>>>>> alternative approach there, which fits in the current windowing,
> >>>>>>>>>>>>>>>>>>> is to filter out out-of-order events and apply a special handling
> >>>>>>>>>>>>>>>>>>> operator on them. This would be fairly lightweight.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> My point is that we need to consider some alternative solutions.
> >>>>>>>>>>>>>>>>>>> And we should not block contributions along the way.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Cheers
> >>>>>>>>>>>>>>>>>>> Gyula
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
> >>>>>>>>>>>>>>>> aljos...@apache.org>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The reason I posted this now is that we need to think about the
> >>>>>>>>>>>>>>>>>>>> API and windowing before proceeding with the PRs of Gabor
> >>>>>>>>>>>>>>>>>>>> (inverse reduce) and Gyula (removal of "aggregate" functions on
> >>>>>>>>>>>>>>>>>>>> DataStream).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> For the windowing, I think that the current model does not work
> >>>>>>>>>>>>>>>>>>>> for out-of-order processing. Therefore, the whole windowing
> >>>>>>>>>>>>>>>>>>>> infrastructure will basically have to be redone, meaning also
> >>>>>>>>>>>>>>>>>>>> that any work on the pre-aggregators or optimizations that we do
> >>>>>>>>>>>>>>>>>>>> now becomes useless.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the interactions between
> >>>>>>>>>>>>>>>>>>>> all the different *DataStream classes and grouping/windowing.
> >>>>>>>>>>>>>>>>>>>> (See API section of the doc I posted.)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
> >>>>>>>>>>> gyula.f...@gmail.com
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good initiative.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I added some comments to the respective sections (where I didn't
> >>>>>>>>>>>>>>>>>>>>> fully agree :).).
> >>>>>>>>>>>>>>>>>>>>> At some point I think it would be good to have a public hangout
> >>>>>>>>>>>>>>>>>>>>> session on this, which could make a more dynamic discussion.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>> Gyula
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 22 Jun
> >>>>>>>>>>>>>>>>>>>>> 2015, 21:34):
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>> with people proposing changes to the streaming part I also wanted
> >>>>>>>>>>>>>>>>>>>>>> to throw my hat into the ring. :D
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> During the last few months, while I was getting acquainted with
> >>>>>>>>>>>>>>>>>>>>>> the streaming system, I wrote down some thoughts I had about how
> >>>>>>>>>>>>>>>>>>>>>> things could be improved. Hopefully, they are in somewhat
> >>>>>>>>>>>>>>>>>>>>>> coherent shape now, so please have a look if you are interested
> >>>>>>>>>>>>>>>>>>>>>> in this:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> This mostly covers:
> >>>>>>>>>>>>>>>>>>>>>> - Timestamps assigned at sources
> >>>>>>>>>>>>>>>>>>>>>> - Out-of-order processing of elements in window operators
> >>>>>>>>>>>>>>>>>>>>>> - API design
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Please let me know what you think. Comment in the document or
> >>>>>>>>>>>>>>>>>>>>>> here in the mailing list.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I have a PR in the makings that would introduce source
> >>>>>>>>>>>>>>>>>>>>>> timestamps and watermarks for keeping track of them. I also
> >>>>>>>>>>>>>>>>>>>>>> hacked a proof-of-concept of a windowing system that is able to
> >>>>>>>>>>>>>>>>>>>>>> process out-of-order elements using a FlatMap operator. (It uses
> >>>>>>>>>>>>>>>>>>>>>> panes to perform efficient pre-aggregations.)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>
> >>
> >
>
>
