[ 
https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17025175#comment-17025175
 ] 

Timo Walther commented on FLINK-15775:
--------------------------------------

This issue came up a couple of times in the past. The question is how 
independently multiple SQL pipelines should be executed. Currently, as Aljoscha 
said, they are completely isolated from each other. We are in the process of 
discussing how to reuse parts of the pipeline; FLIP-84 is part of this effort. 
However, this is not trivial, as it would also require rethinking predicate and 
projection push-down, which might come from multiple queries and should not 
interfere with each other.

> SourceFunctions are instantiated twice when pulled on from 2 Sinks
> ------------------------------------------------------------------
>
>                 Key: FLINK-15775
>                 URL: https://issues.apache.org/jira/browse/FLINK-15775
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.1, 1.10.0
>            Reporter: Benoît Paris
>            Priority: Major
>         Attachments: flink-test-duplicated-sources.zip
>
>
> When pulled on by two sinks, the SourceFunctions of a TableSource get 
> instantiated twice (and each instance is then opened once per parallel 
> subtask, which is expected behavior).
> The following instantiates the FooTableSource's SourceFunction once (OK 
> behavior, but not the processing we want):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> out0.insertInto("syso_sink_0");
> {code}
>  
> This will instantiate the FooTableSource's SourceFunction twice (Not OK, as 
> we're missing half the inputs in each SysoSink):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
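> // both queries select from the same registered table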
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> tEnv.registerTableSink("syso_sink_1", new SysoSink());
> out0.insertInto("syso_sink_0");
> out1.insertInto("syso_sink_1"); 
> {code}
>  
> This might not be a problem for Kafka's SourceFunctions, as we can always 
> reread from a log; but it is a data loss problem when the source data can't 
> be reproduced.
> Actually, this might be me not understanding the API. Is there a way to make 
> the runtime read from the same opened SourceFunctions?
> Attached are Java code that logs the faulty opening of the SourceFunctions, the 
> pom.xml, and the logical execution plans for both the duplicated case and the 
> workaround.
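> For illustration, here is a minimal sketch of such a logging table source (a 
> hypothetical implementation for Flink 1.9/1.10, not the attached code): the 
> constructor and run() print whenever a SourceFunction is created and started, 
> which makes the duplication visible on stdout.
>  
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.sources.StreamTableSource;
> import org.apache.flink.types.Row;
> 
> public class FooTableSource implements StreamTableSource<Row> {
> 
>     /** Emits rows with a single LONG field and logs every instantiation. */
>     public static class FooSourceFunction implements SourceFunction<Row> {
>         private volatile boolean running = true;
> 
>         public FooSourceFunction() {
>             System.out.println("FooSourceFunction instantiated: " + this);
>         }
> 
>         @Override
>         public void run(SourceContext<Row> ctx) throws Exception {
>             System.out.println("FooSourceFunction started: " + this);
>             long i = 0;
>             while (running) {
>                 ctx.collect(Row.of(i++ % 2)); // field_1 alternates between 0 and 1
>                 Thread.sleep(100);
>             }
>         }
> 
>         @Override
>         public void cancel() {
>             running = false;
>         }
>     }
> 
>     @Override
>     public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
>         // Called once per source operator the planner creates; with two sinks
>         // pulling on the table, this shows up twice.
>         System.out.println("getDataStream called, creating a new SourceFunction");
>         return execEnv.addSource(new FooSourceFunction(), getReturnType());
>     }
> 
>     @Override
>     public TypeInformation<Row> getReturnType() {
>         return Types.ROW_NAMED(new String[]{"field_1"}, Types.LONG);
>     }
> 
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.builder().field("field_1", Types.LONG).build();
>     }
> }
> {code}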
>  
> ----
> Workaround: convert the table to an append stream and back. Somehow this makes 
> the planner insert a materialization barrier after the source and read from 
> that:
>  
> {code:java}
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
>     tEnv.toAppendStream(
>         sourceTable,
>         Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()})));
> tEnv.registerTable("foo_table", appendingSourceTable);
> {code}
>  
>  
> Best Regards,
> Ben



