To move this forward, I would suggest the following:

- Let's quickly check which other classes need to change. I assume the
  TwoInputStreamTask and StreamTwoInputProcessor?
- Can those changes be new classes that are used when the new operator is
  used? The current TwoInputStreamTask and StreamTwoInputProcessor remain
  until they are fully subsumed and are then removed.
- Do we need any other refactorings before, like some cleanup of the
  Operator Config or the Operator Chain?

Best,
Stephan

On Sun, Feb 10, 2019 at 7:25 AM Guowei Ma <guowei....@gmail.com> wrote:

> 2019.2.10
>
> Hi, Stephan
>
> Thank you very much for such detailed and constructive comments.
>
> *binary vs. n-ary* and *enum vs. integer*
>
> Considering the N-ary case, as you mentioned, using integers may be a
> better choice.
>
> *generic selectable interface*
>
> You are right. This interface can be removed.
>
> *end-input*
>
> It is true that the Operator does not need to store the end-input state.
> The system can infer it and notify the Operator at the right time. We can
> consider using this mechanism once the system can checkpoint a topology
> that contains finished tasks.
>
> *early-out*
>
> It seems reasonable to me not to consider this situation at present.
>
> *distributed stream deadlocks*
>
> At present, there is no deadlock in streaming, but I think it might still
> be necessary to do some validation (warning or reject) in the JobGraph,
> because once Flink introduces this TwoInputSelectable interface, streaming
> users could also construct a diamond-style topology that may deadlock.
>
> *empty input / selection timeout*
>
> It seems reasonable to me not to consider this situation at present.
>
> *timers*
>
> When all the inputs are finished, TimeService will wait until all timers
> are triggered, so there should be no problem. Some colleagues and I are
> confirming the details to see if there are other considerations.
>
> Best
> GuoWei
>
> On Fri, Feb 8, 2019 at 7:56 PM Stephan Ewen <se...@apache.org> wrote:
>
> > Nice design proposal, and +1 to the general idea.
> >
> > A few thoughts / suggestions:
> >
> > *binary vs. n-ary*
> >
> > I would plan ahead for N-ary operators. Not because we necessarily need
> > n-ary inputs (one can probably build that purely in the API) but because
> > of future side inputs.
> > The proposal should be able to handle that as well.
> >
> > *enum vs. integer*
> >
> > The above might be easier to realize when going directly with integers
> > and having ANY, FIRST, SECOND, etc. as pre-defined constants.
> > Performance-wise, there is probably no difference between using int and
> > enum.
> >
> > *generic selectable interface*
> >
> > From the proposal, I don't quite understand what that interface is for.
> > My understanding is that the input processor or task that calls the
> > operator's functions would work on the TwoInputStreamOperator interface
> > anyway, for efficiency.
> >
> > *end-input*
> >
> > I think we should not make storing the end-input the operator's
> > responsibility.
> > There is a simple way to handle this, which is also consistent with other
> > aspects of handling finished tasks:
> >
> > - If a task is finished, that should be stored in the checkpoint.
> > - Upon restoring a finished task, if it still has running successors, we
> >   deploy a "finished input channel", which immediately sends the "end of
> >   input" when the task is started.
> > - the operator will hence set the end of input immediately again upon
> >
> > *early-out*
> >
> > Letting nextSelection() return "NONE" or "FINISHED" may be relevant for
> > early-out cases, but I would remove this from the scope of this proposal.
> > There are most likely other big changes involved, like communicating this
> > to the upstream operators.
> >
> > *distributed stream deadlocks*
> >
> > We had this issue in the DataSet API. Earlier versions of the DataSet API
> > analyzed the flow to detect dams and whether the pipeline-breaking
> > behavior in the flow would cause deadlocks, and introduced artificial
> > pipeline breakers in response.
> >
> > The logic was really complicated and it took a while to become stable.
> > We had several issues where certain user functions (like mapPartition)
> > could either be pipelined or have a full dam (not possible for the system
> > to know), so we had to insert artificial pipeline breakers in all paths.
> >
> > In the end we simply decided that in the case of a diamond-style flow, we
> > make the point where the flow first forks a blocking shuffle. That was
> > super simple, solved all issues, and has the additional nice property
> > that it is a great point to materialize data for recovery, because it
> > helps both paths of the diamond upon failure.
> >
> > My suggestion:
> > ==> For streaming, no problem so far, nothing to do
> > ==> For batch, would suggest to go with the simple solution described
> > above first, and improve when we see cases where this impacts performance
> > significantly
> >
> > *empty input / selection timeout*
> >
> > I can see that being relevant in future streaming cases, for example with
> > side inputs. You want to wait for the side input data, but with a
> > timeout, so the program can still proceed with non-perfect context data
> > in case that context data is very late.
> >
> > Because we do not support side inputs at the moment, we may want to defer
> > this for now. Let's not over-design for problems that are not well
> > understood at this point.
> >
> > *timers*
> >
> > I don't understand the problem with timers. Timers are bound to the
> > operator, not the input, so they should still work if an input ends.
> > There are cases where some state in the operator is only relevant as
> > long as an input still has data (like in symmetric joins) and the timers
> > are relevant to that state.
> > When the state is dropped, the timers should also be dropped, but that is
> > the operator's logic on "endInput()". So there is no inherent issue
> > between input and timers.
> >
> > Best,
> > Stephan
> >
> > On Sat, Feb 2, 2019 at 3:55 AM Guowei Ma <guowei....@gmail.com> wrote:
> >
> > > Hi, guys:
> > > I propose a design to enhance the Stream Operator API for batch
> > > requirements. This is in line with Flink's goal that batch is a
> > > special case of streaming. This proposal mainly contains two changes
> > > to the operator API:
> > >
> > > 1. Allow "StreamOperator" to choose which input to read;
> > > 2. Notify "StreamOperator" that an input has ended.
> > >
> > > This proposal was discussed offline with Piotr Nowojski, Kostas
> > > Kloudas, and Haibo Sun.
> > > It would be great to hear feedback and suggestions from the community.
> > > Please kindly share your comments and suggestions.
> > >
> > > Best
> > > GuoWei Ma.
> > >
> > > Enhance Operator API to Support Dynamically Sel...
> > > <https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=drive_web>
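
To make the two changes discussed in this thread more concrete (integer input ids with pre-defined constants such as ANY, FIRST, SECOND, and an end-of-input notification), here is a minimal Java sketch. Everything in it is an assumption for illustration only: InputSelectionIds, SelectableOperatorSketch, BuildThenProbeSketch and InputSelectionSketch are hypothetical names and not Flink APIs, and the driver loop merely stands in for whatever the input processor would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical integer input ids, following the "ANY, FIRST, SECOND" idea. */
final class InputSelectionIds {
    static final int ANY = 0;    // assumption: 0 means "no preference"
    static final int FIRST = 1;
    static final int SECOND = 2;
    private InputSelectionIds() {}
}

/** Hypothetical operator contract: choose the next input, get told when one ends. */
interface SelectableOperatorSketch<T> {
    int nextSelection();
    void processElement(int inputId, T element);
    void endInput(int inputId);
}

/** Toy operator: consume input 1 completely (build), then input 2 (probe). */
final class BuildThenProbeSketch implements SelectableOperatorSketch<String> {
    private final List<String> build = new ArrayList<>();
    final List<String> matches = new ArrayList<>();
    private boolean firstEnded = false;

    public int nextSelection() {
        return firstEnded ? InputSelectionIds.SECOND : InputSelectionIds.FIRST;
    }

    public void processElement(int inputId, String element) {
        if (inputId == InputSelectionIds.FIRST) {
            build.add(element);
        } else if (build.contains(element)) {
            matches.add(element);
        }
    }

    public void endInput(int inputId) {
        // The operator does not persist this flag; the caller tells it.
        if (inputId == InputSelectionIds.FIRST) {
            firstEnded = true;
        }
    }
}

/** Toy driver standing in for an input processor. */
final class InputSelectionSketch {
    static List<String> run(List<String> first, List<String> second) {
        BuildThenProbeSketch op = new BuildThenProbeSketch();
        List<Iterator<String>> inputs = Arrays.asList(first.iterator(), second.iterator());
        boolean[] ended = new boolean[2];
        while (!(ended[0] && ended[1])) {
            int sel = op.nextSelection();
            // ANY, or a selection of an already finished input, falls back
            // to the first input that is still open.
            int idx = (sel == InputSelectionIds.ANY || ended[sel - 1])
                    ? (ended[0] ? 1 : 0)
                    : sel - 1;
            Iterator<String> in = inputs.get(idx);
            if (in.hasNext()) {
                op.processElement(idx + 1, in.next());
            } else {
                ended[idx] = true;
                op.endInput(idx + 1);
            }
        }
        return op.matches;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b"), Arrays.asList("b", "c", "a")));
    }
}
```

Using plain ints rather than an enum keeps the same contract usable if more than two inputs (for example side inputs) are added later, which is the motivation given above for the integer variant.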