Jiang Xin created FLINK-32603:
---------------------------------

Summary: Avoid consuming twice when two pipelines have the same table source
Key: FLINK-32603
URL: https://issues.apache.org/jira/browse/FLINK-32603
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Jiang Xin
Fix For: 1.19.0

Here is an example that illustrates the issue. We have a source table that generates the numbers 1 to 5. We derive two tables from that source, convert each one to a DataStream, and print it to the console. If we debug the program, we find that `org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource` is created twice and the numbers are generated twice. Consuming the same source data twice is wasteful.

{code:java}
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SourceConsumedTwiceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source table that emits the sequence 1..5.
        tEnv.executeSql(
                        "CREATE TABLE source(\n"
                                + " f_sequence INT\n"
                                + ") WITH (\n"
                                + " 'connector' = 'datagen',\n"
                                + " 'rows-per-second' ='1',\n"
                                + " 'fields.f_sequence.kind' ='sequence',\n"
                                + " 'fields.f_sequence.start'='1',\n"
                                + " 'fields.f_sequence.end'='5'\n"
                                + ")")
                .await();

        // Two pipelines derived from the same source table.
        Table source = tEnv.from("source");
        Table left = source.filter($("f_sequence").isGreater(3));
        Table right = source.filter($("f_sequence").isLessOrEqual(3));

        // Each toDataStream call translates its own graph, so the source is instantiated twice.
        DataStream<Row> leftDataStream = tEnv.toDataStream(left);
        DataStream<Row> rightDataStream = tEnv.toDataStream(right);

        leftDataStream.addSink(new PrintSinkFunction<>());
        rightDataStream.addSink(new PrintSinkFunction<>());

        env.execute();
    }
}
{code}

The reason is that every time `StreamTableEnvironmentImpl#toDataStream` is called, a new planner is created and translates the graph starting from the end node. As a result, the two translated graphs are not aware that they share the same source node.
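
To make the explanation above concrete, here is a minimal toy model in plain Java. It is not Flink code: `ToySource`, `ToyPlanner`, and the two helper methods are hypothetical names invented for this sketch, and it only models how many source instances get created, assuming a planner can deduplicate sources only among the pipelines it sees together.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

// Toy model only: none of these classes exist in Flink.
class SourceReuseSketch {

    /** Number of ToySource instances created since the last reset. */
    static int sourcesCreated = 0;

    /** Hypothetical stand-in for a source operator that emits 1..5. */
    static final class ToySource {
        ToySource() {
            sourcesCreated++;
        }

        List<Integer> read() {
            return Arrays.asList(1, 2, 3, 4, 5);
        }
    }

    /** Hypothetical stand-in for a planner; it only knows the pipelines registered on it. */
    static final class ToyPlanner {
        private final Map<String, List<IntPredicate>> pipelinesByTable = new LinkedHashMap<>();

        void addPipeline(String table, IntPredicate filter) {
            pipelinesByTable.computeIfAbsent(table, t -> new ArrayList<>()).add(filter);
        }

        /** Creates one source per distinct table and fans its records out to each pipeline. */
        List<List<Integer>> execute() {
            List<List<Integer>> results = new ArrayList<>();
            for (List<IntPredicate> filters : pipelinesByTable.values()) {
                List<Integer> records = new ToySource().read();
                for (IntPredicate filter : filters) {
                    List<Integer> out = new ArrayList<>();
                    for (int value : records) {
                        if (filter.test(value)) {
                            out.add(value);
                        }
                    }
                    results.add(out);
                }
            }
            return results;
        }
    }

    /** One planner per conversion, mirroring the behavior described in this issue. */
    static int sourcesWithPlannerPerCall() {
        sourcesCreated = 0;
        ToyPlanner first = new ToyPlanner();
        first.addPipeline("source", v -> v > 3);
        first.execute();
        ToyPlanner second = new ToyPlanner();
        second.addPipeline("source", v -> v <= 3);
        second.execute();
        return sourcesCreated;
    }

    /** One planner that sees both pipelines and can therefore share the source. */
    static int sourcesWithSharedPlanner() {
        sourcesCreated = 0;
        ToyPlanner planner = new ToyPlanner();
        planner.addPipeline("source", v -> v > 3);
        planner.addPipeline("source", v -> v <= 3);
        planner.execute();
        return sourcesCreated;
    }

    public static void main(String[] args) {
        System.out.println("planner per call: " + sourcesWithPlannerPerCall()); // prints 2
        System.out.println("shared planner: " + sourcesWithSharedPlanner()); // prints 1
    }
}
{code}

In this sketch the source can only be shared when both pipelines are registered on the same planner before translation, which is the property the improvement asks for. How the real planner would achieve this is not specified in this issue.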