I like the bit about the API a lot. What I don't see yet is how a delta
window can work in a distributed way with out-of-order elements.

On Fri, 26 Jun 2015 at 19:43 Stephan Ewen <se...@apache.org> wrote:

> Here is a first bit of what I have been writing down. Will add more over
> the next days:
>
>
> https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
>
>
> https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering
>
>
>
> On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:
>
> > +1 for writing this down
> >
> > > On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > >
> > > +1 go ahead
> > >
> > > On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:
> > >
> > >> Hey!
> > >>
> > >> This thread covers many different topics. Let's break this up into
> > separate
> > >> discussions.
> > >>
> > >> - Operator State is already driven by Gyula and Paris and happening on
> > the
> > >> above mentioned pull request and the followup discussions.
> > >>
> > >> - For windowing, this discussion has brought some results that we
> should
> > >> sum up and clearly write down.
> > >>   I would like to chime in to do that based on what I learned from the
> > >> document and this discussion. I also got some input from Marton about
> > what
> > >> he learned from mapping the Cloud DataFlow constructs to Flink.
> > >>   I'll draft a Wiki page (with the help of Aljoscha, Marton) that sums
> > >> this up and documents it for users (if we decide to adopt this).
> > >>   Then we run this by Gyula, Matthias Sax and Kostas for feedback.
> > >>
> > >> - API style discussions should be yet another thread. This will
> probably
> > >> be opened as people start to address that.
> > >>
> > >>
> > >> I'll try to get a draft of the wiki version out tomorrow noon and send
> > the
> > >> link around.
> > >>
> > >> Greetings,
> > >> Stephan
> > >>
> > >>
> > >>
> > >> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <
> > >> mj...@informatik.hu-berlin.de> wrote:
> > >>
> > >>> Sure. I picked this up. Using the current model for "occurrence time
> > >>> semantics" does not work.
> > >>>
> > >>> I elaborated on this in the past many times (but nobody cared). It is
> > >>> important to make it clear to the user what semantics are supported.
> > >>> Claiming to support "sliding windows" doesn't mean anything; there
> are
> > >>> too many different semantics out there. :)
> > >>>
> > >>>
> > >>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
> > >>>> Yes, I am aware of this requirement and it would also be supported
> in
> > >> my
> > >>>> proposed model.
> > >>>>
> > >>>> The problem is, that the "custom timestamp" feature gives the
> > >> impression
> > >>>> that the elements would be windowed according to a user-timestamp.
> The
> > >>>> results, however, are wrong because of the assumption about elements
> > >>>> arriving in order. (This is what I was trying to show with my fancy
> > >> ASCII
> > >>>> art and result output.)
> > >>>>
> > >>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <
> > >>> mj...@informatik.hu-berlin.de>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Aljoscha,
> > >>>>>
> > >>>>> I like that you are pushing in this direction. However, IMHO you
> > >>>>> misinterpret the current approach. It does not assume that tuples
> > >>>>> arrive in-order; the current approach has no notion about a
> > >>>>> pre-defined-order (for example, the order in which the events are
> > >>>>> created). There is only the notion of "arrival-order" at the
> > operator.
> > >>>>> From this "arrival-order" perspective, the results are correct(!).
> > >>>>>
> > >>>>> Windowing in the current approach means for example, "sum up an
> > >>>>> attribute of all events you *received* in the last 5 seconds". That
> > >> is a
> > >>>>> different meaning than "sum up an attribute of all events that
> > >> *occurred*
> > >>>>> in the last 5 seconds". Both queries are valid and Flink should
> > >> support
> > >>>>> both IMHO.
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
> > >>>>>> Yes, now this also processes about 3 mio Elements (Window Size 5
> > sec,
> > >>>>> Slide
> > >>>>>> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
> > >>>>>>
> > >>>>>> Performance is not my main concern, however. My concern is that
> the
> > >>>>> current
> > >>>>>> model assumes elements to arrive in order, which is simply not
> true.
> > >>>>>>
> > >>>>>> In your code you have these lines for specifying the window:
> > >>>>>> .window(Time.of(1l, TimeUnit.SECONDS))
> > >>>>>> .every(Time.of(1l, TimeUnit.SECONDS))
> > >>>>>>
> > >>>>>> Although this semantically specifies a tumbling window of size 1
> sec
> > >>> I'm
> > >>>>>> afraid it uses the sliding window logic internally (because of the
> > >>>>>> .every()).
> > >>>>>>
> > >>>>>> In my tests I only have the first line.
> > >>>>>>
> > >>>>>>
> > >>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com>
> wrote:
> > >>>>>>
> > >>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It should
> be
> > >>>>>>> fixed now. Can you please run it again?
> > >>>>>>>
> > >>>>>>> I also tried to reproduce some of your performance numbers, but
> I'm
> > >>>>>>> getting only less than 1/10th of yours. For example, in the
> > Tumbling
> > >>>>>>> case, Current/Reduce produces only ~100000 for me. Do you have
> any
> > >>>>>>> idea what I could be doing wrong? My code:
> > >>>>>>> http://pastebin.com/zbEjmGhk
> > >>>>>>> I am running it on a 2 GHz Core i7.
> > >>>>>>>
> > >>>>>>> Best regards,
> > >>>>>>> Gabor
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org
> >:
> > >>>>>>>> Hi,
> > >>>>>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The
> > >>>>> results
> > >>>>>>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple
> > >> source,
> > >>>>> all
> > >>>>>>>> the previous tests reported around 3600 tuples (Size 5 sec,
> Slide
> > 1
> > >>>>> sec)
> > >>>>>>>> (Theoretically there would be 5000 tuples in 5 seconds but this
> is
> > >>> due
> > >>>>> to
> > >>>>>>>> overhead). These are the results for the inverse reduce
> > >> optimisation:
> > >>>>>>>> (Tuple 0,38)
> > >>>>>>>> (Tuple 0,829)
> > >>>>>>>> (Tuple 0,1625)
> > >>>>>>>> (Tuple 0,2424)
> > >>>>>>>> (Tuple 0,3190)
> > >>>>>>>> (Tuple 0,3198)
> > >>>>>>>> (Tuple 0,-339368)
> > >>>>>>>> (Tuple 0,-1315725)
> > >>>>>>>> (Tuple 0,-2932932)
> > >>>>>>>> (Tuple 0,-5082735)
> > >>>>>>>> (Tuple 0,-7743256)
> > >>>>>>>> (Tuple 0,75701046)
> > >>>>>>>> (Tuple 0,642829470)
> > >>>>>>>> (Tuple 0,2242018381)
> > >>>>>>>> (Tuple 0,5190708618)
> > >>>>>>>> (Tuple 0,10060360311)
> > >>>>>>>> (Tuple 0,-94254951)
> > >>>>>>>> (Tuple 0,-219806321293)
> > >>>>>>>> (Tuple 0,-1258895232699)
> > >>>>>>>> (Tuple 0,-4074432596329)
> > >>>>>>>>
> > >>>>>>>> One line is one emitted window count. This is what happens when
> I
> > >>>>> remove
> > >>>>>>>> the Thread.sleep(1):
> > >>>>>>>> (Tuple 0,660676)
> > >>>>>>>> (Tuple 0,2553733)
> > >>>>>>>> (Tuple 0,3542696)
> > >>>>>>>> (Tuple 0,1)
> > >>>>>>>> (Tuple 0,1107035)
> > >>>>>>>> (Tuple 0,2549491)
> > >>>>>>>> (Tuple 0,4100387)
> > >>>>>>>> (Tuple 0,-8406583360092)
> > >>>>>>>> (Tuple 0,-8406582150743)
> > >>>>>>>> (Tuple 0,-8406580427190)
> > >>>>>>>> (Tuple 0,-8406580427190)
> > >>>>>>>> (Tuple 0,-8406580427190)
> > >>>>>>>> (Tuple 0,6847279255682044995)
> > >>>>>>>> (Tuple 0,6847279255682044995)
> > >>>>>>>> (Tuple 0,-5390528042713628318)
> > >>>>>>>> (Tuple 0,-5390528042711551780)
> > >>>>>>>> (Tuple 0,-5390528042711551780)
> > >>>>>>>>
> > >>>>>>>> So at some point the pre-reducer seems to go haywire and does
> not
> > >>>>> recover
> > >>>>>>>> from it. The good thing is that it does produce results now,
> where
> > >>> the
> > >>>>>>>> previous Current/Reduce would simply hang and not produce any
> > >> output.
> > >>>>>>>>
> > >>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com>
> > wrote:
> > >>>>>>>>
> > >>>>>>>>> Hello,
> > >>>>>>>>>
> > >>>>>>>>> Aljoscha, can you please try the performance test of
> > >> Current/Reduce
> > >>>>>>>>> with the InversePreReducer in PR 856? (If you just call sum, it
> > >> will
> > >>>>>>>>> use an InversePreReducer.) It would be an interesting test,
> > >> because
> > >>>>>>>>> the inverse function optimization really depends on the stream
> > >> being
> > >>>>>>>>> ordered, and I think it has the potential of being faster than
> > >>>>>>>>> Next/Reduce. Especially if the window size is much larger than
> > the
> > >>>>>>>>> slide size.
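
A minimal sketch of the inverse-function idea for a sliding sum, assuming a count-based window and elements that arrive in order (the ordering dependency mentioned above). The class and method names are hypothetical and this is not the InversePreReducer from PR 856; it only illustrates why the running state stays small when the reduce function has an inverse.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical count-based sliding sum; not the code from PR 856. */
final class InverseSlidingSumSketch {
    private final int windowSize;
    // Elements currently inside the window, oldest first.
    private final Deque<Long> window = new ArrayDeque<>();
    private long sum = 0;

    InverseSlidingSumSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds a value, evicts the oldest one once the window is full, returns the sum. */
    long add(long value) {
        window.addLast(value);
        sum += value;                    // reduce step: fold the new element in
        if (window.size() > windowSize) {
            sum -= window.removeFirst(); // inverse step: undo the evicted element
        }
        return sum;
    }

    public static void main(String[] args) {
        InverseSlidingSumSketch sketch = new InverseSlidingSumSketch(3);
        for (long v = 1; v <= 5; v++) {
            System.out.println("after " + v + ": " + sketch.add(v));
        }
    }
}
```

Eviction here follows arrival order, so an element that arrives late would be evicted at the wrong time; that is the reason the optimization depends on an ordered stream.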
> > >>>>>>>>>
> > >>>>>>>>> Best regards,
> > >>>>>>>>> Gabor
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <
> aljos...@apache.org
> > >>> :
> > >>>>>>>>>> I think I'll have to elaborate a bit so I created a
> > >>> proof-of-concept
> > >>>>>>>>>> implementation of my ideas and ran some throughput
> measurements
> > >> to
> > >>>>>>>>>> alleviate concerns about performance.
> > >>>>>>>>>>
> > >>>>>>>>>> First, though, I want to highlight again why the current
> > approach
> > >>>>> does
> > >>>>>>>>> not
> > >>>>>>>>>> work with out-of-order elements (which, again, occur
> constantly
> > >> due
> > >>>>> to
> > >>>>>>>>> the
> > >>>>>>>>>> distributed nature of the system). This is the example I
> posted
> > >>>>>>> earlier:
> > >>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The
> plan
> > >>>>> looks
> > >>>>>>>>> like
> > >>>>>>>>>> this:
> > >>>>>>>>>>
> > >>>>>>>>>> +--+
> > >>>>>>>>>> | | Source
> > >>>>>>>>>> +--+
> > >>>>>>>>>> |
> > >>>>>>>>>> +-----+
> > >>>>>>>>>> | |
> > >>>>>>>>>> | +--+
> > >>>>>>>>>> | | | Identity Map
> > >>>>>>>>>> | +--+
> > >>>>>>>>>> | |
> > >>>>>>>>>> +-----+
> > >>>>>>>>>> |
> > >>>>>>>>>> +--+
> > >>>>>>>>>> | | Window
> > >>>>>>>>>> +--+
> > >>>>>>>>>> |
> > >>>>>>>>>> |
> > >>>>>>>>>> +--+
> > >>>>>>>>>> | | Sink
> > >>>>>>>>>> +--+
> > >>>>>>>>>>
> > >>>>>>>>>> So all it does is pass the elements through an identity map
> and
> > >>> then
> > >>>>>>>>> merge
> > >>>>>>>>>> them again before the window operator. The source emits
> > ascending
> > >>>>>>>>> integers
> > >>>>>>>>>> and the window operator has a custom timestamp extractor that
> > >> uses
> > >>>>> the
> > >>>>>>>>>> integer itself as the timestamp and should create windows of
> > >> size 4
> > >>>>>>> (that
> > >>>>>>>>>> is elements with timestamp 0-3 are one window, the next are
> the
> > >>>>>>> elements
> > >>>>>>>>>> with timestamp 4-7, and so on). Since the topology basically
> > >>> doubles
> > >>>>>>> the
> > >>>>>>>>>> elements from the source I would expect to get these windows:
> > >>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
> > >>>>>>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
> > >>>>>>>>>>
> > >>>>>>>>>> The output is this, however:
> > >>>>>>>>>> Window: 0, 1, 2, 3,
> > >>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> > >>>>>>>>>> Window: 8, 9, 10, 11,
> > >>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> > >>>>>>>>>> Window: 16, 17, 18, 19,
> > >>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> > >>>>>>>>>> Window: 24, 25, 26, 27,
> > >>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> > >>>>>>>>>>
> > >>>>>>>>>> The reason is that the elements simply arrive out-of-order.
> > >> Imagine
> > >>>>>>> what
> > >>>>>>>>>> would happen if the elements actually arrived with some delay
> > >> from
> > >>>>>>>>>> different operations.
> > >>>>>>>>>>
> > >>>>>>>>>> Now, on to the performance numbers. The proof-of-concept I
> > >> created
> > >>> is
> > >>>>>>>>>> available here:
> > >>>>>>>>>>
> > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock
> > >> .
> > >>>>> The
> > >>>>>>>>> basic
> > >>>>>>>>>> idea is that sources assign the current timestamp when
> emitting
> > >>>>>>> elements.
> > >>>>>>>>>> They also periodically emit watermarks that tell us that no
> > >>> elements
> > >>>>>>> with
> > >>>>>>>>>> an earlier timestamp will be emitted. The watermarks propagate
> > >>>>> through
> > >>>>>>>>> the
> > >>>>>>>>>> operators. The window operator looks at the timestamp of an
> > >> element
> > >>>>>>> and
> > >>>>>>>>>> puts it into the buffer that corresponds to that window. When
> > the
> > >>>>>>> window
> > >>>>>>>>>> operator receives a watermark it will look at the in-flight
> > >> windows
> > >>>>>>>>>> (basically the buffers) and emit those windows where the
> > >> window-end
> > >>>>> is
> > >>>>>>>>>> before the watermark.
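
A minimal sketch of the watermark-driven window operator described above, assuming tumbling windows and plain long timestamps as elements. The class and method names are hypothetical and this is not the proof-of-concept code or Flink's API. It also assumes a window covers [start, start + size) and that a watermark w means no element with a timestamp below w will follow, so a window is emitted once its end is at or before the watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical tumbling-window buffer driven by watermarks; not Flink's API. */
final class WatermarkWindowSketch {
    private final long windowSize;
    // In-flight windows keyed by window start, sorted so the oldest is emitted first.
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    WatermarkWindowSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Puts the element into the buffer of the window its timestamp falls into. */
    void onElement(long timestamp) {
        long windowStart = Math.floorDiv(timestamp, windowSize) * windowSize;
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
    }

    /** Emits and removes every in-flight window that ends at or before the watermark. */
    List<List<Long>> onWatermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.firstKey() + windowSize <= watermark) {
            emitted.add(buffers.pollFirstEntry().getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkWindowSketch operator = new WatermarkWindowSketch(4);
        // Element 4 arrives before 2 and 3, i.e. out of order.
        for (long ts : new long[] {0, 1, 4, 2, 3, 5}) {
            operator.onElement(ts);
        }
        System.out.println(operator.onWatermark(4));
        System.out.println(operator.onWatermark(8));
    }
}
```

Because elements are assigned to a buffer by timestamp rather than by arrival position, the out-of-order element 4 ends up in the second window and does not disturb the first.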
> > >>>>>>>>>>
> > >>>>>>>>>> For measuring throughput I did the following: The source emits
> > >>> tuples
> > >>>>>>> of
> > >>>>>>>>>> the form ("tuple", 1) in an infinite loop. The window operator
> > >> sums
> > >>>>> up
> > >>>>>>>>> the
> > >>>>>>>>>> tuples, thereby counting how many tuples the window operator
> can
> > >>>>>>> handle
> > >>>>>>>>> in
> > >>>>>>>>>> a given time window. There are two different implementations
> for
> > >>> the
> > >>>>>>>>>> summation: 1) simply summing up the values in a mapWindow(),
> > >> there
> > >>>>> you
> > >>>>>>>>> get
> > >>>>>>>>>> a List of all tuples and simply iterate over it. 2) using
> > sum(1),
> > >>>>>>> which
> > >>>>>>>>> is
> > >>>>>>>>>> implemented as a reduce() (that uses the pre-reducer
> > >>> optimisations).
> > >>>>>>>>>>
> > >>>>>>>>>> These are the performance numbers (Current is the current
> > >>>>>>> implementation,
> > >>>>>>>>>> Next is my proof-of-concept):
> > >>>>>>>>>>
> > >>>>>>>>>> Tumbling (1 sec):
> > >>>>>>>>>> - Current/Map: 1.6 mio
> > >>>>>>>>>> - Current/Reduce: 2 mio
> > >>>>>>>>>> - Next/Map: 2.2 mio
> > >>>>>>>>>> - Next/Reduce: 4 mio
> > >>>>>>>>>>
> > >>>>>>>>>> Sliding (5 sec, slide 1 sec):
> > >>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot)
> > >>>>>>>>>> - Current/Reduce: No output
> > >>>>>>>>>> - Next/Map: ca 4 mio (fluctuates)
> > >>>>>>>>>> - Next/Reduce: 10 mio
> > >>>>>>>>>>
> > >>>>>>>>>> The Next/Reduce variant can basically scale indefinitely with
> > >>> window
> > >>>>>>> size
> > >>>>>>>>>> because the internal state does not rely on the number of
> > >> elements
> > >>>>>>> (it is
> > >>>>>>>>>> just the current sum). The pre-reducer for sliding elements
> > >> cannot
> > >>>>>>> handle
> > >>>>>>>>>> the number of tuples; it produces no output. For the two Map
> > >>> variants
> > >>>>>>> the
> > >>>>>>>>>> performance fluctuates because they always keep all the
> elements
> > >> in
> > >>>>> an
> > >>>>>>>>>> internal buffer before emission, this seems to tax the garbage
> > >>>>>>> collector
> > >>>>>>>>> a
> > >>>>>>>>>> bit and leads to random pauses.
> > >>>>>>>>>>
> > >>>>>>>>>> One thing that should be noted is that I had to disable the
> > >>>>>>> fake-element
> > >>>>>>>>>> emission thread, otherwise the Current versions would
> deadlock.
> > >>>>>>>>>>
> > >>>>>>>>>> So, I started working on this because I thought that
> > out-of-order
> > >>>>>>>>>> processing would be necessary for correctness. And it certainly
> > >> is.
> > >>>>>>> But
> > >>>>>>>>> the
> > >>>>>>>>>> proof-of-concept also shows that performance can be greatly
> > >>> improved.
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com
> >
> > >>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> I agree, let's separate these topics from each other so we can
> > get
> > >>>>>>> faster
> > >>>>>>>>>>> resolution.
> > >>>>>>>>>>>
> > >>>>>>>>>>> There is already a state discussion in the thread we started
> > >> with
> > >>>>>>> Paris.
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <
> > >>> ktzou...@apache.org
> > >>>>>>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> I agree with supporting out-of-order out of the box :-),
> even
> > >> if
> > >>>>>>> this
> > >>>>>>>>>> means
> > >>>>>>>>>>>> a major refactoring. This is the right time to refactor the
> > >>>>>>> streaming
> > >>>>>>>>>> API
> > >>>>>>>>>>>> before we pull it out of beta. I think that this is more
> > >>> important
> > >>>>>>>>> than
> > >>>>>>>>>> new
> > >>>>>>>>>>>> features in the streaming API, which can be prioritized once
> > >> the
> > >>>>>>> API
> > >>>>>>>>> is
> > >>>>>>>>>> out
> > >>>>>>>>>>>> of beta (meaning, that IMO this is the right time to stall
> PRs
> > >>>>>>> until
> > >>>>>>>>> we
> > >>>>>>>>>>>> agree on the design).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> There are three sections in the document: windowing, state,
> > and
> > >>>>>>> API.
> > >>>>>>>>> How
> > >>>>>>>>>>>> intertwined are those with each other? Can we separate the
> > >>>>>>> discussion
> > >>>>>>>>> or
> > >>>>>>>>>> do
> > >>>>>>>>>>>> we need to discuss those all together? I think part of the
> > >>>>>>> difficulty
> > >>>>>>>>> is
> > >>>>>>>>>>>> that we are discussing three design choices at once.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
> > >>>>>>> ted.dunn...@gmail.com>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Out of order is ubiquitous in the real world.  Typically,
> > what
> > >>>>>>>>>> happens is
> > >>>>>>>>>>>>> that businesses will declare a maximum allowable delay for
> > >>>>>>> delayed
> > >>>>>>>>>>>>> transactions and will commit to results when that delay is
> > >>>>>>> reached.
> > >>>>>>>>>>>>> Transactions that arrive later than this cutoff are
> collected
> > >>>>>>>>>> specially
> > >>>>>>>>>>>> as
> > >>>>>>>>>>>>> corrections which are reported/used when possible.
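
A minimal sketch of the cutoff policy described above, assuming the largest timestamp seen so far serves as the clock and that an event is late once it trails that clock by more than the declared maximum delay. The class, field and method names are hypothetical; how a business actually measures the delay is not specified in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical split of events into on-time results and late corrections. */
final class LatenessCutoffSketch {
    private final long maxDelay;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private final List<Long> accepted = new ArrayList<>();
    private final List<Long> corrections = new ArrayList<>();

    LatenessCutoffSketch(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    /** Routes the event by comparing its timestamp with the current cutoff. */
    void onEvent(long timestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        long cutoff = maxTimestampSeen - maxDelay;
        if (timestamp >= cutoff) {
            accepted.add(timestamp);     // still within the allowed delay
        } else {
            corrections.add(timestamp);  // too late: collected separately
        }
    }

    List<Long> accepted() {
        return accepted;
    }

    List<Long> corrections() {
        return corrections;
    }

    public static void main(String[] args) {
        LatenessCutoffSketch sketch = new LatenessCutoffSketch(2);
        for (long ts : new long[] {10, 9, 12, 8, 11}) {
            sketch.onEvent(ts);
        }
        System.out.println("accepted:    " + sketch.accepted());
        System.out.println("corrections: " + sketch.corrections());
    }
}
```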
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Clearly, ordering can also be violated during processing,
> but
> > >> if
> > >>>>>>> the
> > >>>>>>>>>> data
> > >>>>>>>>>>>>> is originally out of order the situation can't be repaired
> by
> > >>> any
> > >>>>>>>>>>>> protocol
> > >>>>>>>>>>>>> fixes that prevent transactions from becoming disordered
> but
> > >> has
> > >>>>>>> to
> > >>>>>>>>>>>> be handled
> > >>>>>>>>>>>>> at the data level.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
> > >>>>>>>>> aljos...@apache.org
> > >>>>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I also don't like big changes but sometimes they are
> > >> necessary.
> > >>>>>>>>> The
> > >>>>>>>>>>>>> reason
> > >>>>>>>>>>>>>> why I'm so adamant about out-of-order processing is that
> > >>>>>>>>>> out-of-order
> > >>>>>>>>>>>>>> elements are not some exception that occurs once in a
> while;
> > >>>>>>> they
> > >>>>>>>>>> occur
> > >>>>>>>>>>>>>> constantly in a distributed system. For example, in this:
> > >>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the
> > >>>>>>>>> resulting
> > >>>>>>>>>>>>>> windows
> > >>>>>>>>>>>>>> are completely bogus because the current windowing system
> > >>>>>>> assumes
> > >>>>>>>>>>>>> elements
> > >>>>>>>>>>>>>> to globally arrive in order, which is simply not true.
> (The
> > >>>>>>>>> example
> > >>>>>>>>>>>> has a
> > >>>>>>>>>>>>>> source that generates increasing integers. Then these pass
> > >>>>>>>>> through a
> > >>>>>>>>>>>> map
> > >>>>>>>>>>>>>> and are unioned with the original DataStream before a
> window
> > >>>>>>>>>> operator.)
> > >>>>>>>>>>>>>> This simulates elements arriving from different operators
> at
> > >> a
> > >>>>>>>>>>>> windowing
> > >>>>>>>>>>>>>> operator. The example is also DOP=1, I imagine this to get
> > >>>>>>> worse
> > >>>>>>>>>> with
> > >>>>>>>>>>>>>> higher DOP.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> What do you mean by costly? As I said, I have a
> > >>>>>>> proof-of-concept
> > >>>>>>>>>>>>> windowing
> > >>>>>>>>>>>>>> operator that can handle out-of-order elements. This is an
> > >>>>>>> example
> > >>>>>>>>>>>> using
> > >>>>>>>>>>>>>> the current Flink API:
> > >>>>>>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> > >>>>>>>>>>>>>> (It is an infinite source of tuples and a 5 second window
> > >>>>>>> operator
> > >>>>>>>>>> that
> > >>>>>>>>>>>>>> counts the tuples.) The first problem is that this code
> > >>>>>>> deadlocks
> > >>>>>>>>>>>> because
> > >>>>>>>>>>>>>> of the thread that emits fake elements. If I disable the
> > fake
> > >>>>>>>>>> element
> > >>>>>>>>>>>>> code
> > >>>>>>>>>>>>>> it works, but the throughput using my mockup is 4 times
> > >> higher.
> > >>>>>>>>> The
> > >>>>>>>>>>>> gap
> > >>>>>>>>>>>>>> widens dramatically if the window size increases.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> So, it actually increases performance (unless I'm making a
> > >>>>>>> mistake
> > >>>>>>>>>> in
> > >>>>>>>>>>>> my
> > >>>>>>>>>>>>>> explorations) and can handle elements that arrive
> > >> out-of-order
> > >>>>>>>>>> (which
> > >>>>>>>>>>>>>> happens basically always in real-world windowing
> > >> use cases).
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <
> se...@apache.org
> > >
> > >>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed design is
> that
> > >> we
> > >>>>>>>>>> need no
> > >>>>>>>>>>>>>>> different code for "system time" vs. "event time". It
> only
> > >>>>>>>>>> differs in
> > >>>>>>>>>>>>>> where
> > >>>>>>>>>>>>>>> the timestamps are assigned.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The OOP approach also gives you the semantics of total
> > >>>>>>> ordering
> > >>>>>>>>>>>> without
> > >>>>>>>>>>>>>>> imposing merges on the streams.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> > >>>>>>>>>>>>>>> mj...@informatik.hu-berlin.de> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I agree that there should be multiple alternatives the
> > >>>>>>> user(!)
> > >>>>>>>>>> can
> > >>>>>>>>>>>>>>>> choose from. Partial out-of-order processing works for
> > >>>>>>>>> many/most
> > >>>>>>>>>>>>>>>> aggregates. However, if you consider
> > >>>>>>> Event-Pattern-Matching,
> > >>>>>>>>>> global
> > >>>>>>>>>>>>>>>> ordering is necessary (even if the performance penalty
> > >>>>>>> might
> > >>>>>>>>> be
> > >>>>>>>>>>>>> high).
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I would also keep "system-time windows" as an
> alternative
> > >>>>>>> to
> > >>>>>>>>>>>> "source
> > >>>>>>>>>>>>>>>> assigned ts-windows".
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> It might also be interesting to consider the following
> > >>>>>>> paper
> > >>>>>>>>> for
> > >>>>>>>>>>>>>>>> overlapping windows: "Resource sharing in continuous
> > >>>>>>>>>> sliding-window
> > >>>>>>>>>>>>>>>> aggregates"
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> > >>>>>>>>>>>>>>>>> Hey
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily if your
> > >>>>>>>>>> suggested
> > >>>>>>>>>>>>>> changes
> > >>>>>>>>>>>>>>>>> might touch them at some point.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Also I still think we should not put everything in the
> > >>>>>>>>>> Datastream
> > >>>>>>>>>>>>>>> because
> > >>>>>>>>>>>>>>>>> it will be a huge mess.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Also we need to agree on the out of order processing,
> > >>>>>>>>> whether
> > >>>>>>>>>> we
> > >>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>> the way you proposed it (which is quite costly). Another
> > >>>>>>>>>>>> alternative
> > >>>>>>>>>>>>>>>>> approach there which fits in the current windowing is
> to
> > >>>>>>>>>> filter
> > >>>>>>>>>>>> out
> > >>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>> order events and apply a special handling operator on
> > >>>>>>> them.
> > >>>>>>>>>> This
> > >>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>> fairly lightweight.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> My point is that we need to consider some alternative
> > >>>>>>>>>> solutions.
> > >>>>>>>>>>>>> And
> > >>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>> should not block contributions along the way.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Cheers
> > >>>>>>>>>>>>>>>>> Gyula
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
> > >>>>>>>>>>>>>> aljos...@apache.org>
> > >>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> The reason I posted this now is that we need to think
> > >>>>>>> about
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> API
> > >>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor
> > >>>>>>> (inverse
> > >>>>>>>>>>>> reduce)
> > >>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on
> DataStream).
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> For the windowing, I think that the current model does
> > >>>>>>> not
> > >>>>>>>>>> work
> > >>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>> out-of-order processing. Therefore, the whole
> windowing
> > >>>>>>>>>>>>>> infrastructure
> > >>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>> basically have to be redone. Meaning also that any
> work
> > >>>>>>> on
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> pre-aggregators or optimizations that we do now
> becomes
> > >>>>>>>>>> useless.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the
> interactions
> > >>>>>>>>>> between
> > >>>>>>>>>>>>> all
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> different *DataStream classes and grouping/windowing.
> > >>>>>>> (See
> > >>>>>>>>>> API
> > >>>>>>>>>>>>>> section
> > >>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>> the doc I posted.)
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
> > >>>>>>>>> gyula.f...@gmail.com
> > >>>>>>>>>>>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi Aljoscha,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good
> > >>>>>>>>> initiative.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I added some comments to the respective sections
> > >>>>>>> (where I
> > >>>>>>>>>> didn't
> > >>>>>>>>>>>>>> fully
> > >>>>>>>>>>>>>>>>>> agree
> > >>>>>>>>>>>>>>>>>>> :).).
> > >>>>>>>>>>>>>>>>>>> At some point I think it would be good to have a
> public
> > >>>>>>>>>> hangout
> > >>>>>>>>>>>>>>> session
> > >>>>>>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>> this, which could make a more dynamic discussion.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>> Gyula
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 22 Jun 2015, 21:34):
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>> with people proposing changes to the streaming part
> I
> > >>>>>>>>> also
> > >>>>>>>>>>>>> wanted
> > >>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>> throw
> > >>>>>>>>>>>>>>>>>>>> my hat into the ring. :D
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> During the last few months, while I was getting
> > >>>>>>>>> acquainted
> > >>>>>>>>>>>> with
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had
> > >>>>>>> about
> > >>>>>>>>>> how
> > >>>>>>>>>>>>>> things
> > >>>>>>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat
> coherent
> > >>>>>>>>> shape
> > >>>>>>>>>>>> now,
> > >>>>>>>>>>>>>> so
> > >>>>>>>>>>>>>>>>>>> please
> > >>>>>>>>>>>>>>>>>>>> have a look if you are interested in this:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> This mostly covers:
> > >>>>>>>>>>>>>>>>>>>> - Timestamps assigned at sources
> > >>>>>>>>>>>>>>>>>>>> - Out-of-order processing of elements in window
> > >>>>>>>>> operators
> > >>>>>>>>>>>>>>>>>>>> - API design
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Please let me know what you think. Comment in the
> > >>>>>>>>> document
> > >>>>>>>>>> or
> > >>>>>>>>>>>>> here
> > >>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> mailing list.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I have a PR in the making that would introduce
> source
> > >>>>>>>>>>>>> timestamps
> > >>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>> watermarks for keeping track of them. I also hacked
> a
> > >>>>>>>>>>>>>>> proof-of-concept
> > >>>>>>>>>>>>>>>>>>> of a
> > >>>>>>>>>>>>>>>>>>>> windowing system that is able to process
> out-of-order
> > >>>>>>>>>> elements
> > >>>>>>>>>>>>>>> using a
> > >>>>>>>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform
> efficient
> > >>>>>>>>>>>>>>>>>> pre-aggregations.)
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>> Aljoscha
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>
> >
> >
>
