[ https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819287#comment-16819287 ]
Piotr Nowojski edited comment on FLINK-11974 at 4/16/19 5:14 PM:
-----------------------------------------------------------------

I've done a bit more research and reduced this to a very simple micro benchmark. The answer seems to be that the JVM is astonishingly bad at devirtualizing. To avoid it we would either need to code generate the whole operator chain and collectors, or process records in micro batches, but that's a completely different story.

I agree that we need some kind of {{StreamOperatorFactory}}, and I talked with [~StephanEwen], [~srichter] and [~till.rohrmann]. Our consensus was that the proposed {{StreamOperatorSubstitutor}} is not the ideal solution ({{StreamOperatorSubstitutor}} is basically a {{StreamOperatorFactory}} merged with the {{StreamOperator}} interface). Apparently my previously mentioned idea of introducing a proper {{StreamOperatorFactory}} was on a TODO list (with future ideas) after all - we wanted to introduce some intermediate operator representation anyway. From this perspective, adding {{StreamOperatorSubstitutor}} would unfortunately be wasted effort: it would complicate our operator APIs and add more code to rewrite in the future.

We are proposing to connect those two efforts (the performance improvement from this ticket with {{StreamOperatorFactory}}). There are a couple of other things that we could improve along the way, so I think it would be best to sync up and discuss all of this. What do you think?

> Are we going to change the existing codes, e.g. how DataStream API transfer
> stream operators to runtime?
> If not, we must find a solution to let OperatorFactory co-exist with current
> path, this will involve more thinking and design.

I think that we should allow {{StreamOperatorFactory}} to co-exist with {{StreamOperator}}, otherwise it would require a huge amount of changes.
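
One way such co-existence might look is sketched below. This is only an illustration under stated assumptions: {{Output}}, {{Operator}}, {{LegacyOperator}}, {{OperatorFactory}} and {{SimpleOperatorFactory}} are hypothetical, heavily simplified stand-ins and not Flink's real interfaces (the real {{setup}} also takes the containing task and the config). An existing operator is wrapped in a trivial factory that calls {{setup}}, while a new-style operator receives its output through the factory and can keep it in a {{final}} field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch; none of these types are Flink's actual API.
final class FactoryCoexistenceSketch {

    /** Simplified stand-in for an operator's output collector. */
    interface Output<T> {
        void collect(T record);
    }

    /** Simplified operator: only the per-record path is modelled. */
    interface Operator<IN, OUT> {
        void processElement(IN element);
    }

    /** Existing style: dependencies are injected after construction. */
    interface LegacyOperator<IN, OUT> extends Operator<IN, OUT> {
        void setup(Output<OUT> output);
    }

    /** Proposed style: the factory creates a fully initialised operator. */
    interface OperatorFactory<IN, OUT> {
        Operator<IN, OUT> create(Output<OUT> output);
    }

    /** Trivial factory that lets an existing operator travel the factory path. */
    static final class SimpleOperatorFactory<IN, OUT> implements OperatorFactory<IN, OUT> {
        private final LegacyOperator<IN, OUT> operator;

        SimpleOperatorFactory(LegacyOperator<IN, OUT> operator) {
            this.operator = operator;
        }

        @Override
        public Operator<IN, OUT> create(Output<OUT> output) {
            operator.setup(output); // legacy initialisation happens inside the factory
            return operator;
        }
    }

    /** Existing operator: the output field cannot be final because of setup(). */
    static final class LegacyUpperCase implements LegacyOperator<String, String> {
        private Output<String> output;

        @Override
        public void setup(Output<String> output) {
            this.output = output;
        }

        @Override
        public void processElement(String element) {
            output.collect(element.toUpperCase(Locale.ROOT));
        }
    }

    /** New-style operator: the output arrives via the constructor and is final. */
    static final class LengthOperator implements Operator<String, Integer> {
        private final Output<Integer> output;

        LengthOperator(Output<Integer> output) {
            this.output = output;
        }

        @Override
        public void processElement(String element) {
            output.collect(element.length());
        }
    }

    /** Runs one record through each kind of operator, both created via a factory. */
    static List<Object> run() {
        List<Object> collected = new ArrayList<>();

        OperatorFactory<String, String> wrapped =
                new SimpleOperatorFactory<>(new LegacyUpperCase());
        OperatorFactory<String, Integer> direct = LengthOperator::new;

        wrapped.create(collected::add).processElement("flink");
        direct.create(collected::add).processElement("flink");
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the runtime only ever talks to {{OperatorFactory}}, so the existing operators need no changes beyond being wrapped.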

After some quick research, it looks like it would be relatively easy to either add {{StreamFactoryTransformation}} (parallel to and co-existing with the current {{StreamTransformation}}) or just wrap all operators passed to the existing {{StreamTransformation}} into a simple/dummy {{StreamOperatorFactory}}.

There are a couple of other things that we could do. For example, we could drop the current {{StreamOperator#setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output)}} method in favour of {{StreamOperatorFactory#create(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output)}}. This would allow us to make those parameters {{final}} in the new {{StreamOperator}} implementations.


was (Author: pnowojski):
I've done a bit more research and reduced this to a very simple micro benchmark. The answer seems to be that the JVM is astonishingly bad at devirtualizing. To avoid it we would either need to code generate the whole operator chain and collectors, or process records in micro batches, but that's a completely different story.

I agree that we need some kind of {{OperatorFactory}}, and I talked with [~StephanEwen], [~srichter] and [~till.rohrmann]. Our consensus was that the proposed {{StreamOperatorSubstitutor}} is not the ideal solution ({{StreamOperatorSubstitutor}} is basically an {{OperatorFactory}} merged with the {{Operator}} interface). Apparently my previously mentioned idea of introducing a proper {{OperatorFactory}} was on a TODO list (with future ideas) after all - we wanted to introduce some intermediate operator representation anyway. From this perspective, adding {{StreamOperatorSubstitutor}} would unfortunately be wasted effort: it would complicate our operator APIs and add more code to rewrite in the future.

We are proposing to connect those two efforts (the performance improvement from this ticket with {{OperatorFactory}}).
There are a couple of other things that we could improve along the way, so I think it would be best to sync up and discuss all of this. What do you think?

> Are we going to change the existing codes, e.g. how DataStream API transfer
> stream operators to runtime?
> If not, we must find a solution to let OperatorFactory co-exist with current
> path, this will involve more thinking and design.

I think that we should allow {{OperatorFactory}} to co-exist with {{Operator}}, otherwise it would require a huge amount of changes.

After some quick research, it looks like it would be relatively easy to either add {{StreamFactoryTransformation}} (parallel to and co-existing with the current {{StreamTransformation}}) or just wrap all operators passed to the existing {{StreamTransformation}} into a simple/dummy {{StreamOperatorFactory}}.

There are a couple of other things that we could do. For example, we could drop the current {{StreamOperator#setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output)}} method in favour of {{StreamOperatorFactory#create(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output)}}. This would allow us to make those parameters {{final}} in the new {{StreamOperator}} implementations.


> Introduce StreamOperatorSubstitutor to help table perform the whole Operator
> CodeGen
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need to CodeGen an entire Operator, one possible solution is to introduce
> an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the
>  * streamTask by serialization, and produces an actual stream operator for the
>  * streamTask, which uses the actual stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>     /**
>      * Produces the actual stream operator.
>      *
>      * @param userCodeClassLoader the user code class loader to use.
>      * @return the actual stream operator created on {@code StreamTask}.
>      */
>     StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> // instanceof is already false for null, so no separate null check is needed
> if (operator instanceof StreamOperatorSubstitutor) {
>     return (T) ((StreamOperatorSubstitutor) operator).getActualStreamOperator(cl);
> } else {
>     return (T) operator;
> }
> {code}
> to get the real operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)