[ https://issues.apache.org/jira/browse/FLINK-21013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17267617#comment-17267617 ]
Jark Wu edited comment on FLINK-21013 at 1/19/21, 3:26 AM:
-----------------------------------------------------------

I think this is a missing feature since the Blink merge. See the difference between blink-planner [1] and old-planner [2].

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala#L133
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/DataStreamConversions.scala#L108


was (Author: jark):
I think this is an missing feature since Blink merge. See the difference between blink-planner[1] and old-planner [2].

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala#L133
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/DataStreamConversions.scala#L108

> Blink planner does not ingest timestamp into StreamRecord
> ---------------------------------------------------------
>
>                 Key: FLINK-21013
>                 URL: https://issues.apache.org/jira/browse/FLINK-21013
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner, Table SQL / Runtime
>   Affects Versions: 1.12.0
>            Reporter: Timo Walther
>            Priority: Blocker
>             Fix For: 1.12.2
>
>
> Currently, the rowtime attribute is not put into the StreamRecord when
> leaving the Table API for the DataStream API. The legacy planner supports
> this, but the timestamp is null when using the Blink planner.
> {code}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>
> DataStream<Order> orderA =
>         env.fromCollection(
>                 Arrays.asList(
>                         new Order(1L, "beer", 3),
>                         new Order(1L, "diaper", 4),
>                         new Order(3L, "rubber", 2)));
>
> DataStream<Order> orderB =
>         orderA.assignTimestampsAndWatermarks(
>                 new AssignerWithPunctuatedWatermarks<Order>() {
>                     @Nullable
>                     @Override
>                     public Watermark checkAndGetNextWatermark(
>                             Order lastElement, long extractedTimestamp) {
>                         return new Watermark(extractedTimestamp);
>                     }
>
>                     @Override
>                     public long extractTimestamp(Order element, long recordTimestamp) {
>                         return element.user;
>                     }
>                 });
>
> Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(), $("product"), $("amount"));
>
> // run a pass-through query over the table
> Table result = tEnv.sqlQuery("SELECT * FROM " + tableA);
>
> tEnv.toAppendStream(result, Row.class)
>         .process(
>                 new ProcessFunction<Row, Row>() {
>                     @Override
>                     public void processElement(Row value, Context ctx, Collector<Row> out)
>                             throws Exception {
>                         System.out.println("TIMESTAMP" + ctx.timestamp());
>                     }
>                 })
>         .print();
>
> env.execute();
> {code}
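
For readers unfamiliar with the terminology, the difference the issue describes can be modeled in plain Java. This is only an illustrative sketch, not Flink code and not the actual fix: `RowtimeSketch`, `TableRow`, `Record`, and both `convert...` methods are hypothetical names invented here. It contrasts a conversion that drops the rowtime (the reported Blink planner behavior, where the downstream timestamp is null) with one that copies the rowtime column into the record timestamp (the behavior the issue says the legacy planner has).

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the reported behavior. NOT Flink code:
// every type and method name below is hypothetical.
class RowtimeSketch {

    /** Stand-in for a table row: a rowtime column (epoch millis) plus a payload. */
    static final class TableRow {
        final long rowtime;
        final String product;

        TableRow(long rowtime, String product) {
            this.rowtime = rowtime;
            this.product = product;
        }
    }

    /** Stand-in for a stream record; timestamp is null when none was attached. */
    static final class Record {
        final TableRow value;
        final Long timestamp;

        Record(TableRow value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Reported behavior: the row is forwarded without a record timestamp. */
    static Record convertWithoutTimestamp(TableRow row) {
        return new Record(row, null);
    }

    /** Expected behavior: the rowtime column is copied into the record timestamp. */
    static Record convertWithTimestamp(TableRow row) {
        return new Record(row, row.rowtime);
    }

    public static void main(String[] args) {
        // Same sample values as the reproduction: the "user" field doubles as rowtime.
        List<TableRow> rows =
                Arrays.asList(
                        new TableRow(1L, "beer"),
                        new TableRow(1L, "diaper"),
                        new TableRow(3L, "rubber"));
        for (TableRow row : rows) {
            System.out.println(
                    row.product
                            + " reported=" + convertWithoutTimestamp(row).timestamp
                            + " expected=" + convertWithTimestamp(row).timestamp);
        }
    }
}
```

In the reproduction above, `ctx.timestamp()` plays the role of `Record.timestamp` in this model.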