Yep, I would say: Move ahead :-)

On Tue, May 5, 2015 at 4:48 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> So I gather I should go forward with this? If no-one objects I will
> open a Jira and work on this.
>
> On Tue, May 5, 2015 at 4:14 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> > Yes, because the handling of punctuations depends on the operator: A
> > MapOperator can just forward them while a windowed join or reduce can
> > only forward them after emitting the correct windows or results.
> >
> > On Tue, May 5, 2015 at 3:58 PM, Paris Carbone <par...@kth.se> wrote:
> >> By watermark handling I meant making punctuations explicit and
> >> forwarding/modifying them at the operator level. I think this is clear
> >> so far.
> >>
> >>> On 05 May 2015, at 15:41, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>
> >>> There is no watermark handling yet. :D
> >>>
> >>> But this would enable me to do this.
> >>>
> >>> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <par...@kth.se> wrote:
> >>>> I agree with Gyula on this one. Barriers should better not be exposed
> >>>> to the operator. They are system events for state management. Apart
> >>>> from that, watermark handling seems to be on the right track, I like
> >>>> it so far.
> >>>>
> >>>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>
> >>>>> I don't know, I just put that there because other people are working
> >>>>> on the checkpointing/barrier thing. So there would need to be some
> >>>>> functionality there at some point.
> >>>>>
> >>>>> Or maybe it is not required there and can be handled in the
> >>>>> StreamTask. Others might know this better than I do right now.
> >>>>>
> >>>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>>>>> What would the processBarrier method do?
> >>>>>>
> >>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>
> >>>>>>> I'm using the terms punctuation and watermark interchangeably here
> >>>>>>> because for practical purposes they do the same thing. I'm not sure
> >>>>>>> what you meant with your comment about those.
> >>>>>>>
> >>>>>>> For the Operator interface I'm thinking about something like this:
> >>>>>>>
> >>>>>>> abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
> >>>>>>>   public processElement(IN element);
> >>>>>>>   public processBarrier(...);
> >>>>>>>   public processPunctuation/lowWatermark(...);
> >>>>>>> }
> >>>>>>>
> >>>>>>> The operator also has access to the TaskContext and ExecutionConfig
> >>>>>>> and Serializers. The operator would emit values using an emit()
> >>>>>>> method or the Collector interface, not sure about that yet.
> >>>>>>>
> >>>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyf...@apache.org> wrote:
> >>>>>>>> I think this is a good idea in general. I would try to minimize the
> >>>>>>>> methods we include and make the ones that we keep very concrete.
> >>>>>>>> For instance I would not have the receive barrier method as that is
> >>>>>>>> handled on a totally different level already. And instead of
> >>>>>>>> punctuation I would directly add a method to work on watermarks.
> >>>>>>>>
> >>>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> What do you mean by "losing iterations"?
> >>>>>>>>>
> >>>>>>>>> For the pros and cons:
> >>>>>>>>>
> >>>>>>>>> Cons: I can't think of any, since most of the operators are
> >>>>>>>>> chainable already and already behave like a collector.
> >>>>>>>>>
> >>>>>>>>> Pros:
> >>>>>>>>> - Unified model for operators, chainable operators don't have to
> >>>>>>>>>   worry about input iterators and the collect interface.
> >>>>>>>>> - Enables features that we want in the future, such as barriers
> >>>>>>>>>   and punctuations because they don't work with the simple
> >>>>>>>>>   Collector interface.
> >>>>>>>>> - The while-loop is moved outside of the operators, now the Task
> >>>>>>>>>   (the thing that runs Operators) can control the flow of data
> >>>>>>>>>   better and deal with stuff like barriers and punctuations. If we
> >>>>>>>>>   want to keep the main-loop inside each operator, then they all
> >>>>>>>>>   have to manage input readers and inline events manually.
> >>>>>>>>>
> >>>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:
> >>>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose
> >>>>>>>>>> some functionality by getting rid of iterations?
> >>>>>>>>>>
> >>>>>>>>>> Kostas
> >>>>>>>>>>
> >>>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Folks,
> >>>>>>>>>>> while working on introducing source-assigned timestamps into
> >>>>>>>>>>> streaming (https://issues.apache.org/jira/browse/FLINK-1967) I
> >>>>>>>>>>> thought about how the punctuations (low watermarks) can be
> >>>>>>>>>>> pushed through the system. The problem is that operators can
> >>>>>>>>>>> have two ways of getting input: 1. They read directly from input
> >>>>>>>>>>> iterators, and 2. They act as a Collector and get elements via
> >>>>>>>>>>> collect() from the previous operator in a chain.
> >>>>>>>>>>>
> >>>>>>>>>>> This makes it hard to push things through a chain that are not
> >>>>>>>>>>> elements, such as barriers and/or punctuations.
> >>>>>>>>>>>
> >>>>>>>>>>> I propose to change all streaming operators to be push based,
> >>>>>>>>>>> with a slightly improved interface: In addition to collect(),
> >>>>>>>>>>> which I would call receiveElement(), I would add
> >>>>>>>>>>> receivePunctuation() and receiveBarrier(). The first operator in
> >>>>>>>>>>> the chain would also get data from the outside invokable that
> >>>>>>>>>>> reads from the input iterator and calls receiveElement() for the
> >>>>>>>>>>> first operator in a chain.
> >>>>>>>>>>>
> >>>>>>>>>>> What do you think? I would of course be willing to implement
> >>>>>>>>>>> this myself.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Aljoscha
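
For reference, here is a rough sketch of how the push-based model discussed in this thread could look. It is only an illustration under my own assumptions, not the actual Flink interface: the names Output, OneInputOperator, ChainingOutput, SumUntilWatermarkOperator and PushOperatorSketch are all hypothetical, watermarks are simplified to plain long timestamps, and barriers are left out since the thread leans towards handling them outside the operator. It shows a map operator forwarding a watermark immediately, while a toy "windowed" sum only forwards it after emitting its result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical downstream handle: carries elements AND watermarks,
// which a plain Collector-style collect() cannot do.
interface Output<T> {
    void emitElement(T element);
    void emitWatermark(long timestamp);
}

// Hypothetical push-based operator: the task calls these methods,
// the operator never reads from an input iterator itself.
abstract class OneInputOperator<IN, OUT> {
    protected final Output<OUT> output;
    protected OneInputOperator(Output<OUT> output) { this.output = output; }
    public abstract void processElement(IN element);
    public abstract void processWatermark(long timestamp);
}

// A map has no buffered state, so it can forward watermarks right away.
class MapOperator<IN, OUT> extends OneInputOperator<IN, OUT> {
    private final Function<IN, OUT> fn;
    MapOperator(Output<OUT> output, Function<IN, OUT> fn) {
        super(output);
        this.fn = fn;
    }
    @Override public void processElement(IN element) {
        output.emitElement(fn.apply(element));
    }
    @Override public void processWatermark(long timestamp) {
        output.emitWatermark(timestamp);
    }
}

// Toy stand-in for a windowed reduce: it sums everything seen since the
// last watermark and must emit that result BEFORE forwarding the watermark.
class SumUntilWatermarkOperator extends OneInputOperator<Integer, Integer> {
    private int sum = 0;
    private boolean hasData = false;
    SumUntilWatermarkOperator(Output<Integer> output) { super(output); }
    @Override public void processElement(Integer element) {
        sum += element;
        hasData = true;
    }
    @Override public void processWatermark(long timestamp) {
        if (hasData) {
            output.emitElement(sum);
            sum = 0;
            hasData = false;
        }
        output.emitWatermark(timestamp);
    }
}

// Chaining: the output of one operator pushes directly into the next one.
class ChainingOutput<T> implements Output<T> {
    private final OneInputOperator<T, ?> next;
    ChainingOutput(OneInputOperator<T, ?> next) { this.next = next; }
    @Override public void emitElement(T element) { next.processElement(element); }
    @Override public void emitWatermark(long timestamp) { next.processWatermark(timestamp); }
}

class PushOperatorSketch {
    // Plays the role of the task's main loop: it owns the input and
    // pushes elements and watermarks into the head of the chain.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        Output<Integer> sink = new Output<Integer>() {
            @Override public void emitElement(Integer element) { log.add("E:" + element); }
            @Override public void emitWatermark(long timestamp) { log.add("W:" + timestamp); }
        };
        SumUntilWatermarkOperator sum = new SumUntilWatermarkOperator(sink);
        MapOperator<Integer, Integer> doubler =
            new MapOperator<Integer, Integer>(new ChainingOutput<Integer>(sum), x -> x * 2);

        doubler.processElement(1);
        doubler.processElement(2);
        doubler.processWatermark(10L);
        doubler.processElement(5);
        doubler.processWatermark(20L);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [E:6, W:10, E:10, W:20]
    }
}
```

The point of the sketch is that the loop lives in the caller (the task) and not in the operators, so non-element events can travel through a chain the same way elements do.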