Yes, because the handling of punctuations depends on the operator: a
MapOperator can just forward them, while a windowed join or reduce can
only forward them after emitting the correct windows or results.
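To illustrate the difference, here is a minimal Java sketch. The Output
interface, the Event class, the tumbling-window sum and the watermark
semantics (a watermark W promises that no more events with timestamp
below W will arrive) are all assumptions for illustration, not Flink
API. The map forwards a watermark immediately. The window operator first
emits every window the watermark completes and only then forwards it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event: an int value with an event-time timestamp.
final class Event {
    final long timestamp;
    final int value;
    Event(long timestamp, int value) { this.timestamp = timestamp; this.value = value; }
    @Override public String toString() { return value + "@" + timestamp; }
}

// Hypothetical downstream interface that accepts both records and watermarks.
interface Output {
    void emit(Event event);
    void emitWatermark(long watermark);
}

// A map holds nothing back, so it can forward a watermark immediately.
class MapOperator {
    private final Output out;
    MapOperator(Output out) { this.out = out; }
    void processElement(Event e) { out.emit(new Event(e.timestamp, e.value * 2)); }
    void processWatermark(long watermark) { out.emitWatermark(watermark); }
}

// A tumbling-window sum must emit every window the watermark completes before forwarding it.
// Assumption: watermark W means no more events with timestamp < W will arrive.
class WindowSumOperator {
    private final long size;
    private final Output out;
    private final TreeMap<Long, Integer> sums = new TreeMap<>(); // window start -> running sum

    WindowSumOperator(long size, Output out) { this.size = size; this.out = out; }

    void processElement(Event e) {
        long start = e.timestamp - (e.timestamp % size);
        sums.merge(start, e.value, Integer::sum);
    }

    void processWatermark(long watermark) {
        // Window [start, start + size) is complete once start + size <= watermark.
        while (!sums.isEmpty() && sums.firstKey() + size <= watermark) {
            Map.Entry<Long, Integer> window = sums.pollFirstEntry();
            out.emit(new Event(window.getKey() + size - 1, window.getValue()));
        }
        out.emitWatermark(watermark); // forwarded only after the window results
    }
}

// Records everything that reaches it, for inspection.
class LogOutput implements Output {
    final List<String> log = new ArrayList<>();
    public void emit(Event event) { log.add("record " + event); }
    public void emitWatermark(long watermark) { log.add("watermark " + watermark); }
}

class PunctuationDemo {
    static List<String> runMap() {
        LogOutput sink = new LogOutput();
        MapOperator map = new MapOperator(sink);
        map.processElement(new Event(3, 4));
        map.processWatermark(5);
        return sink.log;
    }

    static List<String> runWindow() {
        LogOutput sink = new LogOutput();
        WindowSumOperator window = new WindowSumOperator(10, sink);
        window.processElement(new Event(1, 5));
        window.processElement(new Event(4, 7));
        window.processElement(new Event(12, 3)); // belongs to the next window [10, 20)
        window.processWatermark(10);
        return sink.log;
    }

    public static void main(String[] args) {
        System.out.println(runMap());
        System.out.println(runWindow());
    }
}
```

With these inputs the map yields [record 8@3, watermark 5], and the
window operator yields [record 12@9, watermark 10]: the still-open window
[10, 20) stays buffered, and the watermark trails the completed result.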

On Tue, May 5, 2015 at 3:58 PM, Paris Carbone <par...@kth.se> wrote:
> By watermark handling I meant making punctuations explicit and 
> forwarding/modifying them at the operator level. I think this is clear so far.
>> On 05 May 2015, at 15:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> There is no watermark handling yet. :D
>>
>> But this would enable me to do this.
>>
>> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <par...@kth.se> wrote:
>>> I agree with Gyula on this one. Barriers should not be exposed to
>>> the operator; they are system events for state management. Apart from that,
>>> the watermark handling seems to be on the right track. I like it so far.
>>>
>>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>> I don't know, I just put that there because other people are working
>>>> on the checkpointing/barrier thing. So there would need to be some
>>>> functionality there at some point.
>>>>
>>>> Or maybe it is not required there and can be handled in the
>>>> StreamTask. Others might know this better than I do right now.
>>>>
>>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>> What would the processBarrier method do?
>>>>>
>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> I'm using the terms punctuation and watermark interchangeably here
>>>>>> because for practical purposes they do the same thing. I'm not sure
>>>>>> what you meant by your comment about those.
>>>>>>
>>>>>> For the Operator interface I'm thinking about something like this:
>>>>>>
>>>>>> abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
>>>>>>   public abstract void processElement(IN element);
>>>>>>   public abstract void processBarrier(...);
>>>>>>   public abstract void processPunctuation(...); // or: processLowWatermark(...)
>>>>>> }
>>>>>>
>>>>>> The operator also has access to the TaskContext, the ExecutionConfig
>>>>>> and the serializers. The operator would emit values using an emit()
>>>>>> method or the Collector interface; I'm not sure about that yet.
>>>>>>
>>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>>> I think this is a good idea in general. I would try to minimize the
>>>>>>> methods we include and make the ones that we keep very concrete. For
>>>>>>> instance, I would not have the receive barrier method, as that is handled
>>>>>>> on a totally different level already. And instead of punctuation, I would
>>>>>>> directly add a method to work on watermarks.
>>>>>>>
>>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> What do you mean by "losing iterations"?
>>>>>>>>
>>>>>>>> For the pros and cons:
>>>>>>>>
>>>>>>>> Cons: I can't think of any, since most of the operators are already
>>>>>>>> chainable and already behave like a Collector.
>>>>>>>>
>>>>>>>> Pros:
>>>>>>>> - A unified model for operators: chainable operators don't have to
>>>>>>>> worry about input iterators and the Collector interface.
>>>>>>>> - It enables features that we want in the future, such as barriers and
>>>>>>>> punctuations, which don't work with the simple Collector interface.
>>>>>>>> - The while-loop is moved outside of the operators. The Task (the thing
>>>>>>>> that runs Operators) can then control the flow of data better and deal
>>>>>>>> with things like barriers and punctuations. If we want to keep the
>>>>>>>> main loop inside each operator, then every operator has to manage input
>>>>>>>> readers and inline events manually.
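A rough Java sketch of the third point, under stated assumptions: a
hypothetical StreamEvent type mixes records, watermarks and barriers in
one input, and a hypothetical Task owns the while-loop and calls the
operator back, so the operator never touches an input reader. Keeping
barriers inside the Task is one assumed option (the thread also discusses
handling them in the StreamTask); the "snapshot" log entry only stands in
for real checkpointing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical input event: a record, a watermark, or a checkpoint barrier.
final class StreamEvent {
    enum Kind { RECORD, WATERMARK, BARRIER }

    final Kind kind;
    final long value; // record payload, watermark timestamp, or barrier id

    private StreamEvent(Kind kind, long value) { this.kind = kind; this.value = value; }

    static StreamEvent ofRecord(long v) { return new StreamEvent(Kind.RECORD, v); }
    static StreamEvent ofWatermark(long t) { return new StreamEvent(Kind.WATERMARK, t); }
    static StreamEvent ofBarrier(long id) { return new StreamEvent(Kind.BARRIER, id); }
}

// Operators only react to callbacks; they never see the input iterator.
interface Operator {
    void processElement(long element);
    void processWatermark(long watermark);
}

class LoggingOperator implements Operator {
    private final List<String> log;
    LoggingOperator(List<String> log) { this.log = log; }
    public void processElement(long element) { log.add("element " + element); }
    public void processWatermark(long watermark) { log.add("watermark " + watermark); }
}

// Hypothetical task: it owns the main loop and decides what reaches the operator.
class Task {
    private final Operator head;
    private final List<String> log;

    Task(Operator head, List<String> log) { this.head = head; this.log = log; }

    void run(Iterator<StreamEvent> input) {
        while (input.hasNext()) {
            StreamEvent e = input.next();
            switch (e.kind) {
                case RECORD:
                    head.processElement(e.value);
                    break;
                case WATERMARK:
                    head.processWatermark(e.value);
                    break;
                case BARRIER:
                    // Assumption: barriers stay a task-level concern; a real task would snapshot state here.
                    log.add("snapshot " + e.value);
                    break;
            }
        }
    }
}

class TaskDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Task task = new Task(new LoggingOperator(log), log);
        task.run(List.of(
                StreamEvent.ofRecord(1),
                StreamEvent.ofBarrier(7),
                StreamEvent.ofRecord(2),
                StreamEvent.ofWatermark(10)).iterator());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

TaskDemo.run() returns [element 1, snapshot 7, element 2, watermark 10]:
the operator code has no loop and no reader, and the Task alone decides
how each kind of event is dispatched.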
>>>>>>>>
>>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:
>>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose some
>>>>>>>>> functionality by getting rid of iterations?
>>>>>>>>>
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Folks,
>>>>>>>>>> while working on introducing source-assigned timestamps into
>>>>>>>>>> streaming (https://issues.apache.org/jira/browse/FLINK-1967) I thought
>>>>>>>>>> about how the punctuations (low watermarks) can be pushed through the
>>>>>>>>>> system. The problem is that operators have two ways of getting input:
>>>>>>>>>> 1. they read directly from input iterators, and 2. they act as a
>>>>>>>>>> Collector and get elements via collect() from the previous operator
>>>>>>>>>> in a chain.
>>>>>>>>>>
>>>>>>>>>> This makes it hard to push things through a chain that are not
>>>>>>>>>> elements, such as barriers and/or punctuations.
>>>>>>>>>>
>>>>>>>>>> I propose to change all streaming operators to be push-based, with a
>>>>>>>>>> slightly improved interface: in addition to collect(), which I would
>>>>>>>>>> call receiveElement(), I would add receivePunctuation() and
>>>>>>>>>> receiveBarrier(). The first operator in the chain would get its data
>>>>>>>>>> from the outside invokable, which reads from the input iterator and
>>>>>>>>>> calls receiveElement() on the first operator in the chain.
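A minimal Java sketch of the proposed push-based chain. The method names
receiveElement(), receivePunctuation() and receiveBarrier() come from the
proposal; their parameter types, the ChainedMap and CollectingSink
classes, and the drive() helper standing in for the outside invokable are
hypothetical. Each operator forwards all three kinds of events to the next
one, so punctuations and barriers travel the same path as elements.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// The three entry points from the proposal; parameter types are assumptions.
interface PushOperator<IN> {
    void receiveElement(IN element);
    void receivePunctuation(long timestamp);
    void receiveBarrier(long checkpointId);
}

// Hypothetical chained map: transforms elements, forwards punctuations and barriers unchanged.
class ChainedMap<IN, OUT> implements PushOperator<IN> {
    private final Function<IN, OUT> fn;
    private final PushOperator<OUT> next;

    ChainedMap(Function<IN, OUT> fn, PushOperator<OUT> next) { this.fn = fn; this.next = next; }

    public void receiveElement(IN element) { next.receiveElement(fn.apply(element)); }
    public void receivePunctuation(long timestamp) { next.receivePunctuation(timestamp); }
    public void receiveBarrier(long checkpointId) { next.receiveBarrier(checkpointId); }
}

// End of the chain: records whatever arrives.
class CollectingSink<T> implements PushOperator<T> {
    final List<String> log = new ArrayList<>();
    public void receiveElement(T element) { log.add("element " + element); }
    public void receivePunctuation(long timestamp) { log.add("punctuation " + timestamp); }
    public void receiveBarrier(long checkpointId) { log.add("barrier " + checkpointId); }
}

class ChainDemo {
    // Stand-in for the outside invokable: reads the input iterator and pushes into the head.
    static <T> void drive(Iterator<T> input, PushOperator<T> head) {
        while (input.hasNext()) {
            head.receiveElement(input.next());
        }
    }

    static List<String> run() {
        CollectingSink<String> sink = new CollectingSink<>();
        PushOperator<Integer> head = new ChainedMap<Integer, Integer>(
                x -> x * 10,
                new ChainedMap<Integer, String>(y -> "v" + y, sink));
        drive(List.of(1, 2).iterator(), head);
        // Non-element events use the same path as elements, with no Collector in between.
        head.receivePunctuation(100);
        head.receiveBarrier(1);
        return sink.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

ChainDemo.run() returns [element v10, element v20, punctuation 100,
barrier 1]. Whether barriers should be part of this interface at all is
exactly the open question raised in the replies above.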
>>>>>>>>>>
>>>>>>>>>> What do you think? I would of course be willing to implement this myself.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>
>
