There is already a Jira and a Pull Request: https://github.com/apache/flink/pull/659
On Mon, May 11, 2015 at 6:29 PM, Stephan Ewen <se...@apache.org> wrote:

> Yep, I would say: Move ahead :-)
>
> On Tue, May 5, 2015 at 4:48 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> So I gather I should go forward with this? If no-one objects I will
>> open a Jira and work on this.
>>
>> On Tue, May 5, 2015 at 4:14 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> > Yes, because the handling of punctuations depends on the operator: A
>> > MapOperator can just forward them while a windowed join or reduce can
>> > only forward them after emitting the correct windows or results.
>> >
>> > On Tue, May 5, 2015 at 3:58 PM, Paris Carbone <par...@kth.se> wrote:
>> >> By watermark handling I meant making punctuations explicit and
>> >> forwarding/modifying them at the operator level. I think this is
>> >> clear so far.
>> >>
>> >>> On 05 May 2015, at 15:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>
>> >>> There is no watermark handling yet. :D
>> >>>
>> >>> But this would enable me to do this.
>> >>>
>> >>> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <par...@kth.se> wrote:
>> >>>> I agree with Gyula on this one. Barriers should better not be
>> >>>> exposed to the operator. They are system events for state
>> >>>> management. Apart from that, watermark handling seems to be on the
>> >>>> right track, I like it so far.
>> >>>>
>> >>>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>
>> >>>>> I don't know, I just put that there because other people are working
>> >>>>> on the checkpointing/barrier thing. So there would need to be some
>> >>>>> functionality there at some point.
>> >>>>>
>> >>>>> Or maybe it is not required there and can be handled in the
>> >>>>> StreamTask. Others might know this better than I do right now.
>> >>>>>
>> >>>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >>>>>> What would the processBarrier method do?
>> >>>>>>
>> >>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>>
>> >>>>>>> I'm using the term punctuation and watermark interchangeably here
>> >>>>>>> because for practical purposes they do the same thing. I'm not sure
>> >>>>>>> what you meant with your comment about those.
>> >>>>>>>
>> >>>>>>> For the Operator interface I'm thinking about something like this:
>> >>>>>>>
>> >>>>>>> abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
>> >>>>>>>   public processElement(IN element);
>> >>>>>>>   public processBarrier(...);
>> >>>>>>>   public processPunctuation/lowWatermark(...);
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> The operator also has access to the TaskContext and ExecutionConfig
>> >>>>>>> and Serializers. The operator would emit values using an emit()
>> >>>>>>> method or the Collector interface, not sure about that yet.
>> >>>>>>>
>> >>>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyf...@apache.org> wrote:
>> >>>>>>>> I think this is a good idea in general. I would try to minimize
>> >>>>>>>> the methods we include and make the ones that we keep very
>> >>>>>>>> concrete. For instance I would not have the receive barrier
>> >>>>>>>> method as that is handled on a totally different level already.
>> >>>>>>>> And instead of punctuation I would directly add a method to work
>> >>>>>>>> on watermarks.
>> >>>>>>>>
>> >>>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>>>>
>> >>>>>>>>> What do you mean by "losing iterations"?
>> >>>>>>>>>
>> >>>>>>>>> For the pros and cons:
>> >>>>>>>>>
>> >>>>>>>>> Cons: I can't think of any, since most of the operators are
>> >>>>>>>>> chainable already and already behave like a collector.
>> >>>>>>>>>
>> >>>>>>>>> Pros:
>> >>>>>>>>> - Unified model for operators, chainable operators don't have to
>> >>>>>>>>>   worry about input iterators and the collect interface.
>> >>>>>>>>> - Enables features that we want in the future, such as barriers
>> >>>>>>>>>   and punctuations because they don't work with the
>> >>>>>>>>>   simple Collector interface.
>> >>>>>>>>> - The while-loop is moved outside of the operators, now the Task
>> >>>>>>>>>   (the thing that runs Operators) can control the flow of data
>> >>>>>>>>>   better and deal with stuff like barriers and punctuations. If
>> >>>>>>>>>   we want to keep the main-loop inside each operator, then they
>> >>>>>>>>>   all have to manage input readers and inline events manually.
>> >>>>>>>>>
>> >>>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:
>> >>>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose
>> >>>>>>>>>> some functionality by getting rid of iterations?
>> >>>>>>>>>>
>> >>>>>>>>>> Kostas
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi Folks,
>> >>>>>>>>>>> while working on introducing source-assigned timestamps into
>> >>>>>>>>>>> streaming (https://issues.apache.org/jira/browse/FLINK-1967) I
>> >>>>>>>>>>> thought about how the punctuations (low watermarks) can be
>> >>>>>>>>>>> pushed through the system. The problem is that operators can
>> >>>>>>>>>>> have two ways of getting input: 1. They read directly from
>> >>>>>>>>>>> input iterators, and 2. They act as a Collector and get
>> >>>>>>>>>>> elements via collect() from the previous operator in a chain.
>> >>>>>>>>>>>
>> >>>>>>>>>>> This makes it hard to push things through a chain that are not
>> >>>>>>>>>>> elements, such as barriers and/or punctuations.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I propose to change all streaming operators to be push based,
>> >>>>>>>>>>> with a slightly improved interface: In addition to collect(),
>> >>>>>>>>>>> which I would call receiveElement(), I would add
>> >>>>>>>>>>> receivePunctuation() and receiveBarrier(). The first operator
>> >>>>>>>>>>> in the chain would also get data from the outside invokable
>> >>>>>>>>>>> that reads from the input iterator and calls receiveElement()
>> >>>>>>>>>>> for the first operator in a chain.
>> >>>>>>>>>>>
>> >>>>>>>>>>> What do you think? I would of course be willing to implement
>> >>>>>>>>>>> this myself.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Cheers,
>> >>>>>>>>>>> Aljoscha
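For readers of the archive, here is a rough sketch of the push-based model discussed in the thread above. It is not the code from the pull request and not Flink's actual API: the names Output, OneInputOperator, ChainingOutput, RecordingOutput and PushChainDemo are made up for illustration, and watermarks are assumed to be plain long timestamps. Following Gyula's and Paris's comments, the sketch leaves barriers out of the operator interface. It only shows that once the loop lives outside the operators, a watermark can travel through a chain in order with the elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical output handle: carries both elements and watermarks downstream. */
interface Output<T> {
    void emit(T element);
    void emitWatermark(long timestamp);
}

/** Hypothetical push-based operator, loosely modelled on the interface proposed in the thread. */
interface OneInputOperator<IN> {
    void processElement(IN element);
    void processWatermark(long timestamp);
}

/** Map-style operator: transforms each element and forwards watermarks unchanged. */
final class MapOperator<IN, OUT> implements OneInputOperator<IN> {
    private final Function<IN, OUT> fn;
    private final Output<OUT> out;

    MapOperator(Function<IN, OUT> fn, Output<OUT> out) {
        this.fn = fn;
        this.out = out;
    }

    @Override
    public void processElement(IN element) {
        out.emit(fn.apply(element));
    }

    @Override
    public void processWatermark(long timestamp) {
        // A stateless map has nothing to flush, so it can forward immediately.
        out.emitWatermark(timestamp);
    }
}

/** Chains two operators: whatever the upstream operator emits is pushed into the next one. */
final class ChainingOutput<T> implements Output<T> {
    private final OneInputOperator<T> next;

    ChainingOutput(OneInputOperator<T> next) {
        this.next = next;
    }

    @Override
    public void emit(T element) {
        next.processElement(element);
    }

    @Override
    public void emitWatermark(long timestamp) {
        next.processWatermark(timestamp);
    }
}

/** Terminal output that records what reaches the end of the chain. */
final class RecordingOutput<T> implements Output<T> {
    final List<String> log = new ArrayList<>();

    @Override
    public void emit(T element) {
        log.add("element:" + element);
    }

    @Override
    public void emitWatermark(long timestamp) {
        log.add("watermark:" + timestamp);
    }
}

final class PushChainDemo {
    /** Stands in for the task's main loop: it owns the input and pushes into the head operator. */
    static List<String> run() {
        RecordingOutput<String> sink = new RecordingOutput<>();
        OneInputOperator<Integer> toText =
                new MapOperator<Integer, String>(i -> "v" + i, sink);
        OneInputOperator<Integer> doubler =
                new MapOperator<Integer, Integer>(i -> i * 2, new ChainingOutput<>(toText));

        doubler.processElement(1);
        doubler.processElement(2);
        doubler.processWatermark(10L); // travels through both chained operators
        doubler.processElement(3);
        return sink.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A windowed join or reduce would implement processWatermark differently: as noted in the thread, it could only forward the watermark after emitting the windows or results that the watermark completes. That case is not covered by this sketch.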