[ https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815329#comment-16815329 ]
Jingsong Lee edited comment on FLINK-11974 at 4/11/19 11:22 AM:
----------------------------------------------------------------

[~ykt836] A StreamOperator needs to be wrapped in a StreamTransformation to form the StreamGraph (see StreamGraphGenerator), and that StreamTransformation needs to be a specific OneInputTransformation or TwoInputTransformation. The same constraint applies to the StreamOperator: we must implement OneInputStreamOperator, unless we modify OneInputTransformation and TwoInputTransformation so that their operator is a StreamOperator instead of an XXXStreamOperator.


was (Author: lzljs3620320):
[~ykt836] A StreamOperator needs to be wrapped in a StreamTransformation to form the StreamGraph (see StreamGraphGenerator), and that StreamTransformation needs to be a specific OneInputTransformation or TwoInputTransformation. The same constraint applies to the StreamOperator: we must implement OneInputStreamOperator to avoid changing a lot of the logic in StreamGraphGenerator.

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator
> CodeGen
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need to CodeGen an entire Operator, one possible solution is to introduce
> an OperatorWrapper, generate a CodeGen sub-Operator in OperatorWrapper's
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the
>  * StreamTask by serialization and produces an actual stream operator for the
>  * StreamTask, which uses the actual stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>     /**
>      * Produces the actual stream operator.
>      *
>      * @param userCodeClassLoader the user code class loader to use.
>      * @return the actual stream operator created on {@code StreamTask}.
>      */
>     StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> // instanceof is already false for null, so no separate null check is needed.
> if (operator instanceof StreamOperatorSubstitutor) {
>     return (T) ((StreamOperatorSubstitutor) operator).getActualStreamOperator(cl);
> } else {
>     return (T) operator;
> }
> {code}
> to get the real operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
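
For illustration, the substitution step described in the issue can be sketched outside of Flink as below. This is only a sketch under stated assumptions: StreamOperator here is a simplified stand-in with a made-up processElement method (the real Flink interface is different and larger), and UpperCaseOperator, GeneratedOperatorSubstitutor, SubstitutorSketch and resolve are hypothetical names that do not come from the issue or from Flink. Only the StreamOperatorSubstitutor interface and the instanceof check follow the description.

```java
import java.io.Serializable;
import java.util.Locale;

/** Hypothetical driver; resolve() mirrors the check proposed for StreamConfig.getStreamOperator. */
final class SubstitutorSketch {

    /** Returns the actual operator if given a substitutor, otherwise the operator itself. */
    @SuppressWarnings("unchecked")
    static <T extends StreamOperator<?>> T resolve(Object operator, ClassLoader cl) {
        if (operator instanceof StreamOperatorSubstitutor) {
            return (T) ((StreamOperatorSubstitutor<?>) operator).getActualStreamOperator(cl);
        }
        return (T) operator;
    }

    public static void main(String[] args) {
        ClassLoader cl = SubstitutorSketch.class.getClassLoader();
        StreamOperator<String> op = resolve(new GeneratedOperatorSubstitutor(), cl);
        // The task now calls the actual operator directly, without a proxying wrapper.
        System.out.println(op.getClass().getSimpleName() + " -> " + op.processElement("flink"));
    }
}

/** Simplified stand-in for Flink's StreamOperator (assumption: not the real API). */
interface StreamOperator<OUT> extends Serializable {
    OUT processElement(String value);
}

/** Interface shape as proposed in the issue description. */
interface StreamOperatorSubstitutor<OUT> {
    StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
}

/** Hypothetical operator standing in for a code-generated class. */
final class UpperCaseOperator implements StreamOperator<String> {
    @Override
    public String processElement(String value) {
        return value.toUpperCase(Locale.ROOT);
    }
}

/**
 * Hypothetical substitutor. It also implements the operator interface so that it
 * can sit wherever an operator is expected, but it is only a placeholder.
 */
final class GeneratedOperatorSubstitutor
        implements StreamOperator<String>, StreamOperatorSubstitutor<String> {

    @Override
    public String processElement(String value) {
        throw new UnsupportedOperationException("placeholder; the actual operator does the work");
    }

    @Override
    public StreamOperator<String> getActualStreamOperator(ClassLoader userCodeClassLoader) {
        // A real implementation would presumably compile and load generated code with
        // the given class loader; this sketch just instantiates a fixed class.
        return new UpperCaseOperator();
    }
}
```

Because the placeholder is swapped out once, before the task starts running, the per-record path goes straight to the actual operator, which is the motivation given in the description for preferring this over a proxying OperatorWrapper.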