Jiang Xin created FLINK-32603:
---------------------------------
Summary: Avoid consuming twice when two pipelines have the same
table source
Key: FLINK-32603
URL: https://issues.apache.org/jira/browse/FLINK-32603
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Jiang Xin
Fix For: 1.19.0
Here is an example that illustrates the issue. We have a source table that generates
the numbers 1 to 5. We derive two tables from the source, convert each of them
to a DataStream, and print both to the console.
If we debug the program, we find that
`org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource`
is instantiated twice and the numbers are generated twice. Consuming the same
source data twice is wasteful.
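{code:java}
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DuplicateSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // A bounded datagen source that emits 1, 2, 3, 4, 5 at one row per second.
        tEnv.executeSql(
                        "CREATE TABLE source(\n"
                                + "  f_sequence INT\n"
                                + ") WITH (\n"
                                + "  'connector' = 'datagen',\n"
                                + "  'rows-per-second' = '1',\n"
                                + "  'fields.f_sequence.kind' = 'sequence',\n"
                                + "  'fields.f_sequence.start' = '1',\n"
                                + "  'fields.f_sequence.end' = '5'\n"
                                + ")")
                .await();

        // Two tables derived from the same source table.
        Table source = tEnv.from("source");
        Table left = source.filter($("f_sequence").isGreater(3));
        Table right = source.filter($("f_sequence").isLessOrEqual(3));

        // Each toDataStream call translates its own pipeline.
        DataStream<Row> leftDataStream = tEnv.toDataStream(left);
        DataStream<Row> rightDataStream = tEnv.toDataStream(right);
        leftDataStream.addSink(new PrintSinkFunction<>());
        rightDataStream.addSink(new PrintSinkFunction<>());
        env.execute();
    }
}
{code}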
The reason is that every call to `StreamTableEnvironmentImpl#toDataStream`
creates a new planner, which translates the graph starting from that call's end
node. As a result, the two translated graphs are not aware that they share the
same source node.
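To make the mechanism concrete, here is a toy model in plain Java with no Flink dependency. `Node`, `Operator`, `Translator`, and `countSources` are hypothetical names invented for this sketch, not Flink planner classes. Memoizing by node identity is just one way sharing could work, and this sketch does not describe how FLINK-32603 is actually resolved. Translating each end node with its own translator creates two source operators, while translating both end nodes with a single translator reuses one.

{code:java}
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class SharedSourceSketch {
    // Hypothetical logical node: a source (input == null) or a filter over one input.
    static final class Node {
        final String name;
        final Node input;

        Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }
    }

    // Hypothetical physical operator produced by translating a Node.
    static final class Operator {
        final String name;
        final Operator input;

        Operator(String name, Operator input) {
            this.name = name;
            this.input = input;
        }
    }

    // Hypothetical stand-in for a planner: translates from an end node back to
    // the source, memoizing by node identity so a shared node is translated once.
    static final class Translator {
        private final Map<Node, Operator> translated = new IdentityHashMap<>();
        int sourcesCreated = 0;

        Operator translate(Node node) {
            Operator existing = translated.get(node);
            if (existing != null) {
                return existing; // reuse the operator built for an earlier end node
            }
            Operator input = node.input == null ? null : translate(node.input);
            if (node.input == null) {
                sourcesCreated++; // a new source operator would consume the data again
            }
            Operator op = new Operator(node.name, input);
            translated.put(node, op);
            return op;
        }
    }

    // shareTranslator == false mimics one fresh translator per end node;
    // shareTranslator == true translates all end nodes with a single translator.
    static int countSources(List<Node> endNodes, boolean shareTranslator) {
        Translator shared = new Translator();
        int total = 0;
        for (Node end : endNodes) {
            Translator t = shareTranslator ? shared : new Translator();
            t.translate(end);
            if (!shareTranslator) {
                total += t.sourcesCreated;
            }
        }
        return shareTranslator ? shared.sourcesCreated : total;
    }

    public static void main(String[] args) {
        Node source = new Node("source", null);
        Node left = new Node("filter(f_sequence > 3)", source);
        Node right = new Node("filter(f_sequence <= 3)", source);
        List<Node> endNodes = List.of(left, right);
        System.out.println("separate translators: " + countSources(endNodes, false));
        System.out.println("shared translator:    " + countSources(endNodes, true));
    }
}
{code}

Running `main` prints 2 source operators for separate translators and 1 for the shared translator, mirroring the duplicated `DataGeneratorSource` in the example above.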