[ 
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819287#comment-16819287
 ] 

Piotr Nowojski edited comment on FLINK-11974 at 4/16/19 5:14 PM:
-----------------------------------------------------------------

I've done a little more research and reduced this to a very simple 
micro-benchmark. The answer seems to be that the JVM is astonishingly bad at 
devirtualizing. To avoid that, we would need either to code-generate the whole 
operator chain and collectors, or to process records in micro-batches, but 
that's a completely different story.
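
To make the two alternatives concrete, here is a minimal sketch of a per-record 
collector chain next to a micro-batched one. {{Collector}} and 
{{BatchCollector}} are hypothetical interfaces invented for this illustration, 
not Flink classes, and this is not the micro-benchmark mentioned above. It only 
shows where the interface calls happen and measures nothing; a real comparison 
would need a harness such as JMH.

```java
// Illustrative only: Collector and BatchCollector are hypothetical interfaces,
// not Flink classes. The sketch shows call structure; it is not a benchmark.
final class MicroBatchSketch {
    interface Collector { void collect(long record); }
    interface BatchCollector { void collectBatch(long[] records, int length); }

    // Per-record stage: one interface call to the next stage for every record.
    static final class AddOne implements Collector {
        private final Collector next;
        AddOne(Collector next) { this.next = next; }
        @Override public void collect(long record) { next.collect(record + 1); }
    }

    static final class Sum implements Collector {
        long total;
        @Override public void collect(long record) { total += record; }
    }

    // Batched stage: a plain loop over the batch, then one interface call per batch.
    static final class BatchAddOne implements BatchCollector {
        private final BatchCollector next;
        BatchAddOne(BatchCollector next) { this.next = next; }
        @Override public void collectBatch(long[] records, int length) {
            for (int i = 0; i < length; i++) { records[i] += 1; }
            next.collectBatch(records, length);
        }
    }

    static final class BatchSum implements BatchCollector {
        long total;
        @Override public void collectBatch(long[] records, int length) {
            for (int i = 0; i < length; i++) { total += records[i]; }
        }
    }

    // Pushes 0..n-1 through two AddOne stages, one record at a time.
    static long perRecord(int n) {
        Sum sink = new Sum();
        Collector chain = new AddOne(new AddOne(sink));
        for (long i = 0; i < n; i++) { chain.collect(i); }
        return sink.total;
    }

    // Same pipeline, but records are buffered and handed over in batches.
    static long batched(int n, int batchSize) {
        BatchSum sink = new BatchSum();
        BatchCollector chain = new BatchAddOne(new BatchAddOne(sink));
        long[] buffer = new long[batchSize];
        int fill = 0;
        for (long i = 0; i < n; i++) {
            buffer[fill++] = i;
            if (fill == batchSize) { chain.collectBatch(buffer, fill); fill = 0; }
        }
        if (fill > 0) { chain.collectBatch(buffer, fill); } // flush the partial batch
        return sink.total;
    }

    public static void main(String[] args) {
        System.out.println(perRecord(10) + " " + batched(10, 4));
    }
}
```

Both paths compute the same result. The batched variant makes one interface 
call per stage per batch instead of one per stage per record.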

I agree that we need some kind of {{StreamOperatorFactory}}, and I talked with 
[~StephanEwen], [~srichter] and [~till.rohrmann]. Our consensus was that the 
proposed {{StreamOperatorSubstitutor}} is not the ideal solution 
({{StreamOperatorSubstitutor}} is basically a {{StreamOperatorFactory}} that 
was merged with the {{StreamOperator}} interface). Apparently my previously 
mentioned idea of introducing a proper {{StreamOperatorFactory}} was on some 
TODO list (with future ideas) after all - we wanted to introduce some 
intermediate operator representation anyway. From this perspective, adding 
{{StreamOperatorSubstitutor}} would unfortunately be wasted effort: it would 
complicate our operator APIs and add more code to rewrite in the future. 

We are proposing to combine those two efforts (the performance improvement from 
this ticket and {{StreamOperatorFactory}}). There are a couple of other things 
that we could improve along the way, so I think it would be best to sync up and 
discuss all of this. What do you think?

> Are we going to change the existing code, e.g. how the DataStream API 
> transfers stream operators to the runtime?
> If not, we must find a solution to let OperatorFactory co-exist with the 
> current path; this will involve more thinking and design.

I think that we should allow {{StreamOperatorFactory}} to co-exist with 
{{StreamOperator}}; otherwise we would need a huge number of changes. After 
some quick research, it looks relatively easy to either add 
{{StreamFactoryTransformation}} (parallel to and co-existing with the current 
{{StreamTransformation}}) or just wrap all operators passed to the existing 
{{StreamTransformation}} into a simple/dummy {{StreamOperatorFactory}}. 
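
A rough sketch of the second option, wrapping an existing operator into a 
simple factory. All types here are hypothetical, heavily simplified stand-ins 
declared inside the sketch (the real interfaces have many more methods and 
parameters), and {{SimpleOperatorFactory}} is a name made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins for the types discussed above; not the Flink interfaces.
final class WrappingSketch {
    interface Output<T> { void collect(T record); }

    interface StreamOperator<OUT> {
        void setup(Output<OUT> output);      // existing two-step initialisation
        void processElement(String input);
    }

    interface StreamOperatorFactory<OUT> {
        StreamOperator<OUT> create(Output<OUT> output);
    }

    // "Simple/dummy" factory: holds an already constructed operator and
    // returns it after running its existing setup step.
    static final class SimpleOperatorFactory<OUT> implements StreamOperatorFactory<OUT> {
        private final StreamOperator<OUT> operator;
        SimpleOperatorFactory(StreamOperator<OUT> operator) { this.operator = operator; }
        @Override public StreamOperator<OUT> create(Output<OUT> output) {
            operator.setup(output);
            return operator;
        }
    }

    // An existing-style operator that knows nothing about factories.
    static final class UpperCaseOperator implements StreamOperator<String> {
        private Output<String> output;       // assigned in setup, so it cannot be final
        @Override public void setup(Output<String> output) { this.output = output; }
        @Override public void processElement(String input) {
            output.collect(input.toUpperCase(Locale.ROOT));
        }
    }

    // The runtime side only ever talks to the factory.
    static List<String> run(List<String> inputs) {
        List<String> collected = new ArrayList<>();
        StreamOperatorFactory<String> factory =
                new SimpleOperatorFactory<>(new UpperCaseOperator());
        StreamOperator<String> operator = factory.create(collected::add);
        for (String input : inputs) { operator.processElement(input); }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b")));
    }
}
```

With a wrapper like this, existing operators would not need to change, while 
the runtime could move to requesting operators from factories only.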

There are a couple of other things that we could do. For example, we could drop 
the current {{StreamOperator#setup(StreamTask<?, ?> containingTask, 
StreamConfig config, Output<StreamRecord<OUT>> output)}} method in favour of 
{{StreamOperatorFactory#create(StreamTask<?, ?> containingTask, StreamConfig 
config, Output<StreamRecord<OUT>> output)}}. This would allow us to make those 
parameters {{final}} in the new {{StreamOperator}} implementations.
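
A minimal sketch of what that could look like. The interfaces below are 
hypothetical, simplified stand-ins: a plain {{String taskName}} takes the place 
of the {{StreamTask}} and {{StreamConfig}} parameters, and the operator and 
factory names are made up for the example.

```java
// Hypothetical stand-ins; the proposed create(...) would take StreamTask,
// StreamConfig and Output<StreamRecord<OUT>> instead of these simplified parameters.
final class FinalFieldsSketch {
    interface Output<T> { void collect(T record); }

    // No setup(...) method any more: the operator is complete once constructed.
    interface StreamOperator<OUT> { void processElement(int value); }

    interface StreamOperatorFactory<OUT> {
        StreamOperator<OUT> create(String taskName, Output<OUT> output);
    }

    static final class LabellingOperator implements StreamOperator<String> {
        // Passed through the constructor, so both fields can be final.
        private final String taskName;
        private final Output<String> output;

        LabellingOperator(String taskName, Output<String> output) {
            this.taskName = taskName;
            this.output = output;
        }

        @Override public void processElement(int value) {
            output.collect(taskName + ":" + value);
        }
    }

    static final class LabellingOperatorFactory implements StreamOperatorFactory<String> {
        @Override public StreamOperator<String> create(String taskName, Output<String> output) {
            return new LabellingOperator(taskName, output);
        }
    }

    // Creates one operator through the factory and pushes a single value through it.
    static String runOnce(String taskName, int value) {
        StringBuilder sink = new StringBuilder();
        StreamOperator<String> operator =
                new LabellingOperatorFactory().create(taskName, sink::append);
        operator.processElement(value);
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.println(runOnce("map-1", 7));
    }
}
```

Because the operator receives everything in its constructor, there is no 
window in which it exists but has not been set up yet.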


was (Author: pnowojski):
I've done a little more research and reduced this to a very simple 
micro-benchmark. The answer seems to be that the JVM is astonishingly bad at 
devirtualizing. To avoid that, we would need either to code-generate the whole 
operator chain and collectors, or to process records in micro-batches, but 
that's a completely different story.

I agree that we need some kind of {{OperatorFactory}}, and I talked with 
[~StephanEwen], [~srichter] and [~till.rohrmann]. Our consensus was that the 
proposed {{StreamOperatorSubstitutor}} is not the ideal solution 
({{StreamOperatorSubstitutor}} is basically an {{OperatorFactory}} that was 
merged with the {{Operator}} interface). Apparently my previously mentioned 
idea of introducing a proper {{OperatorFactory}} was on some TODO list (with 
future ideas) after all - we wanted to introduce some intermediate operator 
representation anyway. From this perspective, adding 
{{StreamOperatorSubstitutor}} would unfortunately be wasted effort: it would 
complicate our operator APIs and add more code to rewrite in the future. 

We are proposing to combine those two efforts (the performance improvement from 
this ticket and {{OperatorFactory}}). There are a couple of other things that 
we could improve along the way, so I think it would be best to sync up and 
discuss all of this. What do you think?

> Are we going to change the existing code, e.g. how the DataStream API 
> transfers stream operators to the runtime?
> If not, we must find a solution to let OperatorFactory co-exist with the 
> current path; this will involve more thinking and design.

I think that we should allow {{OperatorFactory}} to co-exist with 
{{Operator}}; otherwise we would need a huge number of changes. After some 
quick research, it looks relatively easy to either add 
{{StreamFactoryTransformation}} (parallel to and co-existing with the current 
{{StreamTransformation}}) or just wrap all operators passed to the existing 
{{StreamTransformation}} into a simple/dummy {{StreamOperatorFactory}}. There 
are a couple of other things that we could do. For example, we could drop the 
current {{StreamOperator#setup(StreamTask<?, ?> containingTask, StreamConfig 
config, Output<StreamRecord<OUT>> output)}} method in favour of 
{{StreamOperatorFactory#create(StreamTask<?, ?> containingTask, StreamConfig 
config, Output<StreamRecord<OUT>> output)}}. This would allow us to make those 
parameters {{final}} in the new {{StreamOperator}} implementations.

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator 
> CodeGen
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need to CodeGen an entire Operator, one possible solution is to 
> introduce an OperatorWrapper, generate a CodeGen sub-Operator in 
> OperatorWrapper's open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a 
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the streamTask by
>  * serialization, and produce an actual stream operator to the streamTask, who uses the actual
>  * stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>    /**
>     * Produces the actual stream operator.
>     *
>     * @param userCodeClassLoader the user code class loader to use.
>     * @return the actual stream operator created on {@code StreamTask}.
>     */
>    StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
>    return (T) ((StreamOperatorSubstitutor) operator).getActualStreamOperator(cl);
> } else {
>    return (T) operator;
> }
> {code}
> to get the real operator.



