[ https://issues.apache.org/jira/browse/FLINK-32603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Weijie Guo updated FLINK-32603:
-------------------------------
    Fix Version/s: 2.1.0
                       (was: 2.0.0)

> Avoid consuming twice when two pipelines have the same table source
> -------------------------------------------------------------------
>
>                 Key: FLINK-32603
>                 URL: https://issues.apache.org/jira/browse/FLINK-32603
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0
>            Reporter: Jiang Xin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.1.0
>
>
> Here is an example that illustrates the issue. We have a source table that
> generates the numbers 1 to 5. We then derive two tables from the source,
> convert each of them to a DataStream, and sink both to the console.
> If we debug the program, we can see that
> `org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource`
> is created twice and the numbers are generated twice. Consuming the same
> source data twice is wasteful.
> {code:java}
> import static org.apache.flink.table.api.Expressions.$;
>
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> public class SameSourceTwoPipelines { // class name is illustrative
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().enableObjectReuse();
>         env.setParallelism(1);
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>         // Bounded datagen source emitting 1, 2, 3, 4, 5 at one row per second.
>         tEnv.executeSql(
>                         "CREATE TABLE source(\n"
>                                 + "  f_sequence INT\n"
>                                 + ") WITH (\n"
>                                 + "  'connector' = 'datagen',\n"
>                                 + "  'rows-per-second' = '1',\n"
>                                 + "  'fields.f_sequence.kind' = 'sequence',\n"
>                                 + "  'fields.f_sequence.start' = '1',\n"
>                                 + "  'fields.f_sequence.end' = '5'\n"
>                                 + ")")
>                 .await();
>
>         // Two tables derived from the same source table.
>         Table source = tEnv.from("source");
>         Table left = source.filter($("f_sequence").isGreater(3));
>         Table right = source.filter($("f_sequence").isLessOrEqual(3));
>
>         // Each toDataStream call translates its own plan, so the datagen
>         // source is instantiated once per pipeline.
>         DataStream<Row> leftDataStream = tEnv.toDataStream(left);
>         DataStream<Row> rightDataStream = tEnv.toDataStream(right);
>
>         // Print each stream to stdout.
>         leftDataStream.print();
>         rightDataStream.print();
>
>         env.execute();
>     }
> }
> {code}
> The reason is that every time `StreamTableEnvironmentImpl#toDataStream`
> is called, a new planner is created, which translates the graph starting
> from the end nodes. As a result, the two graphs are not aware that they
> share the same source node.
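To make the root cause concrete, here is a toy model in plain Java. It is written for this note and is not taken from the Flink code base: `PlanReuseSketch`, `LogicalNode`, and `Translator` are hypothetical names, and the model assumes that translation walks each plan from its end node back to the source while memoizing already-translated nodes. With one fresh translator per pipeline (an analogy for a new planner per `toDataStream` call), the shared source is instantiated twice; with a single translator covering both pipelines, it is instantiated once.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Toy model of the planner issue. All names here are hypothetical and are
// NOT Flink APIs: a logical plan is a DAG of nodes, and "translating" a node
// produces a physical operator description.
final class PlanReuseSketch {

    /** Hypothetical logical node: a source scan (input == null) or a filter. */
    static final class LogicalNode {
        final String name;
        final LogicalNode input;

        LogicalNode(String name, LogicalNode input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Hypothetical translator that walks a plan from its end node to the source. */
    static final class Translator {
        // Memo keyed by node identity: a node reached twice is translated once.
        private final Map<LogicalNode, String> memo = new IdentityHashMap<>();
        int sourcesCreated = 0;

        String translate(LogicalNode node) {
            String cached = memo.get(node);
            if (cached != null) {
                return cached;
            }
            String physical;
            if (node.input == null) {
                sourcesCreated++; // a new source operator is instantiated
                physical = "Source(" + node.name + ")";
            } else {
                physical = "Filter(" + node.name + ") <- " + translate(node.input);
            }
            memo.put(node, physical);
            return physical;
        }
    }

    /** Counts source operators for the two-pipeline example in the issue. */
    static int sourceOperators(boolean sharedTranslator) {
        LogicalNode source = new LogicalNode("source", null);
        LogicalNode left = new LogicalNode("f_sequence > 3", source);
        LogicalNode right = new LogicalNode("f_sequence <= 3", source);

        if (sharedTranslator) {
            // Both end nodes translated together: the shared source is reused.
            Translator translator = new Translator();
            translator.translate(left);
            translator.translate(right);
            return translator.sourcesCreated;
        }
        // A fresh translator per pipeline, like one planner per toDataStream call.
        Translator forLeft = new Translator();
        Translator forRight = new Translator();
        forLeft.translate(left);
        forRight.translate(right);
        return forLeft.sourcesCreated + forRight.sourcesCreated;
    }

    public static void main(String[] args) {
        System.out.println("separate translators: " + sourceOperators(false)); // 2
        System.out.println("shared translator: " + sourceOperators(true)); // 1
    }
}
```

Keying the memo by identity fits this example, where both filters are built from the same `source` Table object; a real optimizer might instead deduplicate structurally equal sub-plans by a digest, which this sketch does not attempt.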