Mark You created FLINK-6862: ------------------------------- Summary: Tumble window rowtime not resolve at logic plan validation Key: FLINK-6862 URL: https://issues.apache.org/jira/browse/FLINK-6862 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.0 Reporter: Mark You
The following code sample works in version 1.2.1 but fails in 1.3.0: {code:title=Bar.java|borderStyle=solid} public class TumblingWindow { public static void main(String[] args) throws Exception { List<Content> data = new ArrayList<Content>(); data.add(new Content(1L, "Hi")); data.add(new Content(2L, "Hallo")); data.add(new Content(3L, "Hello")); data.add(new Content(4L, "Hello")); data.add(new Content(7L, "Hello")); data.add(new Content(8L, "Hello world")); data.add(new Content(16L, "Hello world")); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Content> stream = env.fromCollection(data); DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) { /** * */ private static final long serialVersionUID = 410512296011057717L; @Override public long extractTimestamp(Content element) { return element.getRecordTime(); } }); Table table = tableEnv.fromDataStream(stream2); table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w").select("w.start, content.count"); env.execute(); } public static class Content implements Serializable { private long recordTime; private String content; public Content() { super(); } public Content(long recordTime, String content) { super(); this.recordTime = recordTime; this.content = content; } public long getRecordTime() { return recordTime; } public void setRecordTime(long recordTime) { this.recordTime = recordTime; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } } private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> { /** * */ private static final long serialVersionUID = 1L; @Override public long extractTimestamp(Object[] element, long 
previousElementTimestamp) { // TODO Auto-generated method stub return (long) element[0]; } @Override public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) { return new Watermark(extractedTimestamp); } } } {code} {noformat} Exception trace Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve [rowtime] given input [content, recordTime]. at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143) at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:86) at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83) at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72) at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119) at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:132) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:131) at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) at scala.collection.Iterator$class.foreach(Iterator.scala:742) at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308) at scala.collection.AbstractIterator.to(Iterator.scala:1194) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287) at scala.collection.AbstractIterator.toArray(Iterator.scala:1194) at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137) at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83) at org.apache.flink.table.plan.logical.Project.validate(operators.scala:67) at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1054) at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1073) at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:54) {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)