Yes, because the handling of punctuations depends on the operator: a MapOperator can just forward them, while a windowed join or reduce can only forward them after emitting the correct windows or results.
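The Java sketch below makes that distinction concrete. It is a minimal illustration under stated assumptions, not Flink code:

- `Output`, `CollectingOutput`, `MapOperator` and `TumblingWindowSum` are hypothetical names.
- A watermark W is assumed to promise that no further element with a timestamp below W will arrive.

The map buffers nothing, so it forwards a watermark immediately. The windowed sum first emits every window that the watermark closes and only then forwards the watermark, so downstream operators never see a watermark ahead of the results it should follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; these types are illustrative, not Flink APIs.
// Assumption: a watermark W promises that no element with timestamp < W follows.
class WatermarkForwardingSketch {

    /** Downstream side of an operator: receives elements and watermarks. */
    interface Output<T> {
        void emit(T element);
        void emitWatermark(long watermark);
    }

    /** Records everything it receives, in arrival order. */
    static final class CollectingOutput<T> implements Output<T> {
        final List<String> events = new ArrayList<>();
        public void emit(T element) { events.add("element:" + element); }
        public void emitWatermark(long watermark) { events.add("watermark:" + watermark); }
    }

    /** Stateless map (doubles each value): nothing is buffered, so a watermark is forwarded at once. */
    static final class MapOperator {
        private final Output<Long> out;
        MapOperator(Output<Long> out) { this.out = out; }
        void processElement(long value, long timestamp) { out.emit(value * 2); }
        void processWatermark(long watermark) { out.emitWatermark(watermark); }
    }

    /** Tumbling-window sum: forwards a watermark only after emitting every window it closes. */
    static final class TumblingWindowSum {
        private final long size;
        private final Output<Long> out;
        private final TreeMap<Long, Long> sums = new TreeMap<>(); // window start -> running sum

        TumblingWindowSum(long size, Output<Long> out) { this.size = size; this.out = out; }

        void processElement(long value, long timestamp) {
            long start = timestamp - Math.floorMod(timestamp, size);
            sums.merge(start, value, Long::sum);
        }

        void processWatermark(long watermark) {
            // Window [start, start + size) is complete once the watermark reaches its end.
            while (!sums.isEmpty() && sums.firstKey() + size <= watermark) {
                Map.Entry<Long, Long> closed = sums.pollFirstEntry();
                out.emit(closed.getValue());
            }
            out.emitWatermark(watermark); // results first, then the watermark
        }
    }

    public static void main(String[] args) {
        CollectingOutput<Long> mapOut = new CollectingOutput<>();
        MapOperator map = new MapOperator(mapOut);
        map.processElement(1, 0);
        map.processWatermark(5);
        System.out.println(mapOut.events); // [element:2, watermark:5]

        CollectingOutput<Long> winOut = new CollectingOutput<>();
        TumblingWindowSum win = new TumblingWindowSum(10, winOut);
        win.processElement(1, 3);
        win.processElement(2, 7);
        win.processElement(5, 12);
        win.processWatermark(10); // closes [0, 10) but not [10, 20)
        System.out.println(winOut.events); // [element:3, watermark:10]
    }
}
```

A windowed join would follow the same rule: hold the watermark back until every result it completes has been emitted.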
On Tue, May 5, 2015 at 3:58 PM, Paris Carbone <par...@kth.se> wrote:
> By watermark handling I meant making punctuations explicit and forwarding/modifying them at the operator level. I think this is clear so far.
>
>> On 05 May 2015, at 15:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> There is no watermark handling yet. :D
>>
>> But this would enable me to do this.
>>
>> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <par...@kth.se> wrote:
>>> I agree with Gyula on this one. Barriers had better not be exposed to the operator. They are system events for state management. Apart from that, the watermark handling seems to be on the right track; I like it so far.
>>>
>>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>> I don't know, I just put that there because other people are working on the checkpointing/barrier thing. So there would need to be some functionality there at some point.
>>>>
>>>> Or maybe it is not required there and can be handled in the StreamTask. Others might know this better than I do right now.
>>>>
>>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>> What would the processBarrier method do?
>>>>>
>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> I'm using the terms punctuation and watermark interchangeably here because for practical purposes they do the same thing. I'm not sure what you meant by your comment about those.
>>>>>>
>>>>>> For the Operator interface I'm thinking about something like this:
>>>>>>
>>>>>>     abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
>>>>>>         public abstract void processElement(IN element);
>>>>>>         public abstract void processBarrier(...);
>>>>>>         public abstract void processPunctuation/lowWatermark(...);
>>>>>>     }
>>>>>>
>>>>>> The operator also has access to the TaskContext, the ExecutionConfig and the Serializers. The operator would emit values using an emit() method or the Collector interface; I'm not sure about that yet.
>>>>>>
>>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>>> I think this is a good idea in general. I would try to minimize the methods we include and make the ones that we keep very concrete. For instance, I would not have the receive barrier method, as that is already handled on a totally different level. And instead of punctuation I would directly add a method to work on watermarks.
>>>>>>>
>>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> What do you mean by "losing iterations"?
>>>>>>>>
>>>>>>>> For the pros and cons:
>>>>>>>>
>>>>>>>> Cons: I can't think of any, since most of the operators are already chainable and already behave like a collector.
>>>>>>>>
>>>>>>>> Pros:
>>>>>>>> - Unified model for operators: chainable operators don't have to worry about input iterators and the collect interface.
>>>>>>>> - Enables features that we want in the future, such as barriers and punctuations, because they don't work with the simple Collector interface.
>>>>>>>> - The while-loop is moved outside of the operators: the Task (the thing that runs Operators) can now control the flow of data better and deal with stuff like barriers and punctuations. If we want to keep the main loop inside each operator, then they all have to manage input readers and inline events manually.
>>>>>>>>
>>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:
>>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose some functionality by getting rid of iterations?
>>>>>>>>>
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Folks,
>>>>>>>>>> while working on introducing source-assigned timestamps into streaming (https://issues.apache.org/jira/browse/FLINK-1967) I thought about how the punctuations (low watermarks) can be pushed through the system. The problem is that operators can get input in two ways: 1. they read directly from input iterators, and 2. they act as a Collector and get elements via collect() from the previous operator in a chain.
>>>>>>>>>>
>>>>>>>>>> This makes it hard to push things through a chain that are not elements, such as barriers and/or punctuations.
>>>>>>>>>>
>>>>>>>>>> I propose to change all streaming operators to be push based, with a slightly improved interface: in addition to collect(), which I would call receiveElement(), I would add receivePunctuation() and receiveBarrier(). The first operator in the chain would get its data from the outside invokable, which reads from the input iterator and calls receiveElement() on the first operator in the chain.
>>>>>>>>>>
>>>>>>>>>> What do you think? I would of course be willing to implement this myself.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
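The push-based design proposed in the thread can be sketched in Java as follows. In this design the task owns the read loop and pushes elements and watermarks into the head of an operator chain, and each operator pushes its output into the next one.

This is a minimal sketch under stated assumptions:

- All names (`StreamEvent`, `Operator`, `EvenFilter`, `Sink`, `ChainTask`) are hypothetical and are not Flink APIs.
- Following Gyula's and Paris's suggestion, barriers are intercepted by the task and never reach the operators.
- The "checkpoint" step is only a placeholder that records the barrier's ID.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of push-based operator chaining; not Flink classes.
class PushBasedChainSketch {

    /** What the task reads from its input: data or control events. */
    static abstract class StreamEvent {}
    static final class Element extends StreamEvent { final long value; Element(long v) { value = v; } }
    static final class Watermark extends StreamEvent { final long timestamp; Watermark(long t) { timestamp = t; } }
    static final class Barrier extends StreamEvent { final long checkpointId; Barrier(long id) { checkpointId = id; } }

    /** Push-based operator: no input iterator and no main loop of its own. */
    interface Operator {
        void processElement(long value);
        void processWatermark(long timestamp);
    }

    /** Chained operator: drops odd values and pushes the rest to the next operator. */
    static final class EvenFilter implements Operator {
        private final Operator next;
        EvenFilter(Operator next) { this.next = next; }
        public void processElement(long value) { if (value % 2 == 0) next.processElement(value); }
        public void processWatermark(long timestamp) { next.processWatermark(timestamp); }
    }

    /** Last operator in the chain; records what reaches it. */
    static final class Sink implements Operator {
        final List<String> received = new ArrayList<>();
        public void processElement(long value) { received.add("element:" + value); }
        public void processWatermark(long timestamp) { received.add("watermark:" + timestamp); }
    }

    /** The task owns the while-loop: it reads the input and drives the head of the chain. */
    static final class ChainTask {
        private final Operator head;
        final List<Long> checkpointsTriggered = new ArrayList<>();
        ChainTask(Operator head) { this.head = head; }

        void run(Iterator<StreamEvent> input) {
            while (input.hasNext()) {
                StreamEvent event = input.next();
                if (event instanceof Element) {
                    head.processElement(((Element) event).value);
                } else if (event instanceof Watermark) {
                    head.processWatermark(((Watermark) event).timestamp);
                } else if (event instanceof Barrier) {
                    // Placeholder: a real task would snapshot operator state here.
                    checkpointsTriggered.add(((Barrier) event).checkpointId);
                }
            }
        }
    }

    public static void main(String[] args) {
        Sink sink = new Sink();
        ChainTask task = new ChainTask(new EvenFilter(sink));
        List<StreamEvent> input = List.of(
                new Element(1), new Element(2), new Barrier(7), new Watermark(100), new Element(4));
        task.run(input.iterator());
        System.out.println(sink.received);             // [element:2, watermark:100, element:4]
        System.out.println(task.checkpointsTriggered); // [7]
    }
}
```

Because operators only receive calls, a chained operator needs no input reader of its own. Supporting a new kind of event means adding one method to the interface and one branch to the task loop, instead of changing every operator's main loop.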