During implementing n-ary input operator in table, please keep
this pattern in mind:

Build1 ---+

          |

          +---> HshJoin1 --—> HashAgg ---+

          |                              |

Probe1 ---+                              +---> HashJoin2

                                         |

                               Build2 ---+

It's quite interesting that both `Build1`, `Build2` and `Probe1` can
be read simultaneously. But we need to control `HashAgg`'s output
before `Build2` finished. I don't have a clear solution for now, but
it's a common pattern we will face.

Best,
Kurt


On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Piotr,
>
> > a) two input operator X -> one input operator Y -> one input operator Z
> (ALLOWED)
> > b) n input operator X -> one input operator Y -> one input operator Z
> (ALLOWED)
> > c) two input operator X -> one input operator Y -> two input operator Z
> (NOT ALLOWED as a single chain)
>
> NOT ALLOWED to c) sounds good to me. I understand that it is very difficult
> to propose a general support for any input selectable two input operators
> chain with high performance.
> And it is not necessary for table layer too. b) has already excited us.
>
> Actually, we have supported n output chain too:
> d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
>                                                  -> one op A2 -> one op B2
> -> one op C2
> d) is a very useful feature too.
>
> > Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> could be easily handled by a single N-Ary Stream Operator, so this would be
> covered by steps 1. and 2. from my plan from my previous e-mail? That would
> be real nice (avoiding the input selection chaining).
>
> Yes, because in the table layer, the typical scenarios currently only have
> static order. (We don't consider MergeJoin here, because it's too complex
> to be optimized, and not deserved to be optimized at present.).
> For example, the current TwoInputOperators: HashJoin and NestedLoopJoin.
> They are all static reading order. We must read the build input before we
> can read the probe input.
> So after we analyze chain, we put all the operators that can chain into a N
> input operator, We can analyze the static order required by this operator,
> and divide the reading order into several levels:
> - fist level: input4, input5, input1
> - second level: input2, input6
> - third level: input1, input7
> Note that these analyses are at the compile time of the client.
> At runtime, we just need to read in a fixed order.
>
> Best,
> Jingsong Lee
>
> On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
> > Hi Jingsong,
> >
> > Thanks for the feedback :)
> >
> > Could you clarify a little bit what do you mean by your wished use cases?
> >
> > > There are a large number jobs (in production environment) that their
> > > TwoInputOperators that can be chained. We used to only watch the last
> > > ten tasks transmit data through disk and network, which could have been
> > >  done in one task.
> > > For performance, if we can chain them, the average is 30%+, and there
> > >  is an order of magnitude in extreme cases.
> >
> > As I mentioned at the end, I would like to avoid/post pone chaining of
> > multiple/two input operators one after another because of the complexity
> of
> > input selection. For the first version I would like to aim only to allow
> > chaining the single input operators with something (2 or N input must be
> > always head of the chain) . For example chains:
> >
> > a) two input operator X -> one input operator Y -> one input operator Z
> > (ALLOWED)
> > b) n input operator X -> one input operator Y -> one input operator Z
> > (ALLOWED)
> > c) two input operator X -> one input operator Y -> two input operator Z
> > (NOT ALLOWED as a single chain)
> >
> > The example above sounds to me like c)
> >
> > I think as a follow up, we could allow c), by extend chaining to a simple
> > rule: there can only be a single input selectable operator in the chain
> > (again, it’s the chaining of multiple input selectable operators that’s
> > causing some problems).
> >
> > > The table layer has many special features. which give us the chance to
> > optimize
> > >  it, but also results that it is hard to let underlying layer to
> provide
> > an abstract
> > > mechanism to implement it. For example:
> > > - HashJoin must read all the data on one side(build side) and then read
> > the
> > > other side (probe side).
> > > - HashJoin only emit data when read probe side.
> > > - SortMergeJoin read random, but if we have SortMergeJoin chain another
> > >  MergeJoin(Sort attribute re-use), that make things complicated.
> > > - HashAggregate/Sort only emit data in endInput.
> > >
> > > Provide an N-Ary stream operator to make everything possible. The upper
> > >  layer can do anything. These things can be specific optimization,
> which
> > is much
> > >  more natural than the lower layer.
> >
> > Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> > could be easily handled by a single N-Ary Stream Operator, so this would
> be
> > covered by steps 1. and 2. from my plan from my previous e-mail? That
> would
> > be real nice (avoiding the input selection chaining).
> >
> > Piotrek
> >
> > > On 4 Dec 2019, at 14:29, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > > Hi Piotr,
> > >
> > > Huge +1 for N-Ary Stream Operator.
> > > And I love this Golden Shovel award very much!
> > >
> > > There are a large number jobs (in production environment) that their
> > > TwoInputOperators that can be chained. We used to only watch the last
> > > ten tasks transmit data through disk and network, which could have been
> > >  done in one task.
> > > For performance, if we can chain them, the average is 30%+, and there
> > >  is an order of magnitude in extreme cases.
> > >
> > > The table layer has many special features. which give us the chance to
> > optimize
> > >  it, but also results that it is hard to let underlying layer to
> provide
> > an abstract
> > > mechanism to implement it. For example:
> > > - HashJoin must read all the data on one side(build side) and then read
> > the
> > > other side (probe side).
> > > - HashJoin only emit data when read probe side.
> > > - SortMergeJoin read random, but if we have SortMergeJoin chain another
> > >  MergeJoin(Sort attribute re-use), that make things complicated.
> > > - HashAggregate/Sort only emit data in endInput.
> > >
> > > Provide an N-Ary stream operator to make everything possible. The upper
> > >  layer can do anything. These things can be specific optimization,
> which
> > is much
> > >  more natural than the lower layer.
> > >
> > > In addition to the two optimizations you mentioned, it also gives more
> > space to
> > >  eliminate virtual function calls:
> > > Because following this way, the table layer has to consider the
> operator
> > chain.
> > > And in the future, we can optimize a whole N-Ary stream operator to a
> > >  JIT-friendly operator. Without virtual function calls, JIT can play
> its
> > real strength.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com
> > <mailto:pi...@ververica.com>> wrote:
> > > Hi,
> > >
> > > First and foremost I would like to nominate myself to the Golden Shovel
> > award for digging out this topic:
> > >
> > >
> > >
> > > Secondly, I would like to discuss coming back to this particular idea
> of
> > implementing N-Ary Stream Operator. This time motivation doesn’t come
> from
> > the Side Inputs, but to efficiently support multi joins in SQL, without
> > extra network exchanges. I’ve reviewed the design doc proposed by
> Aljoscha,
> > I quite like it and I think we could start from that.
> > >
> > > Specifically the end-goal is to allow for example Blink, to:
> > >
> > > I. Implement A* multi broadcast join - to have a single operator chain,
> > where probe table (source) is read locally (inside the task that’s is
> > actually doing the join), then joined with multiple other broadcasted
> > tables.
> > > II. Another example might be when we have 2 or more sources,
> > pre-partitioned on the same key. In that case we should also be able to
> > perform all of the table reading and the join inside a single Task.
> > >
> > > In order to achieve that, I would propose the following plan:
> > >
> > > 1. Implement N-Ary Stream Operator as proposed in the design doc below,
> > however with added support for the input selection [1].
> > >   - initially it can be just exposed via the `StreamTransformation`,
> > without direct access from the `DataStream API`
> > >
> > > 2. Allow it to be chained with sources (implemented using the FLIP-27
> > SourceReader [2])
> > >
> > > 3. Think about whether we need to support more complex chaining.
> Without
> > this point, motivating examples (I and II) could be implemented if all of
> > the joins/filtering/mappings are compiled/composed into a single N-Ary
> > Stream Operator (which could be chained with some other single input
> > operators at the tail). We could also think about supporting of chaining
> a
> > tree of for example TwoInputStreamOperators inside a single Task. However
> > I’m leaving this as a follow up, since in that case, it’s not so easy to
> > handle the `InputSelection` of multiple operators inside the tree.
> > >
> > > Piotrek
> > >
> > > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> > >
> > > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> > >
> > >>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org
> > <mailto:aljos...@apache.org>> wrote:
> > >>>
> > >>> Hi,
> > >>> yes, I see operators of this style as very much an internal thing.
> You
> > are probably talking about use cases for OneInputOperator and
> > TwoInputOperator where users have a very specific need and require access
> > the the low-level details such as watermarks, state and timers to get
> stuff
> > done. Maybe we could have a wrapper for these so that users can still use
> > them but internally we wrap them in an N-Ary Operator.
> > >>>
> > >>> Cheers,
> > >>> Aljoscha
> > >>>
> > >>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org <mailto:
> > gyf...@apache.org>> wrote:
> > >>> Hey,
> > >>>
> > >>> Some initial feedback from side:
> > >>>
> > >>> I think this a very important problem to deal with as a lot of
> > applications
> > >>> depend on it. I like the proposed runtime model and that is probably
> > the
> > >>> good way to handle this task, it is very clean what is happening.
> > >>>
> > >>> My main concern is how to handle this from the API and UDFs. What you
> > >>> proposed seems like a very internal thing from the API perspective
> and
> > I
> > >>> would be against exposing it in the way you wrote in your example. We
> > >>> should make all effort to streamline this with the functional style
> > >>> operators in some way. (so in that sense the way broadcastsets are
> > handled
> > >>> is pretty nice) Maybe we could extend ds.connect() to many streams
> > >>>
> > >>> But in any case this is awesome initiative :)
> > >>>
> > >>> Cheers,
> > >>> Gyula
> > >>>
> > >>>
> > >>> Aljoscha Krettek <aljos...@apache.org <mailto:aljos...@apache.org>>
> > ezt írta (időpont: 2016. ápr. 21.,
> > >>> Cs, 15:56):
> > >>>
> > >>>> Hi Team,
> > >>>> I'm currently thinking about how we can bring the broadcast
> > set/broadcast
> > >>>> input feature form the DataSet API to the DataStream API. I think
> this
> > >>>> would be a valuable addition since it would enable use cases that
> join
> > >>>> streams with constant (or slowly changing) side information.
> > >>>>
> > >>>> For this purpose, I think that we need to change the way we handle
> > stream
> > >>>> operators. The new model would have one unified operator that
> handles
> > all
> > >>>> cases and allows to add inputs after the operator was constructed,
> > thus
> > >>>> allowing the specification of broadcast inputs.
> > >>>>
> > >>>> I wrote up this preliminary document detailing the reason why we
> need
> > such
> > >>>> a new operator for broadcast inputs and also what the API of such an
> > >>>> operator would be. It also quickly touches on the required changes
> of
> > >>>> existing per-operation stream operations such as StreamMap:
> > >>>>
> > >>>>
> > >>>>
> >
> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
> > <
> >
> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
> > >
> > >>>>
> > >>>> Please have a look if you're interested. Feedback/insights are very
> > >>>> welcome. :-)
> > >>>>
> > >>>> Cheers,
> > >>>> Aljoscha
> > >>>>
> > >>
> > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> >
> >
>
> --
> Best, Jingsong Lee
>

Reply via email to