By watermark handling I meant making punctuations explicit and
forwarding/modifying them at the operator level. I think this is clear so far.
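
To make that concrete, here is a minimal Java sketch of what I have in mind. All names (PushOperator, UpperCaseOperator, LaggingOperator, RecordingSink) are hypothetical and only loosely follow the interface quoted below; none of this is existing Flink API, and the lag of 5 is an arbitrary value for illustration. One operator forwards the watermark untouched, the other modifies it before forwarding.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical push-based operator interface (an assumption, not a Flink API).
interface PushOperator<IN> {
    void processElement(IN element);
    void processWatermark(long timestamp);
}

// Transforms elements and forwards watermarks unchanged.
final class UpperCaseOperator implements PushOperator<String> {
    private final PushOperator<String> next;

    UpperCaseOperator(PushOperator<String> next) {
        this.next = next;
    }

    @Override
    public void processElement(String element) {
        next.processElement(element.toUpperCase());
    }

    @Override
    public void processWatermark(long timestamp) {
        next.processWatermark(timestamp);
    }
}

// Passes elements through but modifies the watermark: it holds the
// watermark back by a fixed lag (illustrative only).
final class LaggingOperator<T> implements PushOperator<T> {
    private final PushOperator<T> next;
    private final long lag;

    LaggingOperator(PushOperator<T> next, long lag) {
        this.next = next;
        this.lag = lag;
    }

    @Override
    public void processElement(T element) {
        next.processElement(element);
    }

    @Override
    public void processWatermark(long timestamp) {
        next.processWatermark(timestamp - lag);
    }
}

// End of the chain: records everything it receives, in arrival order.
final class RecordingSink implements PushOperator<String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void processElement(String element) {
        log.add("element:" + element);
    }

    @Override
    public void processWatermark(long timestamp) {
        log.add("watermark:" + timestamp);
    }
}

final class WatermarkForwardingSketch {
    static List<String> run() {
        RecordingSink sink = new RecordingSink();
        PushOperator<String> chain =
                new UpperCaseOperator(new LaggingOperator<>(sink, 5));
        chain.processElement("a");
        chain.processWatermark(10);
        chain.processElement("b");
        return sink.log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [element:A, watermark:5, element:B]
    }
}
```

The point is only that the watermark travels through the same chain as the elements, so each operator can decide what to do with it.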
> On 05 May 2015, at 15:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> There is no watermark handling yet. :D
>
> But this would enable me to do this.
>
> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <par...@kth.se> wrote:
>> I agree with Gyula on this one. Barriers should not be exposed to the
>> operator. They are system events for state management. Apart from that,
>> watermark handling seems to be on the right track; I like it so far.
>>
>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> I don't know, I just put that there because other people are working
>>> on the checkpointing/barrier thing. So there would need to be some
>>> functionality there at some point.
>>>
>>> Or maybe it is not required there and can be handled in the
>>> StreamTask. Others might know this better than I do right now.
>>>
>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>> What would the processBarrier method do?
>>>>
>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> I'm using the term punctuation and watermark interchangeably here
>>>>> because for practical purposes they do the same thing. I'm not sure
>>>>> what you meant with your comment about those.
>>>>>
>>>>> For the Operator interface I'm thinking about something like this:
>>>>>
>>>>> abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
>>>>>   public abstract void processElement(IN element);
>>>>>   public abstract void processBarrier(...);
>>>>>   public abstract void processPunctuation/lowWatermark(...);
>>>>> }
>>>>>
>>>>> The operator also has access to the TaskContext and ExecutionConfig
>>>>> and Serializers. The operator would emit values using an emit() method
>>>>> or the Collector interface, not sure about that yet.
>>>>>
>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>> I think this is a good idea in general. I would try to minimize the
>>>>>> methods we include and make the ones that we keep very concrete. For
>>>>>> instance, I would not have the receive barrier method as that is handled
>>>>>> on a totally different level already. And instead of punctuation I would
>>>>>> directly add a method to work on watermarks.
>>>>>>
>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> What do you mean by "losing iterations"?
>>>>>>>
>>>>>>> For the pros and cons:
>>>>>>>
>>>>>>> Cons: I can't think of any, since most of the operators are chainable
>>>>>>> already and already behave like a collector.
>>>>>>>
>>>>>>> Pros:
>>>>>>> - Unified model for operators: chainable operators don't have to
>>>>>>> worry about input iterators and the collect interface.
>>>>>>> - Enables features that we want in the future, such as barriers and
>>>>>>> punctuations, because they don't work with the simple Collector
>>>>>>> interface.
>>>>>>> - The while-loop is moved outside of the operators, so the Task (the
>>>>>>> thing that runs Operators) can control the flow of data better and
>>>>>>> deal with stuff like barriers and punctuations. If we want to keep
>>>>>>> the main-loop inside each operator, then they all have to manage
>>>>>>> input readers and inline events manually.
>>>>>>>
>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:
>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose some
>>>>>>>> functionality by getting rid of iterations?
>>>>>>>>
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Folks,
>>>>>>>>> while working on introducing source-assigned timestamps into
>>>>>>>>> streaming (https://issues.apache.org/jira/browse/FLINK-1967) I
>>>>>>>>> thought about how the punctuations (low watermarks) can be pushed
>>>>>>>>> through the system. The problem is that operators can have two ways
>>>>>>>>> of getting input: 1. They read directly from input iterators, and
>>>>>>>>> 2. They act as a Collector and get elements via collect() from the
>>>>>>>>> previous operator in a chain.
>>>>>>>>>
>>>>>>>>> This makes it hard to push things through a chain that are not
>>>>>>>>> elements, such as barriers and/or punctuations.
>>>>>>>>>
>>>>>>>>> I propose to change all streaming operators to be push based, with a
>>>>>>>>> slightly improved interface: In addition to collect(), which I would
>>>>>>>>> call receiveElement(), I would add receivePunctuation() and
>>>>>>>>> receiveBarrier(). The first operator in the chain would get its data
>>>>>>>>> from the outside invokable, which reads from the input iterator and
>>>>>>>>> calls receiveElement() on that first operator.
>>>>>>>>>
>>>>>>>>> What do you think? I would of course be willing to implement this
>>>>>>>>> myself.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>
>>>>>
>>
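
For reference, the "while-loop moved outside of the operators" idea from the original proposal could look roughly like the Java sketch below. This is only an illustration under assumptions: the event classes, ChainedOperator and TaskLoop are hypothetical names, the element type is fixed to String for brevity, and barriers are intercepted by the task rather than passed to the operator, which follows the replies in this thread rather than the receiveBarrier() method of the first mail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical event types that can appear on the input (assumptions).
abstract class StreamEvent {}

final class ElementEvent extends StreamEvent {
    final String value;
    ElementEvent(String value) { this.value = value; }
}

final class PunctuationEvent extends StreamEvent {
    final long timestamp;
    PunctuationEvent(long timestamp) { this.timestamp = timestamp; }
}

final class BarrierEvent extends StreamEvent {
    final long checkpointId;
    BarrierEvent(long checkpointId) { this.checkpointId = checkpointId; }
}

// Head of an operator chain; method names follow the original proposal.
interface ChainedOperator {
    void receiveElement(String element);
    void receivePunctuation(long timestamp);
}

// The task owns the main loop: it reads the input iterator and pushes
// each event into the first operator of the chain.
final class TaskLoop {
    final List<Long> barriersSeen = new ArrayList<>();

    void run(Iterator<StreamEvent> input, ChainedOperator head) {
        while (input.hasNext()) {
            StreamEvent event = input.next();
            if (event instanceof ElementEvent) {
                head.receiveElement(((ElementEvent) event).value);
            } else if (event instanceof PunctuationEvent) {
                head.receivePunctuation(((PunctuationEvent) event).timestamp);
            } else if (event instanceof BarrierEvent) {
                // Handled at the task level; the operator never sees it.
                barriersSeen.add(((BarrierEvent) event).checkpointId);
            }
        }
    }
}

final class TaskLoopSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ChainedOperator head = new ChainedOperator() {
            @Override
            public void receiveElement(String element) {
                log.add("element:" + element);
            }

            @Override
            public void receivePunctuation(long timestamp) {
                log.add("punctuation:" + timestamp);
            }
        };
        List<StreamEvent> input = Arrays.asList(
                new ElementEvent("a"),
                new BarrierEvent(1L),
                new PunctuationEvent(7L),
                new ElementEvent("b"));
        TaskLoop task = new TaskLoop();
        task.run(input.iterator(), head);
        log.add("barriers:" + task.barriersSeen);
        return log;
    }

    public static void main(String[] args) {
        // [element:a, punctuation:7, element:b, barriers:[1]]
        System.out.println(run());
    }
}
```

With the loop in one place, the operators need no knowledge of input readers or inline events.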