[ https://issues.apache.org/jira/browse/FLINK-21013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-21013:
-----------------------------------
    Labels: pull-request-available  (was: )

> Blink planner does not ingest timestamp into StreamRecord
> ---------------------------------------------------------
>
>                 Key: FLINK-21013
>                 URL: https://issues.apache.org/jira/browse/FLINK-21013
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Timo Walther
>            Assignee: Leonard Xu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.2
>
>
> Currently, the rowtime attribute is not put into the StreamRecord when
> converting from the Table API to the DataStream API. The legacy planner
> supports this, but the timestamp is null when using the Blink planner.
> {code}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
> DataStream<Order> orderA =
>         env.fromCollection(
>                 Arrays.asList(
>                         new Order(1L, "beer", 3),
>                         new Order(1L, "diaper", 4),
>                         new Order(3L, "rubber", 2)));
> DataStream<Order> orderB =
>         orderA.assignTimestampsAndWatermarks(
>                 new AssignerWithPunctuatedWatermarks<Order>() {
>                     @Nullable
>                     @Override
>                     public Watermark checkAndGetNextWatermark(
>                             Order lastElement, long extractedTimestamp) {
>                         return new Watermark(extractedTimestamp);
>                     }
>                     @Override
>                     public long extractTimestamp(Order element, long recordTimestamp) {
>                         return element.user;
>                     }
>                 });
> Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(), $("product"), $("amount"));
> // pass-through query over the table
> Table result = tEnv.sqlQuery("SELECT * FROM " + tableA);
> tEnv.toAppendStream(result, Row.class)
>         .process(
>                 new ProcessFunction<Row, Row>() {
>                     @Override
>                     public void processElement(Row value, Context ctx, Collector<Row> out)
>                             throws Exception {
>                         System.out.println("TIMESTAMP" + ctx.timestamp());
>                     }
>                 })
>         .print();
> env.execute();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)