There is already a Jira and a Pull Request:
https://github.com/apache/flink/pull/659

On Mon, May 11, 2015 at 6:29 PM, Stephan Ewen <se...@apache.org> wrote:
> Yep, I would say: Move ahead :-)
>
> On Tue, May 5, 2015 at 4:48 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> So I gather I should go forward with this? If no-one objects I will
>> open a Jira and work on this.
>>
>> On Tue, May 5, 2015 at 4:14 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>> > Yes, because the handling of punctuations depends on the operator: A
>> > MapOperator can just forward them while a windowed join or reduce can
>> > only forward them after emitting the correct windows or results.
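The operator-dependent handling described here can be sketched in plain Java. This is a minimal illustration, not Flink's API: `Output`, `ListOutput`, `MapOperator` and `TumblingWindowSumOperator` are hypothetical names. It assumes non-negative timestamps and that a watermark `w` promises no further elements with a timestamp below `w`. The map forwards a watermark at once. The window operator first emits every window the watermark has completed, and only then forwards it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical downstream interface: carries both records and watermarks.
interface Output<T> {
    void emitElement(T element, long timestamp);
    void emitWatermark(long watermark);
}

// Records everything it receives, in arrival order, for inspection.
class ListOutput<T> implements Output<T> {
    final List<String> events = new ArrayList<>();

    public void emitElement(T element, long timestamp) {
        events.add("element " + element + "@" + timestamp);
    }

    public void emitWatermark(long watermark) {
        events.add("watermark " + watermark);
    }
}

// Stateless map: nothing is buffered, so a watermark can be forwarded at once.
class MapOperator<IN, OUT> {
    private final Function<IN, OUT> fn;
    private final Output<OUT> out;

    MapOperator(Function<IN, OUT> fn, Output<OUT> out) {
        this.fn = fn;
        this.out = out;
    }

    void processElement(IN element, long timestamp) {
        out.emitElement(fn.apply(element), timestamp);
    }

    void processWatermark(long watermark) {
        out.emitWatermark(watermark);
    }
}

// Tumbling-window sum: buffers one partial sum per window and may only forward
// a watermark after emitting every window that the watermark has completed.
// Assumes non-negative timestamps and that watermark w means
// "no more elements with timestamp < w".
class TumblingWindowSumOperator {
    private final long size;
    private final Output<Long> out;
    private final TreeMap<Long, Long> sums = new TreeMap<>(); // window start -> sum

    TumblingWindowSumOperator(long size, Output<Long> out) {
        this.size = size;
        this.out = out;
    }

    void processElement(long value, long timestamp) {
        long start = timestamp - (timestamp % size);
        sums.merge(start, value, Long::sum);
    }

    void processWatermark(long watermark) {
        // Emit each complete window [start, start + size), oldest first.
        while (!sums.isEmpty() && sums.firstKey() + size <= watermark) {
            Map.Entry<Long, Long> window = sums.pollFirstEntry();
            out.emitElement(window.getValue(), window.getKey() + size - 1);
        }
        // Only now forward the watermark, so it never overtakes those results.
        out.emitWatermark(watermark);
    }
}
```

With 10-unit windows, feeding the values 1@3, 2@7 and 5@12 and then watermark 10 yields the window sum 3 ahead of watermark 10. The window starting at 10 is held back until a watermark of at least 20 arrives.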
>> >
>> > On Tue, May 5, 2015 at 3:58 PM, Paris Carbone <par...@kth.se> wrote:
>> >> By watermark handling I meant making punctuations explicit and
>> >> forwarding/modifying them at the operator level. I think this is clear so
>> >> far.
>> >>> On 05 May 2015, at 15:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>
>> >>> There is no watermark handling yet. :D
>> >>>
>> >>> But this would enable me to do this.
>> >>>
>> >>> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <par...@kth.se> wrote:
>> >>>> I agree with Gyula on this one. It would be better not to expose barriers
>> >>>> to the operator; they are system events for state management. Apart from
>> >>>> that, the watermark handling seems to be on the right track. I like it so far.
>> >>>>
>> >>>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>
>> >>>>> I don't know, I just put that there because other people are working
>> >>>>> on the checkpointing/barrier thing. So there would need to be some
>> >>>>> functionality there at some point.
>> >>>>>
>> >>>>> Or maybe it is not required there and can be handled in the
>> >>>>> StreamTask. Others might know this better than I do right now.
>> >>>>>
>> >>>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >>>>>> What would the processBarrier method do?
>> >>>>>>
>> >>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>>
>> >>>>>>> I'm using the terms punctuation and watermark interchangeably here
>> >>>>>>> because for practical purposes they do the same thing. I'm not sure
>> >>>>>>> what you meant by your comment about those.
>> >>>>>>>
>> >>>>>>> For the Operator interface I'm thinking about something like this:
>> >>>>>>>
>> >>>>>>> abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
>> >>>>>>>   public abstract void processElement(IN element);
>> >>>>>>>   public abstract void processBarrier(...);
>> >>>>>>>   public abstract void processPunctuation(...); // or processLowWatermark(...)
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> The operator also has access to the TaskContext, the ExecutionConfig
>> >>>>>>> and the Serializers. The operator would emit values using an emit()
>> >>>>>>> method or the Collector interface; I'm not sure about that yet.
>> >>>>>>>
>> >>>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyf...@apache.org> wrote:
>> >>>>>>>> I think this is a good idea in general. I would try to minimize the
>> >>>>>>>> methods we include and make the ones that we keep very concrete. For
>> >>>>>>>> instance, I would not have the receive barrier method, as that is
>> >>>>>>>> already handled on a totally different level. And instead of
>> >>>>>>>> punctuation I would directly add a method to work on watermarks.
>> >>>>>>>>
>> >>>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>>>>
>> >>>>>>>>> What do you mean by "losing iterations"?
>> >>>>>>>>>
>> >>>>>>>>> For the pros and cons:
>> >>>>>>>>>
>> >>>>>>>>> Cons: I can't think of any, since most of the operators are already
>> >>>>>>>>> chainable and behave like a collector.
>> >>>>>>>>>
>> >>>>>>>>> Pros:
>> >>>>>>>>> - Unified model for operators: chainable operators don't have to
>> >>>>>>>>> worry about input iterators and the collect interface.
>> >>>>>>>>> - Enables features that we want in the future, such as barriers and
>> >>>>>>>>> punctuations, which don't work with the simple Collector interface.
>> >>>>>>>>> - The while-loop is moved outside of the operators. The Task (the
>> >>>>>>>>> thing that runs Operators) can then control the flow of data better
>> >>>>>>>>> and deal with things like barriers and punctuations. If we keep the
>> >>>>>>>>> main loop inside each operator, then they all have to manage input
>> >>>>>>>>> readers and inline events manually.
>> >>>>>>>>>
>> >>>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:
>> >>>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose some
>> >>>>>>>>>> functionality by getting rid of iterations?
>> >>>>>>>>>>
>> >>>>>>>>>> Kostas
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi Folks,
>> >>>>>>>>>>> while working on introducing source-assigned timestamps into
>> >>>>>>>>>>> streaming (https://issues.apache.org/jira/browse/FLINK-1967), I
>> >>>>>>>>>>> thought about how the punctuations (low watermarks) can be pushed
>> >>>>>>>>>>> through the system. The problem is that operators can get their
>> >>>>>>>>>>> input in two ways: 1. they read directly from input iterators, or
>> >>>>>>>>>>> 2. they act as a Collector and get elements via collect() from the
>> >>>>>>>>>>> previous operator in a chain.
>> >>>>>>>>>>>
>> >>>>>>>>>>> This makes it hard to push things through a chain that are not
>> >>>>>>>>>>> elements, such as barriers and/or punctuations.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I propose to change all streaming operators to be push-based, with a
>> >>>>>>>>>>> slightly improved interface: in addition to collect(), which I would
>> >>>>>>>>>>> call receiveElement(), I would add receivePunctuation() and
>> >>>>>>>>>>> receiveBarrier(). The first operator in the chain would also get its
>> >>>>>>>>>>> data from the outside invokable, which reads from the input iterator
>> >>>>>>>>>>> and calls receiveElement() on the first operator in the chain.
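The proposal can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not Flink code: `PushOperator`, `MapOperator`, `RecordingSink`, `StreamEvent` and `ChainTask` are hypothetical names, and only receiveElement(), receivePunctuation() and receiveBarrier() come from the message. The single while-loop lives in the task (the "outside invokable"), and punctuations and barriers travel through the chain by the same method calls as elements, which a Collector offering only collect() cannot express.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Push-based operator with the three entry points named in the proposal.
interface PushOperator<IN> {
    void receiveElement(IN element);
    void receivePunctuation(long timestamp);
    void receiveBarrier(long checkpointId);
}

// Stateless map: transforms elements and forwards everything else unchanged.
final class MapOperator<IN, OUT> implements PushOperator<IN> {
    private final Function<IN, OUT> fn;
    private final PushOperator<OUT> next;

    MapOperator(Function<IN, OUT> fn, PushOperator<OUT> next) {
        this.fn = fn;
        this.next = next;
    }

    public void receiveElement(IN element) { next.receiveElement(fn.apply(element)); }
    public void receivePunctuation(long timestamp) { next.receivePunctuation(timestamp); }
    public void receiveBarrier(long checkpointId) { next.receiveBarrier(checkpointId); }
}

// Tail of the chain; stands in for the writer that ships data downstream.
final class RecordingSink<T> implements PushOperator<T> {
    final List<String> log = new ArrayList<>();

    public void receiveElement(T element) { log.add("element " + element); }
    public void receivePunctuation(long timestamp) { log.add("punctuation " + timestamp); }
    public void receiveBarrier(long checkpointId) { log.add("barrier " + checkpointId); }
}

// What the task reads from its input: an element, a punctuation or a barrier.
final class StreamEvent<T> {
    enum Kind { ELEMENT, PUNCTUATION, BARRIER }

    final Kind kind;
    final T element;   // only set for ELEMENT
    final long number; // punctuation timestamp or checkpoint id

    private StreamEvent(Kind kind, T element, long number) {
        this.kind = kind;
        this.element = element;
        this.number = number;
    }

    static <T> StreamEvent<T> element(T element) { return new StreamEvent<>(Kind.ELEMENT, element, 0L); }
    static <T> StreamEvent<T> punctuation(long timestamp) { return new StreamEvent<>(Kind.PUNCTUATION, null, timestamp); }
    static <T> StreamEvent<T> barrier(long checkpointId) { return new StreamEvent<>(Kind.BARRIER, null, checkpointId); }
}

// The task owns the only loop: it pulls from the input iterator and pushes each
// event into the head of the chain, which propagates it by plain method calls.
final class ChainTask {
    static <T> void run(Iterator<StreamEvent<T>> input, PushOperator<T> head) {
        while (input.hasNext()) {
            StreamEvent<T> event = input.next();
            switch (event.kind) {
                case ELEMENT:     head.receiveElement(event.element); break;
                case PUNCTUATION: head.receivePunctuation(event.number); break;
                case BARRIER:     head.receiveBarrier(event.number); break;
            }
        }
    }
}
```

Whether barriers belong in this interface at all is what Gyula and Paris question elsewhere in the thread; dropping receiveBarrier() and the BARRIER case leaves the rest of the sketch unchanged.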
>> >>>>>>>>>>>
>> >>>>>>>>>>> What do you think? I would of course be willing to implement this
>> >>>>>>>>>>> myself.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Cheers,
>> >>>>>>>>>>> Aljoscha
>> >>>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>
>> >>>>
>> >>
>>
