Hi Piotr,

Huge +1 for N-Ary Stream Operator.
And I love this Golden Shovel award very much!

A large number of jobs in our production environment have
TwoInputOperators that could be chained. Until now we could only watch
the last ten tasks transmit data through disk and network, work that
could have been done in one task.
As for performance, if we can chain them, the average improvement is
30%+, and in extreme cases it is an order of magnitude.

The table layer has many special features, which give us the chance to
optimize it, but which also make it hard for the underlying layer to
provide an abstract mechanism to implement it. For example:
- HashJoin must read all the data on one side (the build side) before it
  reads the other side (the probe side).
- HashJoin only emits data while reading the probe side.
- SortMergeJoin reads its inputs in random order, but if we have a
  SortMergeJoin chained with another MergeJoin (re-using the sort
  attribute), that makes things complicated.
- HashAggregate/Sort only emit data in endInput.

Providing an N-Ary stream operator makes all of this possible. The upper
layer can then do whatever it needs, and these specific optimizations
fit much more naturally there than in the lower layer.

In addition to the two optimizations you mentioned, it also gives us
more room to eliminate virtual function calls, because with this
approach the table layer itself has to take the operator chain into
account.
And in the future, we can compile a whole N-Ary stream operator into a
JIT-friendly operator. Without virtual function calls, the JIT can show
its real strength.
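
A small sketch of what I mean by eliminating the calls, in plain Java.
The names FusionSketch, Operator, runChained and runFused are
hypothetical, and the (v + 1) * 2 pipeline is just a stand-in for a
real operator chain. It shows the shape of the code only and makes no
claim about measured speed.

```java
import java.util.List;

/** Hypothetical sketch: the same pipeline as a chain of operators and as one fused loop. */
final class FusionSketch {
    /** Chained style: every record crosses one interface call per operator. */
    interface Operator {
        long apply(long value);
    }

    static long runChained(long[] input, List<Operator> chain) {
        long sum = 0;
        for (long v : input) {
            long x = v;
            for (Operator op : chain) {
                x = op.apply(x); // interface dispatch on every record
            }
            sum += x;
        }
        return sum;
    }

    /** Fused style: the chain (v + 1) * 2 written as one loop body, no per-operator calls. */
    static long runFused(long[] input) {
        long sum = 0;
        for (long v : input) {
            sum += (v + 1) * 2;
        }
        return sum;
    }
}
```

Both methods compute the same result when runChained is given the chain
v -> v + 1 followed by v -> v * 2. The fused form is what generated
code for a whole N-Ary operator could look like.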

Best,
Jingsong Lee

On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> First and foremost I would like to nominate myself to the Golden Shovel
> award for digging out this topic:
>
>
> Secondly, I would like to discuss coming back to this particular idea of
> implementing N-Ary Stream Operator. This time the motivation doesn’t come from
> the Side Inputs, but to efficiently support multi joins in SQL, without
> extra network exchanges. I’ve reviewed the design doc proposed by Aljoscha,
> I quite like it and I think we could start from that.
>
> Specifically, the end goal is to allow, for example, Blink to:
>
> I. Implement A* multi broadcast join - to have a single operator chain,
> where probe table (source) is read locally (inside the task that is
> actually doing the join), then joined with multiple other broadcasted
> tables.
> II. Another example might be when we have 2 or more sources,
> pre-partitioned on the same key. In that case we should also be able to
> perform all of the table reading and the join inside a single Task.
>
> In order to achieve that, I would propose the following plan:
>
> 1. Implement N-Ary Stream Operator as proposed in the design doc below,
> however with added support for the input selection [1].
>   - initially it can be just exposed via the `StreamTransformation`,
> without direct access from the `DataStream API`
>
> 2. Allow it to be chained with sources (implemented using the FLIP-27
> SourceReader [2])
>
> 3. Think about whether we need to support more complex chaining. Without
> this point, motivating examples (I and II) could be implemented if all of
> the joins/filtering/mappings are compiled/composed into a single N-Ary
> Stream Operator (which could be chained with some other single input
> operators at the tail). We could also think about supporting the chaining
> of a tree of, for example, TwoInputStreamOperators inside a single Task. However
> I’m leaving this as a follow up, since in that case, it’s not so easy to
> handle the `InputSelection` of multiple operators inside the tree.
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> yes, I see operators of this style as very much an internal thing. You are
> probably talking about use cases for OneInputOperator and TwoInputOperator
> where users have a very specific need and require access to the low-level
> details such as watermarks, state and timers to get stuff done. Maybe we
> could have a wrapper for these so that users can still use them but
> internally we wrap them in an N-Ary Operator.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org> wrote:
> Hey,
>
> Some initial feedback from my side:
>
> I think this is a very important problem to deal with as a lot of applications
> depend on it. I like the proposed runtime model and that is probably a
> good way to handle this task, it is very clear what is happening.
>
> My main concern is how to handle this from the API and UDFs. What you
> proposed seems like a very internal thing from the API perspective and I
> would be against exposing it in the way you wrote in your example. We
> should make all effort to streamline this with the functional style
> operators in some way. (so in that sense the way broadcastsets are handled
> is pretty nice) Maybe we could extend ds.connect() to many streams
>
> But in any case this is awesome initiative :)
>
> Cheers,
> Gyula
>
>
> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 21 Apr 2016,
> 15:56):
>
> Hi Team,
> I'm currently thinking about how we can bring the broadcast set/broadcast
> input feature from the DataSet API to the DataStream API. I think this
> would be a valuable addition since it would enable use cases that join
> streams with constant (or slowly changing) side information.
>
> For this purpose, I think that we need to change the way we handle stream
> operators. The new model would have one unified operator that handles all
> cases and allows adding inputs after the operator was constructed, thus
> allowing the specification of broadcast inputs.
>
> I wrote up this preliminary document detailing the reason why we need such
> a new operator for broadcast inputs and also what the API of such an
> operator would be. It also quickly touches on the required changes of
> existing per-operation stream operations such as StreamMap:
>
>
>
> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>
> Please have a look if you're interested. Feedback/insights are very
> welcome. :-)
>
> Cheers,
> Aljoscha
>
>
>
>

-- 
Best, Jingsong Lee
