Jiang Xin created FLINK-32603:
---------------------------------

             Summary: Avoid consuming twice when two pipelines have the same 
table source
                 Key: FLINK-32603
                 URL: https://issues.apache.org/jira/browse/FLINK-32603
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Jiang Xin
             Fix For: 1.19.0


Here is an example to describe the issue. We have a source table that generates 
numbers from 1 to 5. Then we derive two tables from the source and convert them 
to datastream and sink to console.

If we debug the program, we can find that the 
`org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource` 
is created twice and the numbers are generated twice. It is a waste to consume 
the same source data twice.
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableObjectReuse();
    env.setParallelism(1);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    tEnv.executeSql(
                    "CREATE TABLE source(\n"
                            + "  f_sequence INT\n"
                            + ") WITH (\n"
                            + "  'connector' = 'datagen',\n"
                            + "  'rows-per-second' ='1',\n"
                            + "  'fields.f_sequence.kind' ='sequence',\n"
                            + "  'fields.f_sequence.start'='1',\n"
                            + "  'fields.f_sequence.end'='5'\n"
                            + ")")
            .await();

    Table source = tEnv.from("source");
    Table left = source.filter($("f_sequence").isGreater(3));
    Table right = source.filter($("f_sequence").isLessOrEqual(3));

    DataStream<Row> leftDataStream = tEnv.toDataStream(left);
    DataStream<Row> rightDataStream = tEnv.toDataStream(right);

    leftDataStream.addSink(new PrintSinkFunction<>());
    rightDataStream.addSink(new PrintSinkFunction<>());

    env.execute();
} {code}
The reason is that every time the `StreamTableEnvironmentImpl#toDataStream` is 
called, a new planner is created and translates the graph from the end nodes. 
So the two graphs do not aware that they have the same source node.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to