[ https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810900#comment-16810900 ]
Piotr Nowojski edited comment on FLINK-11974 at 4/5/19 2:47 PM:
----------------------------------------------------------------

Can we take a step back here? Maybe I'm just confused about what you are trying to solve/achieve. Could you write down in more detail:
1. what the problem is
2. explain more about the rejected alternative solution(s)
3. provide benchmarks proving the desired result

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator CodeGen
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need to CodeGen an entire Operator, one possible solution is to introduce
> an OperatorWrapper, generate a CodeGen sub-Operator in OperatorWrapper's
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls for each proxied method,
> so instead we introduce a StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the
>  * StreamTask by serialization and produces the actual stream operator, which
>  * the StreamTask then uses to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>     /**
>      * Produces the actual stream operator.
>      *
>      * @param userCodeClassLoader the user code class loader to use.
>      * @return the actual stream operator created on {@code StreamTask}.
>      */
>     StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> // instanceof is already false for null, so no separate null check is needed.
> if (operator instanceof StreamOperatorSubstitutor) {
>     return (T) ((StreamOperatorSubstitutor<?>) operator).getActualStreamOperator(cl);
> } else {
>     return (T) operator;
> }
> {code}
> to get the real operator.
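
To make the difference between the two approaches in the description concrete, here is a minimal, Flink-free sketch. Everything in it is hypothetical and for illustration only: `Operator` stands in for `StreamOperator`, `OperatorSubstitutor` mirrors the proposed `StreamOperatorSubstitutor`, `ForwardingWrapper` plays the role of the OperatorWrapper alternative, and `resolve` imitates the check proposed for `StreamConfig.getStreamOperator`. The "generated" operator is just a lambda, since no real code generation is assumed here.

```java
import java.util.Locale;

/** Hypothetical sketch of the substitution idea; not Flink code. */
public class SubstitutorSketch {

    /** Stand-in for StreamOperator, reduced to a single method. */
    interface Operator<OUT> {
        OUT process(String input);
    }

    /** Mirrors the proposed interface: hands out the operator that actually runs. */
    interface OperatorSubstitutor<OUT> {
        Operator<OUT> getActualOperator(ClassLoader userCodeClassLoader);
    }

    /** The wrapper alternative: every call is forwarded through one more layer. */
    static final class ForwardingWrapper<OUT> implements Operator<OUT> {
        private final Operator<OUT> delegate;

        ForwardingWrapper(Operator<OUT> delegate) {
            this.delegate = delegate;
        }

        @Override
        public OUT process(String input) {
            return delegate.process(input);
        }
    }

    /** A substitutor whose "generated" operator is a plain lambda (assumption). */
    static final class UpperCaseSubstitutor implements OperatorSubstitutor<String> {
        @Override
        public Operator<String> getActualOperator(ClassLoader userCodeClassLoader) {
            // A real implementation would compile generated code with this class loader.
            return input -> input.toUpperCase(Locale.ROOT);
        }
    }

    /** Imitates the proposed check: unwrap a substitutor once, otherwise pass through. */
    @SuppressWarnings("unchecked")
    static <OUT> Operator<OUT> resolve(Object deserialized, ClassLoader cl) {
        if (deserialized instanceof OperatorSubstitutor) {
            return ((OperatorSubstitutor<OUT>) deserialized).getActualOperator(cl);
        }
        return (Operator<OUT>) deserialized;
    }

    public static void main(String[] args) {
        ClassLoader cl = SubstitutorSketch.class.getClassLoader();
        Operator<String> direct = resolve(new UpperCaseSubstitutor(), cl);
        Operator<String> wrapped = new ForwardingWrapper<>(direct);

        System.out.println(direct.process("flink"));
        System.out.println(wrapped.process("flink"));
        // The resolved operator is used directly, with no forwarding layer in between.
        System.out.println(direct instanceof ForwardingWrapper);
    }
}
```

In this sketch both paths produce the same result; the difference is that `wrapped.process` goes through an extra forwarding call on every invocation, while the substitutor is consulted only once, in `resolve`. Whether that difference is measurable in a real job is exactly what the requested benchmarks would have to show.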