While implementing the N-ary input operator in the table layer, please keep this pattern in mind:

    Build1 ---+
              |
              +---> HashJoin1 ---> HashAgg ---+
              |                               |
    Probe1 ---+                               +---> HashJoin2
                                              |
                                    Build2 ---+

It's quite interesting that `Build1`, `Build2`, and `Probe1` can all be read simultaneously. But we need to hold back `HashAgg`'s output until `Build2` has finished. I don't have a clear solution for now, but it's a common pattern we will face.

Best,
Kurt

On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> Hi Piotr,
>
> > a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> > b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> > c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)
>
> Not allowing c) sounds good to me. I understand that it is very difficult to propose general, high-performance support for chains of arbitrary input-selectable two-input operators. And it is not necessary for the table layer either. b) has already excited us.
>
> Actually, we have supported n-output chains too:
> d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
>                               -> one op A2 -> one op B2
>                                            -> one op C2
> d) is a very useful feature too.
>
> > Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin) could be easily handled by a single N-Ary Stream Operator, so this would be covered by steps 1. and 2. from my plan from my previous e-mail? That would be really nice (avoiding the input selection chaining).
>
> Yes, because in the table layer the typical scenarios currently only have a static reading order. (We don't consider MergeJoin here, because it's too complex to optimize and doesn't deserve to be optimized at present.) For example, take the current TwoInputOperators, HashJoin and NestedLoopJoin: both have a static reading order. We must read the build input before we can read the probe input.
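A hash join with such a static reading order can be sketched as follows. This is a minimal, Flink-independent illustration assuming a single in-memory hash table; the class and method names (`StaticOrderHashJoin`, `processBuild`, `endBuild`, `processProbe`) are hypothetical and are not Flink or Blink APIs. It shows the two constraints mentioned in this thread: the build side is consumed completely before the probe side, and output is produced only while probing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not a Flink API): a hash join with a static reading
 * order. The build input must be fully consumed before the probe input.
 */
final class StaticOrderHashJoin<K, B, P> {
    // In-memory hash table filled from the build side.
    private final Map<K, List<B>> table = new HashMap<>();
    private boolean buildFinished = false;

    /** Build phase: store the record, emit nothing. */
    void processBuild(K key, B row) {
        if (buildFinished) {
            throw new IllegalStateException("build input already finished");
        }
        table.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }

    /** End of the build input; only now may the probe input be read. */
    void endBuild() {
        buildFinished = true;
    }

    /** Probe phase: the only place where joined results are emitted. */
    List<String> processProbe(K key, P row) {
        if (!buildFinished) {
            throw new IllegalStateException("probe input read before build input finished");
        }
        List<String> out = new ArrayList<>();
        for (B b : table.getOrDefault(key, Collections.emptyList())) {
            out.add(b + "|" + row);
        }
        return out;
    }
}
```

Because `processProbe` refuses to run before `endBuild`, the order between the two inputs is fixed and can be decided before the job starts, which is what makes a static analysis of the reading order possible.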
>
> So after we analyze the chain, we put all the operators that can be chained into an N-input operator. We can analyze the static order required by this operator and divide the reading order into several levels:
> - first level: input4, input5, input1
> - second level: input2, input6
> - third level: input1, input7
> Note that this analysis happens at compile time, on the client. At runtime, we just need to read the inputs in a fixed order.
>
> Best,
> Jingsong Lee
>
> On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
> > Hi Jingsong,
> >
> > Thanks for the feedback :)
> >
> > Could you clarify a little bit what you mean by your desired use cases?
> >
> > > There are a large number of jobs (in production environments) whose TwoInputOperators could be chained. We used to just watch the last ten tasks transmit data through disk and network, even though all of it could have been done in one task.
> > > In terms of performance, if we can chain them, the average improvement is 30%+, and up to an order of magnitude in extreme cases.
> >
> > As I mentioned at the end, I would like to avoid/postpone chaining multiple/two-input operators one after another because of the complexity of input selection. For the first version I would like to aim only to allow chaining single-input operators with something (a two- or N-input operator must always be the head of the chain).
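Jingsong's compile-time division of the reading order into levels (earlier in this thread) can be computed from "must be read before" constraints. The sketch below is one possible way to do it: Kahn's topological sort that also tracks each input's depth, so that an input's level is one more than the highest level among the inputs it must wait for. It assumes every constraint comes from something like a hash join's build-before-probe rule; `ReadLevels`, `compute`, and the input names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical sketch: group the inputs of a chained N-input operator into
 * reading levels. mustPrecede.get(a) lists the inputs that may only be read
 * after input a has been read completely (e.g. build side before probe side).
 * All constrained names must also appear in the inputs list.
 */
final class ReadLevels {
    static List<List<String>> compute(List<String> inputs, Map<String, List<String>> mustPrecede) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, Integer> level = new HashMap<>();
        for (String in : inputs) {
            inDegree.put(in, 0);
            level.put(in, 0);
        }
        for (List<String> successors : mustPrecede.values()) {
            for (String s : successors) {
                inDegree.merge(s, 1, Integer::sum);
            }
        }
        // Kahn's algorithm; an input's level is 1 + the max level of its predecessors.
        Deque<String> ready = new ArrayDeque<>();
        for (String in : inputs) {
            if (inDegree.get(in) == 0) ready.add(in);
        }
        int visited = 0;
        while (!ready.isEmpty()) {
            String cur = ready.poll();
            visited++;
            for (String s : mustPrecede.getOrDefault(cur, List.of())) {
                level.put(s, Math.max(level.get(s), level.get(cur) + 1));
                if (inDegree.merge(s, -1, Integer::sum) == 0) ready.add(s);
            }
        }
        if (visited != inputs.size()) {
            throw new IllegalArgumentException("cyclic read-order constraints");
        }
        // Bucket inputs by level; names are sorted inside a level for determinism.
        TreeMap<Integer, TreeSet<String>> byLevel = new TreeMap<>();
        for (String in : inputs) {
            byLevel.computeIfAbsent(level.get(in), k -> new TreeSet<>()).add(in);
        }
        List<List<String>> result = new ArrayList<>();
        for (TreeSet<String> names : byLevel.values()) result.add(new ArrayList<>(names));
        return result;
    }
}
```

At runtime the operator would then read every input of the first level (in any interleaving) to the end, then the second level, and so on; no further decisions are needed once the levels are known.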
> >
> > For example, these chains:
> >
> > a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> > b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> > c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)
> >
> > The example above sounds to me like c).
> >
> > I think as a follow-up we could allow c) by extending chaining with a simple rule: there can only be a single input-selectable operator in the chain (again, it’s the chaining of multiple input-selectable operators that’s causing some problems).
> >
> > > The table layer has many special features, which give us the chance to optimize it, but which also make it hard for the underlying layer to provide an abstract mechanism to implement it. For example:
> > > - HashJoin must read all the data on one side (build side) and then read the other side (probe side).
> > > - HashJoin only emits data when reading the probe side.
> > > - SortMergeJoin reads its inputs in a random order, but if we chain a SortMergeJoin with another MergeJoin (sort attribute re-use), that makes things complicated.
> > > - HashAggregate/Sort only emit data in endInput.
> > >
> > > Providing an N-Ary stream operator makes everything possible. The upper layer can do anything. These things can be specific optimizations, which is much more natural than doing them in the lower layer.
> >
> > Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin) could be easily handled by a single N-Ary Stream Operator, so this would be covered by steps 1. and 2. from my plan from my previous e-mail? That would be really nice (avoiding the input selection chaining).
> >
> > Piotrek
> >
> > > On 4 Dec 2019, at 14:29, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > > Hi Piotr,
> > >
> > > Huge +1 for N-Ary Stream Operator.
> > > And I love this Golden Shovel award very much!
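Piotr's chaining rules (a/b/c above and the proposed follow-up) can be expressed as simple checks over a candidate chain. This is a sketch of one possible reading of those rules, not Flink's actual chaining logic: `OpSpec` and `ChainRules` are hypothetical. The first-version check only lets the head of the chain have more than one input; the follow-up check instead allows at most one input-selectable operator per chain.

```java
import java.util.List;

/** Hypothetical description of one operator in a candidate chain (not a Flink class). */
final class OpSpec {
    final String name;
    final int inputs;               // number of inputs: 1, 2 or N
    final boolean inputSelectable;  // chooses at runtime which input to read next

    OpSpec(String name, int inputs, boolean inputSelectable) {
        this.name = name;
        this.inputs = inputs;
        this.inputSelectable = inputSelectable;
    }
}

/** Hypothetical checks mirroring the chaining rules discussed in this thread. */
final class ChainRules {
    /** First version: only the head of the chain may have more than one input. */
    static boolean allowedFirstVersion(List<OpSpec> chain) {
        for (int i = 1; i < chain.size(); i++) {
            if (chain.get(i).inputs != 1) {
                return false;
            }
        }
        return true;
    }

    /** Proposed follow-up: at most one input-selectable operator in the whole chain. */
    static boolean allowedFollowUp(List<OpSpec> chain) {
        long selectable = chain.stream().filter(op -> op.inputSelectable).count();
        return selectable <= 1;
    }
}
```

Under the first rule, chains a) and b) pass and c) fails. Under the follow-up rule, c) passes only if at most one of X and Z is input-selectable, for example when Z reads its inputs in a static order.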
> > >
> > > There are a large number of jobs (in production environments) whose TwoInputOperators could be chained. We used to just watch the last ten tasks transmit data through disk and network, even though all of it could have been done in one task.
> > > In terms of performance, if we can chain them, the average improvement is 30%+, and up to an order of magnitude in extreme cases.
> > >
> > > The table layer has many special features, which give us the chance to optimize it, but which also make it hard for the underlying layer to provide an abstract mechanism to implement it. For example:
> > > - HashJoin must read all the data on one side (build side) and then read the other side (probe side).
> > > - HashJoin only emits data when reading the probe side.
> > > - SortMergeJoin reads its inputs in a random order, but if we chain a SortMergeJoin with another MergeJoin (sort attribute re-use), that makes things complicated.
> > > - HashAggregate/Sort only emit data in endInput.
> > >
> > > Providing an N-Ary stream operator makes everything possible. The upper layer can do anything. These things can be specific optimizations, which is much more natural than doing them in the lower layer.
> > >
> > > In addition to the two optimizations you mentioned, it also leaves more room to eliminate virtual function calls, because this way the table layer has to consider the operator chain itself. In the future, we could optimize a whole N-Ary stream operator into a JIT-friendly operator. Without virtual function calls, the JIT can show its real strength.
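To illustrate the virtual-call point: in an operator chain each record typically passes through one interface call per operator, whereas a fused operator can run the same logic in a single loop. The sketch below is illustrative only; `RecordOp` and `FusionDemo` are hypothetical, the map/filter/sum pipeline is made up, and whether the JIT manages to inline the chained calls in practice depends on the call-site profile, so no performance figure is implied.

```java
/** Hypothetical per-record operator interface, standing in for a chained operator. */
interface RecordOp {
    void process(long value);
}

/** The same made-up pipeline (x * 3, keep even values, sum) written two ways. */
final class FusionDemo {
    /** Chained: each record goes through one interface call per operator. */
    static long chained(long[] input) {
        long[] sum = {0};
        RecordOp sink = v -> sum[0] += v;
        RecordOp filter = v -> {
            if (v % 2 == 0) sink.process(v);
        };
        RecordOp map = v -> filter.process(v * 3);
        for (long v : input) map.process(v);
        return sum[0];
    }

    /** Fused: the same logic in one loop, without per-record dispatch. */
    static long fused(long[] input) {
        long sum = 0;
        for (long v : input) {
            long mapped = v * 3;
            if (mapped % 2 == 0) sum += mapped;
        }
        return sum;
    }
}
```

Both methods compute the same result; the fused form is what a code generator that sees the whole chain inside one N-Ary operator could emit.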
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com> wrote:
> > > Hi,
> > >
> > > First and foremost, I would like to nominate myself for the Golden Shovel award for digging out this topic.
> > >
> > > Secondly, I would like to discuss coming back to this particular idea of implementing an N-Ary Stream Operator. This time the motivation doesn’t come from Side Inputs, but from the need to efficiently support multi-joins in SQL without extra network exchanges. I’ve reviewed the design doc proposed by Aljoscha, I quite like it, and I think we could start from that.
> > >
> > > Specifically, the end goal is to allow, for example, Blink to:
> > >
> > > I. Implement A* multi broadcast join - to have a single operator chain, where the probe table (source) is read locally (inside the task that is actually doing the join), then joined with multiple other broadcast tables.
> > > II. Another example might be when we have 2 or more sources, pre-partitioned on the same key. In that case we should also be able to perform all of the table reading and the join inside a single Task.
> > >
> > > In order to achieve that, I would propose the following plan:
> > >
> > > 1. Implement the N-Ary Stream Operator as proposed in the design doc below, but with added support for input selection [1].
> > > - initially it can just be exposed via the `StreamTransformation`, without direct access from the `DataStream API`
> > >
> > > 2. Allow it to be chained with sources (implemented using the FLIP-27 SourceReader [2]).
> > >
> > > 3. Think about whether we need to support more complex chaining.
> > > Without this point, the motivating examples (I and II) could be implemented if all of the joins/filtering/mappings are compiled/composed into a single N-Ary Stream Operator (which could be chained with some other single-input operators at the tail). We could also think about supporting chaining a tree of, for example, TwoInputStreamOperators inside a single Task. However, I’m leaving this as a follow-up, since in that case it’s not so easy to handle the `InputSelection` of multiple operators inside the tree.
> > >
> > > Piotrek
> > >
> > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > >
> > >>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org> wrote:
> > >>>
> > >>> Hi,
> > >>> yes, I see operators of this style as very much an internal thing. You are probably talking about use cases for OneInputOperator and TwoInputOperator where users have a very specific need and require access to the low-level details such as watermarks, state and timers to get stuff done. Maybe we could have a wrapper for these so that users can still use them but internally we wrap them in an N-Ary Operator.
> > >>>
> > >>> Cheers,
> > >>> Aljoscha
> > >>>
> > >>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org> wrote:
> > >>> Hey,
> > >>>
> > >>> Some initial feedback from my side:
> > >>>
> > >>> I think this is a very important problem to deal with, as a lot of applications depend on it.
> > >>> I like the proposed runtime model, and it is probably a good way to handle this task; it is very clear what is happening.
> > >>>
> > >>> My main concern is how to handle this from the API and UDFs. What you proposed seems like a very internal thing from the API perspective, and I would be against exposing it in the way you wrote in your example. We should make every effort to streamline this with the functional-style operators in some way (in that sense, the way broadcast sets are handled is pretty nice). Maybe we could extend ds.connect() to many streams.
> > >>>
> > >>> But in any case this is an awesome initiative :)
> > >>>
> > >>> Cheers,
> > >>> Gyula
> > >>>
> > >>> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 21 Apr 2016, 15:56):
> > >>>
> > >>>> Hi Team,
> > >>>> I'm currently thinking about how we can bring the broadcast set/broadcast input feature from the DataSet API to the DataStream API. I think this would be a valuable addition since it would enable use cases that join streams with constant (or slowly changing) side information.
> > >>>>
> > >>>> For this purpose, I think that we need to change the way we handle stream operators. The new model would have one unified operator that handles all cases and allows adding inputs after the operator has been constructed, thus allowing the specification of broadcast inputs.
> > >>>>
> > >>>> I wrote up this preliminary document detailing the reason why we need such a new operator for broadcast inputs and also what the API of such an operator would be.
> > >>>> It also quickly touches on the required changes to existing per-operation stream operators such as StreamMap:
> > >>>>
> > >>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
> > >>>>
> > >>>> Please have a look if you're interested. Feedback/insights are very welcome. :-)
> > >>>>
> > >>>> Cheers,
> > >>>> Aljoscha
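Aljoscha's unified operator (inputs can be added after construction, e.g. for broadcast inputs), combined with the input selection from Piotr's plan, might look roughly like the sketch below. All names (`BroadcastJoinOperator`, `addInput`, `nextSelection`, `NAryDriver`) are hypothetical, and the `BitSet`-based selection is a simplified stand-in for the `InputSelectable` interface referenced as [1], not its actual API. The broadcast input (index 0) is read to the end before any main input is selected; the driver then reads the remaining inputs round-robin, one record per selected input per round.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical N-ary operator (not a Flink API): input 0 is a broadcast input,
 * further main inputs can be registered after construction via addInput().
 */
final class BroadcastJoinOperator {
    private final Map<String, String> side = new HashMap<>(); // broadcast side information
    private final List<String> output = new ArrayList<>();
    private int numInputs = 1;
    private boolean broadcastDone = false;

    /** Registers another main input and returns its index. */
    int addInput() {
        return numInputs++;
    }

    /** Inputs the operator is willing to read next: broadcast first, then all main inputs. */
    BitSet nextSelection() {
        BitSet selection = new BitSet();
        if (!broadcastDone) {
            selection.set(0);
        } else {
            selection.set(1, numInputs);
        }
        return selection;
    }

    void processElement(int input, String key, String value) {
        if (input == 0) {
            side.put(key, value);
        } else {
            output.add(value + "+" + side.getOrDefault(key, "?"));
        }
    }

    void endInput(int input) {
        if (input == 0) broadcastDone = true;
    }

    List<String> output() {
        return output;
    }
}

/** Hypothetical driver: reads one record per selected input per round. */
final class NAryDriver {
    static List<String> run(BroadcastJoinOperator op, List<List<Map.Entry<String, String>>> inputs) {
        List<Iterator<Map.Entry<String, String>>> iterators = new ArrayList<>();
        for (List<Map.Entry<String, String>> in : inputs) iterators.add(in.iterator());
        BitSet finished = new BitSet();
        while (finished.cardinality() < inputs.size()) {
            BitSet selection = op.nextSelection();
            selection.andNot(finished);
            if (selection.isEmpty()) throw new IllegalStateException("no selectable input left");
            for (int i = selection.nextSetBit(0); i >= 0; i = selection.nextSetBit(i + 1)) {
                Iterator<Map.Entry<String, String>> it = iterators.get(i);
                if (it.hasNext()) {
                    Map.Entry<String, String> record = it.next();
                    op.processElement(i, record.getKey(), record.getValue());
                } else {
                    finished.set(i);
                    op.endInput(i);
                }
            }
        }
        return op.output();
    }
}
```

Keeping the selection decision inside the operator and the reading loop inside the driver mirrors the split discussed in the thread: the operator knows its own ordering constraints, while the runtime only needs to honour them.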