GitHub user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4736#discussion_r142254410 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala --- @@ -85,6 +85,46 @@ class OverWindowITCase extends StreamingWithStateTestBase { } @Test + def testOverWindowWithConstant(): Unit = { + + val data = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (3L, 3, "Hello"), + (4L, 4, "Hello"), + (5L, 5, "Hello"), + (6L, 6, "Hello"), + (7L, 7, "Hello World"), + (8L, 8, "Hello World"), + (8L, 8, "Hello World"), + (20L, 20, "Hello World")) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + val weightAvgFun = new WeightedAvg + + val windowedTable = table + .window( + Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w) + .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg) + .select('c, 'wAvg) --- End diff -- This second `.select('c, 'wAvg)` can be removed; the preceding select already yields exactly 'c and 'wAvg.
---