[ 
https://issues.apache.org/jira/browse/FLINK-32603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-32603:
-------------------------------
    Fix Version/s: 2.1.0
                       (was: 2.0.0)

> Avoid consuming twice when two pipelines have the same table source
> -------------------------------------------------------------------
>
>                 Key: FLINK-32603
>                 URL: https://issues.apache.org/jira/browse/FLINK-32603
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0
>            Reporter: Jiang Xin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.1.0
>
>
> Here is an example to describe the issue. We have a source table that 
> generates numbers from 1 to 5. Then we derive two tables from the source and 
> convert them to datastream and sink to console.
> If we debug the program, we can find that the 
> `org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource` 
> is created twice and the numbers are generated twice. It is a waste to 
> consume the same source data twice.
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().enableObjectReuse();
>     env.setParallelism(1);
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>     tEnv.executeSql(
>                     "CREATE TABLE source(\n"
>                             + "  f_sequence INT\n"
>                             + ") WITH (\n"
>                             + "  'connector' = 'datagen',\n"
>                             + "  'rows-per-second' ='1',\n"
>                             + "  'fields.f_sequence.kind' ='sequence',\n"
>                             + "  'fields.f_sequence.start'='1',\n"
>                             + "  'fields.f_sequence.end'='5'\n"
>                             + ")")
>             .await();
>     Table source = tEnv.from("source");
>     Table left = source.filter($("f_sequence").isGreater(3));
>     Table right = source.filter($("f_sequence").isLessOrEqual(3));
>     DataStream<Row> leftDataStream = tEnv.toDataStream(left);
>     DataStream<Row> rightDataStream = tEnv.toDataStream(right);
>     leftDataStream.addSink(new PrintSinkFunction<>());
>     rightDataStream.addSink(new PrintSinkFunction<>());
>     env.execute();
> } {code}
> The reason is that every time the `StreamTableEnvironmentImpl#toDataStream` 
> is called, a new planner is created and translates the graph from the end 
> nodes. So the two graphs do not aware that they have the same source node.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to