So I gather I should go forward with this? If no-one objects, I will open a Jira issue and work on this.
On Tue, May 5, 2015 at 4:14 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Yes, because the handling of punctuations depends on the operator: a MapOperator can just forward them, while a windowed join or reduce can only forward them after emitting the correct windows or results.
>
> On Tue, May 5, 2015 at 3:58 PM, Paris Carbone <par...@kth.se> wrote:
>> By watermark handling I meant making punctuations explicit and forwarding/modifying them at the operator level. I think this is clear so far.
>>> On 05 May 2015, at 15:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> There is no watermark handling yet. :D
>>>
>>> But this would enable me to do this.
>>>
>>> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <par...@kth.se> wrote:
>>>> I agree with Gyula on this one. Barriers should not be exposed to the operator. They are system events for state management. Apart from that, watermark handling seems to be on the right track; I like it so far.
>>>>
>>>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>> I don't know; I just put that there because other people are working on the checkpointing/barrier thing. So there would need to be some functionality there at some point.
>>>>>
>>>>> Or maybe it is not required there and can be handled in the StreamTask. Others might know this better than I do right now.
>>>>>
>>>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>> What would the processBarrier method do?
>>>>>>
>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> I'm using the terms punctuation and watermark interchangeably here because for practical purposes they do the same thing. I'm not sure what you meant by your comment about those.
>>>>>>>
>>>>>>> For the Operator interface I'm thinking about something like this:
>>>>>>>
>>>>>>> abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
>>>>>>>     public abstract void processElement(IN element);
>>>>>>>     public abstract void processBarrier(...);
>>>>>>>     public abstract void processPunctuation(...); // or processLowWatermark(...)
>>>>>>> }
>>>>>>>
>>>>>>> The operator also has access to the TaskContext, the ExecutionConfig and the Serializers. The operator would emit values using an emit() method or the Collector interface; I'm not sure about that yet.
>>>>>>>
>>>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>>>> I think this is a good idea in general. I would try to minimize the methods we include and make the ones that we keep very concrete. For instance, I would not have the receive barrier method, as that is handled on a totally different level already. And instead of punctuation, I would directly add a method to work on watermarks.
>>>>>>>>
>>>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> What do you mean by "losing iterations"?
>>>>>>>>>
>>>>>>>>> For the pros and cons:
>>>>>>>>>
>>>>>>>>> Cons: I can't think of any, since most of the operators are already chainable and already behave like a collector.
>>>>>>>>>
>>>>>>>>> Pros:
>>>>>>>>> - Unified model for operators: chainable operators don't have to worry about input iterators and the collect interface.
>>>>>>>>> - Enables features that we want in the future, such as barriers and punctuations, which don't work with the simple Collector interface.
>>>>>>>>> - The while-loop is moved outside of the operators, so the Task (the thing that runs Operators) can control the flow of data better and deal with stuff like barriers and punctuations.
>>>>>>>>>   If we want to keep the main-loop inside each operator, then they all have to manage input readers and inline events manually.
>>>>>>>>>
>>>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:
>>>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose some functionality by getting rid of iterations?
>>>>>>>>>>
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Folks,
>>>>>>>>>>> while working on introducing source-assigned timestamps into streaming (https://issues.apache.org/jira/browse/FLINK-1967) I thought about how the punctuations (low watermarks) can be pushed through the system. The problem is that operators can get input in two ways:
>>>>>>>>>>> 1. They read directly from input iterators.
>>>>>>>>>>> 2. They act as a Collector and get elements via collect() from the previous operator in a chain.
>>>>>>>>>>>
>>>>>>>>>>> This makes it hard to push things through a chain that are not elements, such as barriers and/or punctuations.
>>>>>>>>>>>
>>>>>>>>>>> I propose to change all streaming operators to be push based, with a slightly improved interface: in addition to collect(), which I would call receiveElement(), I would add receivePunctuation() and receiveBarrier(). The first operator in the chain would also be fed this way: the outer invokable reads from the input iterator and calls receiveElement() on it.
>>>>>>>>>>>
>>>>>>>>>>> What do you think? I would of course be willing to implement this myself.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Aljoscha
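To make the push-based model from this thread concrete, here is a minimal Java sketch written under several assumptions that are not part of the discussion. Every name (Output, OneInputOperator, MapOperator, WindowSumOperator, chainTo, runTask, demo) is hypothetical and not Flink's API. Elements carry explicit long timestamps, and a watermark w is taken to mean that no later element has a timestamp below w. The sketch shows the two forwarding behaviours described above: a map forwards a watermark immediately, while a windowed aggregation first emits every window the watermark closes and only then forwards it. The task, not the operator, owns the input loop.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical sketch: none of these names are Flink's actual API.
public class PushOperatorSketch {

    /** Where an operator pushes results: the next operator in the chain or a sink. */
    interface Output<T> {
        void emit(T element, long timestamp);
        void emitWatermark(long watermark);
    }

    /** Push-based operator: the task calls these methods; there is no input loop inside. */
    interface OneInputOperator<IN, OUT> {
        void processElement(IN element, long timestamp);
        void processWatermark(long watermark);
    }

    /** Stateless operator: it can forward a watermark as soon as it sees it. */
    static final class MapOperator<IN, OUT> implements OneInputOperator<IN, OUT> {
        private final Function<IN, OUT> fn;
        private final Output<OUT> out;
        MapOperator(Function<IN, OUT> fn, Output<OUT> out) { this.fn = fn; this.out = out; }
        public void processElement(IN element, long timestamp) { out.emit(fn.apply(element), timestamp); }
        public void processWatermark(long watermark) { out.emitWatermark(watermark); }
    }

    /**
     * Tumbling-window sum. Assumed semantics: watermark w means no later element has a
     * timestamp < w, so window [start, start + size) is complete once start + size <= w.
     */
    static final class WindowSumOperator implements OneInputOperator<Integer, String> {
        private final long size;
        private final Output<String> out;
        private final TreeMap<Long, Integer> sums = new TreeMap<>(); // window start -> running sum
        WindowSumOperator(long size, Output<String> out) { this.size = size; this.out = out; }

        public void processElement(Integer element, long timestamp) {
            long start = timestamp - Math.floorMod(timestamp, size);
            sums.merge(start, element, Integer::sum);
        }

        public void processWatermark(long watermark) {
            Iterator<Map.Entry<Long, Integer>> it = sums.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> e = it.next();
                long end = e.getKey() + size;
                if (end > watermark) break; // windows are sorted by start; the rest are still open
                out.emit("[" + e.getKey() + "," + end + ")=" + e.getValue(), end - 1);
                it.remove();
            }
            out.emitWatermark(watermark); // forwarded only after the closed windows are emitted
        }
    }

    /** Chaining: wraps the next operator so it can serve as its predecessor's Output. */
    static <T> Output<T> chainTo(OneInputOperator<T, ?> next) {
        return new Output<T>() {
            public void emit(T element, long timestamp) { next.processElement(element, timestamp); }
            public void emitWatermark(long watermark) { next.processWatermark(watermark); }
        };
    }

    /** One input record: an element with a timestamp, or a watermark (value == null). */
    static final class Event {
        final Integer value;
        final long time;
        Event(Integer value, long time) { this.value = value; this.time = time; }
        static Event element(int value, long timestamp) { return new Event(value, timestamp); }
        static Event watermark(long watermark) { return new Event(null, watermark); }
    }

    /** The task owns the loop and pushes each event into the head of the chain. */
    static void runTask(List<Event> input, OneInputOperator<Integer, ?> head) {
        for (Event ev : input) {
            if (ev.value == null) head.processWatermark(ev.time);
            else head.processElement(ev.value, ev.time);
        }
    }

    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        Output<String> sink = new Output<String>() {
            public void emit(String element, long timestamp) { log.add(element); }
            public void emitWatermark(long watermark) { log.add("wm " + watermark); }
        };
        WindowSumOperator window = new WindowSumOperator(10, sink);
        MapOperator<Integer, Integer> doubler = new MapOperator<>(x -> x * 2, chainTo(window));
        runTask(List.of(
                Event.element(1, 3), Event.element(2, 7), Event.element(5, 12),
                Event.watermark(10), Event.element(4, 15), Event.watermark(20)), doubler);
        return log;
    }

    public static void main(String[] args) { System.out.println(demo()); }
}
```

Running demo() doubles the inputs 1@3, 2@7, 5@12 and 4@15, sums them in 10-unit tumbling windows, and prints [[0,10)=6, wm 10, [10,20)=18, wm 20]: the window [10,20) stays buffered across watermark 10 and is emitted only when watermark 20 arrives. Barriers are deliberately left out, following Gyula's and Paris's point that they are system events handled at the task level rather than by operators.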