[ https://issues.apache.org/jira/browse/FLINK-20637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251103#comment-17251103 ]
Jark Wu commented on FLINK-20637: --------------------------------- [~Janze], {{Table#toRetractStream}} will convert the Table into DataStream, therefore calling this method multiple times will result in multiple data streams, it can't reuse sub-graph. In order to reuse, you should convert the Table only once, and apply different transformations based on the converted DataStream. > Table convert to dataStream twice will result in two data streams > ----------------------------------------------------------------- > > Key: FLINK-20637 > URL: https://issues.apache.org/jira/browse/FLINK-20637 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Connectors / Kafka, Table SQL / API, > Table SQL / Planner > Affects Versions: 1.11.2 > Reporter: Wu > Priority: Major > > > Code > {code:java} > //代码占位符 > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > env.enableCheckpointing(50000); > env.setParallelism(10); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, > settings); > > tableEnv.executeSql("create table feeds_expose_click_profile ( docId > string ,buuid string ,predictId string ,docType int ,clickLabel int ,viewTime > int ,exposeEventTime bigint ,clickEventTime string ,authorId string ,category > string ,subCategory string ,keywords string ,tags string, eventTime bigint, > rowTime as TO_TIMESTAMP(from_unixtime(eventTime / 1000)), WATERMARK FOR > rowTime AS rowTime - INTERVAL '5' SECOND) WITH ('connector' = 'kafka', > 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = > '', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', > 'json.ignore-parse-errors' = 'false' )"); > Table table = tableEnv.from("feeds_expose_click_profile"); > TypeInformation<Row> typeInfo = table.getSchema().toRowType(); > DataStream dataStream = tableEnv .toRetractStream(table, typeInfo) > .filter(row -> row.f0) > .map(row -> row.f1) > .returns(typeInfo); > Table tableFilter = tableEnv.sqlQuery("select buuid, authorId, viewTime, > rowTime from feeds_expose_click_profile"); > tableEnv.createTemporaryView("tableFilter", tableFilter); > TypeInformation<Row> typeInfo1 = tableFilter.getSchema().toRowType(); > DataStream dataStream1 = tableEnv .toRetractStream(tableFilter, typeInfo1) > .filter(row -> row.f0) > .map(row -> row.f1) > .returns(typeInfo1); dataStream1.print(); > System.out.println(env.getExecutionPlan()); > {code} > > > ExecutionPlan > > {code:java} > //代码占位符 > { > "nodes" : [ { > "id" : 1, > "type" : "Source: TableSourceScan(table=[[default_catalog, > default_database, feeds_expose_click_profile]], fields=[docId, buuid, > predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, > authorId, category, subCategory, keywords, tags, eventTime])", > "pact" : "Data Source", > "contents" : "Source: TableSourceScan(table=[[default_catalog, > default_database, feeds_expose_click_profile]], fields=[docId, buuid, > predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, > authorId, category, subCategory, keywords, tags, eventTime])", > "parallelism" : 10 > }, { > "id" : 2, > "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, > viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, > keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS > rowTime])", > "pact" : "Operator", > "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, > viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, > keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS > rowTime])", > "parallelism" : 10, > "predecessors" : [ { > "id" : 1, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 3, > "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - > 5000:INTERVAL SECOND)])", > "pact" : "Operator", > "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - > 5000:INTERVAL SECOND)])", > "parallelism" : 10, > "predecessors" : [ { > "id" : 2, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 4, > "type" : "SinkConversionToTuple2", > "pact" : "Operator", > "contents" : "SinkConversionToTuple2", > "parallelism" : 10, > "predecessors" : [ { > "id" : 3, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 5, > "type" : "Filter", > "pact" : "Operator", > "contents" : "Filter", > "parallelism" : 10, > "predecessors" : [ { > "id" : 4, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 6, > "type" : "Map", > "pact" : "Operator", > "contents" : "Map", > "parallelism" : 10, > "predecessors" : [ { > "id" : 5, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 7, > "type" : "Source: TableSourceScan(table=[[default_catalog, > default_database, feeds_expose_click_profile]], fields=[docId, buuid, > predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, > authorId, category, subCategory, keywords, tags, eventTime])", > "pact" : "Data Source", > "contents" : "Source: TableSourceScan(table=[[default_catalog, > default_database, feeds_expose_click_profile]], fields=[docId, buuid, > predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, > authorId, category, subCategory, keywords, tags, eventTime])", > "parallelism" : 10 > }, { > "id" : 8, > "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, > viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, > keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS > rowTime])", > "pact" : "Operator", > "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, > viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, > keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS > rowTime])", > "parallelism" : 10, > "predecessors" : [ { > "id" : 7, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 9, > "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - > 5000:INTERVAL SECOND)])", > "pact" : "Operator", > "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - > 5000:INTERVAL SECOND)])", > "parallelism" : 10, > "predecessors" : [ { > "id" : 8, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 10, > "type" : "Calc(select=[buuid, authorId, viewTime, rowTime])", > "pact" : "Operator", > "contents" : "Calc(select=[buuid, authorId, viewTime, rowTime])", > "parallelism" : 10, > "predecessors" : [ { > "id" : 9, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 11, > "type" : "SinkConversionToTuple2", > "pact" : "Operator", > "contents" : "SinkConversionToTuple2", > "parallelism" : 10, > "predecessors" : [ { > "id" : 10, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 12, > "type" : "Filter", > "pact" : "Operator", > "contents" : "Filter", > "parallelism" : 10, > "predecessors" : [ { > "id" : 11, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 13, > "type" : "Map", > "pact" : "Operator", > "contents" : "Map", > "parallelism" : 10, > "predecessors" : [ { > "id" : 12, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 14, > "type" : "Sink: Print to Std. Out", > "pact" : "Data Sink", > "contents" : "Sink: Print to Std. Out", > "parallelism" : 10, > "predecessors" : [ { > "id" : 13, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > } ] > } > {code} > > I encountered this problem while using waterdrop. How to fix this problem. > -- This message was sent by Atlassian Jira (v8.3.4#803005)