2019.2.10

Hi, Stephan


Thank you very much for such detailed and constructive comments.


*binary vs. n-ary* and *enum vs. integer*


For the N-ary case, as you mentioned, using integers may be a better
choice.
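
For example, a rough sketch of what the integer-based selection might look
like (the interface and constant names here are only placeholders for
illustration, not a final API):

    public interface InputSelectable {

        // Pre-defined integer constants instead of a binary enum, which
        // leaves room for N-ary operators and future side inputs.
        int ANY = -1;
        int FIRST = 0;
        int SECOND = 1;

        // Returns the index of the input the operator wants to read next,
        // or ANY if it has no preference.
        int nextSelection();
    }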


*generic selectable interface*


You are right. This interface can be removed.


*end-input*

It is true that the Operator does not need to store the end-input state;
the system can infer it and notify the Operator at the right time. We can
consider using this mechanism once the system can checkpoint a topology
that contains finished tasks.
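
Just to sketch that direction (the names below are assumptions made up for
illustration): the runtime records the finished inputs in the checkpoint and
simply re-notifies the operator after recovery, so the operator itself keeps
no end-input state:

    public class EndInputSketch {

        // Operator-side callback: the runtime calls it when an input has no
        // more data, so the operator never needs to persist this itself.
        interface InputEndAware {
            void endInput(int inputIndex) throws Exception;
        }

        // Runtime side after recovery: replay end-of-input for every channel
        // that the checkpoint recorded as finished, so the operator is
        // notified again immediately when the task starts.
        static void replayFinishedInputs(InputEndAware operator,
                java.util.Set<Integer> restoredFinishedInputs) throws Exception {
            for (int input : restoredFinishedInputs) {
                operator.endInput(input);
            }
        }
    }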


*early-out*

I agree that it is reasonable not to consider this situation at present.


*distributed stream deadlocks*


At present there is no deadlock in streaming, but I think it might still be
necessary to do some validation (warning or rejecting) when building the
JobGraph, because once Flink introduces the TwoInputSelectable interface,
streaming users could also construct a diamond-style topology that may
deadlock.
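
For instance, a very rough sketch of such a check (the graph representation
is simplified and only meant to show the shape of the validation, not how it
would hook into the JobGraph builder):

    import java.util.*;

    public class DiamondCheck {

        // The two inputs of a selectable operator form a diamond if they
        // share a common ancestor (a fork point): a bad selection order could
        // then back-pressure that ancestor and deadlock the pipeline.
        public static boolean hasDiamondInto(Map<String, List<String>> dag,
                String leftInputSource, String rightInputSource) {
            Set<String> left = ancestorsOf(dag, leftInputSource);
            Set<String> right = ancestorsOf(dag, rightInputSource);
            left.add(leftInputSource);
            right.add(rightInputSource);
            left.retainAll(right);
            return !left.isEmpty();
        }

        // Reverse reachability via BFS over the inverted edges.
        private static Set<String> ancestorsOf(Map<String, List<String>> dag,
                String node) {
            Map<String, List<String>> reversed = new HashMap<>();
            dag.forEach((from, tos) -> tos.forEach(
                    to -> reversed.computeIfAbsent(to, k -> new ArrayList<>()).add(from)));
            Set<String> seen = new HashSet<>();
            Deque<String> queue = new ArrayDeque<>(
                    reversed.getOrDefault(node, new ArrayList<>()));
            while (!queue.isEmpty()) {
                String cur = queue.poll();
                if (seen.add(cur)) {
                    queue.addAll(reversed.getOrDefault(cur, new ArrayList<>()));
                }
            }
            return seen;
        }
    }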


*empty input / selection timeout*

As above, I agree it is reasonable not to consider this situation at present.


*timers*

When all the inputs are finished, the TimerService will wait until all
timers are triggered, so there should be no problem. Some colleagues and I
are confirming the details to see whether there are other considerations.
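
As a small sketch of the pattern Stephan describes (the TimerService below
is a stand-in assumption, not a concrete Flink interface): when one side of
a symmetric join ends, the operator clears the state it kept for that side
and deletes the timers registered for it, while timers tied to still-live
state stay registered and fire as usual:

    import java.util.HashSet;
    import java.util.Set;

    public class SymmetricJoinSketch {

        interface TimerService {
            void deleteEventTimeTimer(long timestamp);
        }

        private static final int FIRST = 0;

        private final TimerService timerService;
        private final Set<Long> firstSideCleanupTimers = new HashSet<>();
        private final Set<Object> firstSideBuffer = new HashSet<>();

        public SymmetricJoinSketch(TimerService timerService) {
            this.timerService = timerService;
        }

        public void endInput(int inputIndex) {
            if (inputIndex == FIRST) {
                // State for the finished side is no longer needed ...
                for (long cleanupTime : firstSideCleanupTimers) {
                    // ... so the timers guarding that state go with it.
                    timerService.deleteEventTimeTimer(cleanupTime);
                }
                firstSideCleanupTimers.clear();
                firstSideBuffer.clear();
            }
            // Timers that belong to still-live state (e.g. the other side's
            // buffers) stay registered and are triggered before close().
        }
    }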


Best

GuoWei

Stephan Ewen <se...@apache.org> wrote on Fri, Feb 8, 2019 at 7:56 PM:

> Nice design proposal, and +1 to the general idea.
>
> A few thoughts / suggestions:
>
> *binary vs. n-ary*
>
> I would plan ahead for N-ary operators. Not because we necessarily need
> n-ary inputs (one can probably build that purely in the API) but because of
> future side inputs. The proposal should be able to handle that as well.
>
> *enum vs. integer*
>
> The above might be easier to realize when going directly with integers
> and having ANY, FIRST, SECOND, etc. as pre-defined constants.
> Performance-wise, there is probably no difference between using int or enum.
>
> *generic selectable interface*
>
> From the proposal, I don't understand quite what that interface is for. My
> understanding is that the input processor or task that calls the
> operators's functions would anyways work on the TwoInputStreamOperator
> interface, for efficiency.
>
> *end-input*
>
> I think we should not make storing the end-input the operator's
> responsibility.
> There is a simple way to handle this, which is also consistent with other
> aspects of handling finished tasks:
>
>   - If a task is finished, that should be stored in the checkpoint.
>   - Upon restoring a finished task, if it still has running successors, we
> deploy a "finished input channel", which immediately sends the "end of
> input" when the task is started.
>   - The operator will hence set the end of input immediately again upon
>
> *early-out*
>
> Letting nextSelection() return “NONE” or “FINISHED" may be relevant for
> early-out cases, but I would remove this from the scope of this proposal.
> There are most likely other big changes involved, like communicating this
> to the upstream operators.
>
> *distributed stream deadlocks*
>
> We had this issue in the DataSet API. Earlier versions of the DataSet API
> made an analysis of the flow detecting dams and whether the pipeline
> breaking behavior in the flow would cause deadlocks, and introduce
> artificial pipeline breakers in response.
>
> The logic was really complicated and it took a while to become stable. We
> had several issues that certain user functions (like mapPartition) could
> either be pipelined or have a full dam (not possible to know for the
> system), so we had to insert artificial pipeline breakers in all paths.
>
> In the end we simply decided that in the case of a diamond-style flow, we
> make the point where the flow first forks as blocking shuffle. That was
> super simple, solved all issues, and has the additional nice property that
> it is a great point to materialize data for recovery, because it helps both
> paths of the diamond upon failure.
>
> My suggestion:
> ==> For streaming, no problem so far, nothing to do
> ==> For batch, would suggest to go with the simple solution described above
> first, and improve when we see cases where this impacts performance
> significantly
>
> *empty input / selection timeout*
>
> I can see that being relevant in future streaming cases, for example with
> side inputs. You want to wait for the side input data, but with a timeout,
> so the program can still proceed with non-perfect context data in case that
> context data is very late.
>
> Because we do not support side inputs at the moment, we may want to defer
> this for now. Let's not over-design for problems that are not well
> understood at this point.
>
> *timers*
>
> I don't understand the problem with timers. Timers are bound to the
> operator, not the input, so they should still work if an input ends.
> There are cases where some state in the operator that is only relevant as
> long as an input still has data (like in symmetric joins) and the timers
> are relevant to that state.
> When the state is dropped, the timers should also be dropped, but that is
> the operator's logic on "endInput()". So there is no inherent issue between
> input and timers.
>
> Best,
> Stephan
>
>
> On Sat, Feb 2, 2019 at 3:55 AM Guowei Ma <guowei....@gmail.com> wrote:
>
> > Hi, guys:
> > I propose a design to enhance Stream Operator API for Batch’s
> requirements.
> > This is also the Flink’s goal that Batch is a special case of Streaming.
> > This
> > proposal mainly contains two changes to operator api:
> >
> > 1. Allow "StreamOperator" to choose which input to read;
> > 2. Notify "StreamOperator" that an input has ended.
> >
> >
> > This proposal was discussed with Piotr Nowojski, Kostas Kloudas, Haibo
> Sun
> > offline.
> > It will be great to hear the feedback and suggestions from the
> community.
> > Please kindly share your comments and suggestions.
> >
> > Best
> > GuoWei Ma.
> >
> >  Enhance Operator API to Support Dynamically Sel...
> > <
> >
> https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=drive_web
> > >
> >
>
