[ 
https://issues.apache.org/jira/browse/FLINK-20637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251103#comment-17251103
 ] 

Jark Wu commented on FLINK-20637:
---------------------------------

[~Janze], {{Table#toRetractStream}} converts the Table into a DataStream, so calling 
this method multiple times produces multiple independent data streams; the sub-graph 
cannot be reused across them.

To reuse the sub-graph, convert the Table only once and apply the different 
transformations to that single converted DataStream, as in the sketch below.
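
A minimal sketch of that pattern, reusing the {{feeds_expose_click_profile}} table and the {{tableEnv}} from the snippet in the description, and assuming the usual Flink imports ({{Row}}, {{Types}}, {{TypeInformation}}, {{DataStream}}). The field positions and the filter/map logic in the two branches are only illustrative:

{code:java}
Table table = tableEnv.from("feeds_expose_click_profile");
TypeInformation<Row> typeInfo = table.getSchema().toRowType();

// Convert the Table to a retract stream exactly once.
DataStream<Row> rows = tableEnv
    .toRetractStream(table, typeInfo)
    .filter(change -> change.f0)   // keep only the insert/accumulate messages
    .map(change -> change.f1)
    .returns(typeInfo);

// Apply the different transformations to the same converted stream;
// both branches then share one TableSourceScan sub-graph.
DataStream<Row> withDocId = rows.filter(row -> row.getField(0) != null);  // docId != null
DataStream<Row> userAuthorPairs = rows
    .map(row -> Row.of(row.getField(1), row.getField(8)))                 // (buuid, authorId)
    .returns(Types.ROW(Types.STRING, Types.STRING));

withDocId.print();
userAuthorPairs.print();
{code}

With this layout, {{env.getExecutionPlan()}} should show a single TableSourceScan/Calc/WatermarkAssigner chain feeding both branches, instead of the two duplicated source chains in the plan quoted below.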


> Table convert to dataStream twice will result in two data streams
> -----------------------------------------------------------------
>
>                 Key: FLINK-20637
>                 URL: https://issues.apache.org/jira/browse/FLINK-20637
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream, Connectors / Kafka, Table SQL / API, 
> Table SQL / Planner
>    Affects Versions: 1.11.2
>            Reporter: Wu
>            Priority: Major
>
>  
> Code
> {code:java}
> // code placeholder
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>     .useBlinkPlanner().inStreamingMode().build();
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(50000);
> env.setParallelism(10);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>
> tableEnv.executeSql(
>     "create table feeds_expose_click_profile (" +
>     "  docId string, buuid string, predictId string, docType int, clickLabel int," +
>     "  viewTime int, exposeEventTime bigint, clickEventTime string, authorId string," +
>     "  category string, subCategory string, keywords string, tags string, eventTime bigint," +
>     "  rowTime as TO_TIMESTAMP(from_unixtime(eventTime / 1000))," +
>     "  WATERMARK FOR rowTime AS rowTime - INTERVAL '5' SECOND" +
>     ") WITH (" +
>     "  'connector' = 'kafka', 'topic' = ''," +
>     "  'properties.bootstrap.servers' = '', 'properties.group.id' = ''," +
>     "  'scan.startup.mode' = 'latest-offset', 'format' = 'json'," +
>     "  'json.ignore-parse-errors' = 'false')");
>
> Table table = tableEnv.from("feeds_expose_click_profile");
> TypeInformation<Row> typeInfo = table.getSchema().toRowType();
> DataStream<Row> dataStream = tableEnv.toRetractStream(table, typeInfo)
>     .filter(row -> row.f0)
>     .map(row -> row.f1)
>     .returns(typeInfo);
>
> Table tableFilter = tableEnv.sqlQuery(
>     "select buuid, authorId, viewTime, rowTime from feeds_expose_click_profile");
> tableEnv.createTemporaryView("tableFilter", tableFilter);
> TypeInformation<Row> typeInfo1 = tableFilter.getSchema().toRowType();
> DataStream<Row> dataStream1 = tableEnv.toRetractStream(tableFilter, typeInfo1)
>     .filter(row -> row.f0)
>     .map(row -> row.f1)
>     .returns(typeInfo1);
> dataStream1.print();
>
> System.out.println(env.getExecutionPlan());
> {code}
>  
>  
> ExecutionPlan
>  
> {code:java}
> // code placeholder
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: TableSourceScan(table=[[default_catalog, 
> default_database, feeds_expose_click_profile]], fields=[docId, buuid, 
> predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, 
> authorId, category, subCategory, keywords, tags, eventTime])",
>     "pact" : "Data Source",
>     "contents" : "Source: TableSourceScan(table=[[default_catalog, 
> default_database, feeds_expose_click_profile]], fields=[docId, buuid, 
> predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, 
> authorId, category, subCategory, keywords, tags, eventTime])",
>     "parallelism" : 10
>   }, {
>     "id" : 2,
>     "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, 
> viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, 
> keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS 
> rowTime])",
>     "pact" : "Operator",
>     "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, 
> viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, 
> keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS 
> rowTime])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 3,
>     "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 
> 5000:INTERVAL SECOND)])",
>     "pact" : "Operator",
>     "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 
> 5000:INTERVAL SECOND)])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 2,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 4,
>     "type" : "SinkConversionToTuple2",
>     "pact" : "Operator",
>     "contents" : "SinkConversionToTuple2",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 3,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 5,
>     "type" : "Filter",
>     "pact" : "Operator",
>     "contents" : "Filter",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 4,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 6,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 5,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 7,
>     "type" : "Source: TableSourceScan(table=[[default_catalog, 
> default_database, feeds_expose_click_profile]], fields=[docId, buuid, 
> predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, 
> authorId, category, subCategory, keywords, tags, eventTime])",
>     "pact" : "Data Source",
>     "contents" : "Source: TableSourceScan(table=[[default_catalog, 
> default_database, feeds_expose_click_profile]], fields=[docId, buuid, 
> predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, 
> authorId, category, subCategory, keywords, tags, eventTime])",
>     "parallelism" : 10
>   }, {
>     "id" : 8,
>     "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, 
> viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, 
> keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS 
> rowTime])",
>     "pact" : "Operator",
>     "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, 
> viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, 
> keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS 
> rowTime])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 7,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 9,
>     "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 
> 5000:INTERVAL SECOND)])",
>     "pact" : "Operator",
>     "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 
> 5000:INTERVAL SECOND)])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 8,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 10,
>     "type" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
>     "pact" : "Operator",
>     "contents" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 9,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 11,
>     "type" : "SinkConversionToTuple2",
>     "pact" : "Operator",
>     "contents" : "SinkConversionToTuple2",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 10,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 12,
>     "type" : "Filter",
>     "pact" : "Operator",
>     "contents" : "Filter",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 11,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 13,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 12,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 14,
>     "type" : "Sink: Print to Std. Out",
>     "pact" : "Data Sink",
>     "contents" : "Sink: Print to Std. Out",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 13,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
>  
> I encountered this problem while using waterdrop. How can I fix this problem?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
