So I gather I should go forward with this? If no one objects I will
open a Jira issue and work on this.

On Tue, May 5, 2015 at 4:14 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Yes, because the handling of punctuations depends on the operator: A
> MapOperator can just forward them while a windowed join or reduce can
> only forward them after emitting the correct windows or results.
>
> On Tue, May 5, 2015 at 3:58 PM, Paris Carbone <par...@kth.se> wrote:
>> By watermark handling I meant making punctuations explicit and 
>> forwarding/modifying them at the operator level. I think this is clear so 
>> far.
>>> On 05 May 2015, at 15:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> There is no watermark handling yet. :D
>>>
>>> But this would enable me to do this.
>>>
>>> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <par...@kth.se> wrote:
>>>> I agree with Gyula on this one. Barriers are better not exposed to
>>>> the operator. They are system events for state management. Apart from
>>>> that, watermark handling seems to be on the right track; I like it so far.
>>>>
>>>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>> I don't know, I just put that there because other people are working
>>>>> on the checkpointing/barrier thing. So there would need to be some
>>>>> functionality there at some point.
>>>>>
>>>>> Or maybe it is not required there and can be handled in the
>>>>> StreamTask. Others might know this better than I do right now.
>>>>>
>>>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>> What would the processBarrier method do?
>>>>>>
>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> I'm using the term punctuation and watermark interchangeably here
>>>>>>> because for practical purposes they do the same thing. I'm not sure
>>>>>>> what you meant with your comment about those.
>>>>>>>
>>>>>>> For the Operator interface I'm thinking about something like this:
>>>>>>>
>>>>>>> abstract class OneInputStreamOperator<IN, OUT, F extends Function>  {
>>>>>>>  public processElement(IN element);
>>>>>>>  public processBarrier(...);
>>>>>>>  public processPunctuation/lowWatermark(...);
>>>>>>> }
>>>>>>>
>>>>>>> The operator also has access to the TaskContext and ExecutionConfig
>>>>>>> and Serializers. The operator would emit values using an emit() method
>>>>>>> or the Collector interface, not sure about that yet.
>>>>>>>
>>>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>>>> I think this is a good idea in general. I would try to minimize the
>>>>>>>> methods we include and make the ones that we keep very concrete. For
>>>>>>>> instance, I would not have the receive barrier method, as that is
>>>>>>>> handled on a totally different level already. And instead of
>>>>>>>> punctuation I would directly add a method to work on watermarks.
>>>>>>>>
>>>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> What do you mean by "losing iterations"?
>>>>>>>>>
>>>>>>>>> For the pros and cons:
>>>>>>>>>
>>>>>>>>> Cons: I can't think of any, since most of the operators are chainable
>>>>>>>>> already and already behave like a collector.
>>>>>>>>>
>>>>>>>>> Pros:
>>>>>>>>> - Unified model for operators: chainable operators don't have to
>>>>>>>>> worry about input iterators and the collect interface.
>>>>>>>>> - Enables features that we want in the future, such as barriers and
>>>>>>>>> punctuations, because they don't work with the simple Collector
>>>>>>>>> interface.
>>>>>>>>> - The while-loop is moved outside of the operators, so the Task (the
>>>>>>>>> thing that runs Operators) can control the flow of data better and
>>>>>>>>> deal with stuff like barriers and punctuations. If we want to keep
>>>>>>>>> the main-loop inside each operator, then they all have to manage
>>>>>>>>> input readers and inline events manually.
>>>>>>>>>
>>>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:
>>>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose some
>>>>>>>>>> functionality by getting rid of iterations?
>>>>>>>>>>
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Folks,
>>>>>>>>>>> while working on introducing source-assigned timestamps into
>>>>>>>>>>> streaming (https://issues.apache.org/jira/browse/FLINK-1967) I
>>>>>>>>>>> thought about how the punctuations (low watermarks) can be pushed
>>>>>>>>>>> through the system. The problem is that operators can have two
>>>>>>>>>>> ways of getting input: 1. They read directly from input iterators,
>>>>>>>>>>> and 2. They act as a Collector and get elements via collect() from
>>>>>>>>>>> the previous operator in a chain.
>>>>>>>>>>>
>>>>>>>>>>> This makes it hard to push things through a chain that are not
>>>>>>>>>>> elements, such as barriers and/or punctuations.
>>>>>>>>>>>
>>>>>>>>>>> I propose to change all streaming operators to be push based, with a
>>>>>>>>>>> slightly improved interface: In addition to collect(), which I would
>>>>>>>>>>> call receiveElement(), I would add receivePunctuation() and
>>>>>>>>>>> receiveBarrier(). The first operator in a chain would get its data
>>>>>>>>>>> from the outside invokable that reads from the input iterator and
>>>>>>>>>>> calls receiveElement() on that operator.
>>>>>>>>>>>
>>>>>>>>>>> What do you think? I would of course be willing to implement this
>>>>>>>>>>> myself.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Aljoscha
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>
>>
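
For readers of the archive, the push-based model discussed in this thread can be sketched roughly as follows. This is a minimal, self-contained Java illustration and not Flink's actual API: the names Output, OneInputOperator, MapOperator, SumUntilWatermark, chain() and demo() are all hypothetical, and watermarks are assumed to be plain long timestamps. Barriers are deliberately left out, following the suggestion above that they be handled outside the operator. The sketch shows the two behaviours mentioned in the thread: a map operator just forwards a watermark, while an aggregating operator emits its pending result first and only then forwards it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class PushOperatorSketch {

    /** Hypothetical downstream handle; stands in for emit()/Collector. */
    interface Output<T> {
        void emit(T element);
        void emitWatermark(long timestamp);
    }

    /** Hypothetical push-based operator: the task calls it, it never reads input itself. */
    interface OneInputOperator<IN, OUT> {
        void processElement(IN element, Output<OUT> out);
        void processWatermark(long timestamp, Output<OUT> out);
    }

    /** Stateless map: watermarks pass through unchanged. */
    static final class MapOperator<IN, OUT> implements OneInputOperator<IN, OUT> {
        private final Function<IN, OUT> fn;

        MapOperator(Function<IN, OUT> fn) {
            this.fn = fn;
        }

        @Override
        public void processElement(IN element, Output<OUT> out) {
            out.emit(fn.apply(element));
        }

        @Override
        public void processWatermark(long timestamp, Output<OUT> out) {
            out.emitWatermark(timestamp);
        }
    }

    /** Toy aggregate: sums elements and flushes the sum when a watermark arrives. */
    static final class SumUntilWatermark implements OneInputOperator<Integer, Integer> {
        private int sum;

        @Override
        public void processElement(Integer element, Output<Integer> out) {
            sum += element; // buffer only, nothing is emitted yet
        }

        @Override
        public void processWatermark(long timestamp, Output<Integer> out) {
            out.emit(sum);                // emit the pending result first
            sum = 0;
            out.emitWatermark(timestamp); // only then forward the watermark
        }
    }

    /** Wraps an operator and its downstream output so it can act as an upstream output. */
    static <IN, OUT> Output<IN> chain(final OneInputOperator<IN, OUT> op,
                                      final Output<OUT> downstream) {
        return new Output<IN>() {
            @Override
            public void emit(IN element) {
                op.processElement(element, downstream);
            }

            @Override
            public void emitWatermark(long timestamp) {
                op.processWatermark(timestamp, downstream);
            }
        };
    }

    /** Stand-in for the task's main loop pushing events into the head of a chain. */
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        Output<Integer> sink = new Output<Integer>() {
            @Override
            public void emit(Integer element) {
                log.add("element " + element);
            }

            @Override
            public void emitWatermark(long timestamp) {
                log.add("watermark " + timestamp);
            }
        };
        Output<Integer> head = chain(new MapOperator<Integer, Integer>(x -> x * 2),
                                     chain(new SumUntilWatermark(), sink));
        head.emit(1);
        head.emit(2);
        head.emitWatermark(10L);
        head.emit(5);
        head.emitWatermark(20L);
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

In this sketch the loop that reads input lives entirely in demo(), which plays the role of the task, so non-element events such as watermarks travel through the chain by the same path as elements.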
