Benoît Paris created FLINK-15775:
------------------------------------

             Summary: SourceFunctions are instantiated twice when pulled on from 2 Sinks
                 Key: FLINK-15775
                 URL: https://issues.apache.org/jira/browse/FLINK-15775
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.9.1, 1.10.0
            Reporter: Benoît Paris
         Attachments: flink-test-duplicated-sources.zip

When pulled on by two sinks, the SourceFunctions of a TableSource get 
instantiated twice (each instance is then opened once per parallel subtask, 
which is the expected behavior):

The following will instantiate the FooTableSource's SourceFunction once (OK 
behavior, but not the processing we want):

 
{code:java}
tEnv.registerTableSource("foo_table", new FooTableSource());
Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
tEnv.registerTableSink("syso_sink_0", new SysoSink());
out0.insertInto("syso_sink_0");
{code}
 

This will instantiate the FooTableSource's SourceFunction twice (Not OK, as 
we're missing half the inputs in each SysoSink):

 
{code:java}
tEnv.registerTableSource("foo_table", new FooTableSource());
Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
tEnv.registerTableSink("syso_sink_0", new SysoSink());
tEnv.registerTableSink("syso_sink_1", new SysoSink());
out0.insertInto("syso_sink_0");
out1.insertInto("syso_sink_1"); 
{code}
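The duplication is easy to observe by counting constructor calls. Here is a minimal, framework-free sketch of that instantiation-counting trick (the attached reproduction does the equivalent inside a Flink SourceFunction; the class and field names below are hypothetical, not from the attachment):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for FooTableSource's SourceFunction: a static
// counter records how many times it gets instantiated.
class CountingSource {
    static final AtomicInteger INSTANCES = new AtomicInteger();

    CountingSource() {
        // Each constructor call corresponds to one physical source
        // operator in the translated job graph.
        int n = INSTANCES.incrementAndGet();
        System.out.println("SourceFunction instantiated, total = " + n);
    }
}

public class Main {
    public static void main(String[] args) {
        // First sink pulling on the table: one instantiation.
        new CountingSource();
        // Second sink pulling on the same table: with the bug, the
        // planner translates the source again instead of sharing it,
        // so the counter reaches 2 instead of staying at 1.
        new CountingSource();
        System.out.println("instances = " + CountingSource.INSTANCES.get());
    }
}
```

With the bug, the counter reaches 2 for a single registered table, and each SysoSink only sees the rows emitted by its own source instance.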
 

This might not be a problem for Kafka's SourceFunctions, since a log can 
always be re-read; but it is a data-loss problem when the source data can't 
be reproduced.

Actually, this might be me not understanding the API. Is there a way to make 
the runtime read from the same opened SourceFunctions?

Attached is Java code that logs the faulty opening of the SourceFunctions, 
the pom.xml, and the logical execution plans for both the duplicated case 
and the workaround.

 
----
Workaround: convert the table to an append stream and back. Somehow this 
makes the planner insert a materialization barrier after the source and 
read from that:

 
{code:java}
tEnv.registerTableSource("foo_table_source", new FooTableSource());
Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
Table appendingSourceTable = tEnv.fromDataStream(
    tEnv.toAppendStream(
        sourceTable,
        Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()})
    )
);
tEnv.registerTable("foo_table", appendingSourceTable);
{code}
 

 

Best Regards,

Ben



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
