[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548978#comment-15548978 ]
ASF GitHub Bot commented on FLINK-4691: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81861775 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'rowtime, 'int, 'string) + + table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count as 'rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table.as('rowtime, 'myint, 'mystring) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime4(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table + .groupBy('string) + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table + .groupBy('string) + .window(Tumble over "WRONG") + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidSlidingSize(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table + .groupBy('string) + .window(Slide over "WRONG" every "WRONG") + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidSlidingSlide(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table + .groupBy('string) + .window(Slide over 12.rows every "WRONG") + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidSessionGap(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table + .groupBy('string) + .window(Session withGap 10.rows) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidWindowAlias1(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table + .groupBy('string) + .window(Session withGap 10.rows as 1 + 1) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidWindowAlias2(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val data = new mutable.MutableList[(Long, Int, String)] + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + .select('string, 'int.count) + } + + @Test + def testProcessingTimeTumblingGroupWindowOverTime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env.fromCollection(data) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + + // we only test if validation is successful here since processing time is non-deterministic + windowedTable.toDataStream[Row] + } + + @Test + def testProcessingTimeTumblingGroupWindowOverCount(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env.fromCollection(data) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 2.rows) + .select('string, 'int.count) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("Hello world,2", "Hello,2") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testEventTimeTumblingGroupWindowOverTime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 5.milli on 'rowtime) + .select('string, 'int.count) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testEventTimeTumblingGroupWindowOverCount(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 2.rows on 'rowtime) + .select('string, 'int.count) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("Hello world,2", "Hello,2") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testProcessingTimeSlidingGroupWindowOverTime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env.fromCollection(data) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Slide over 50.milli every 50.milli) + .select('string, 'int.count) + + // we only test if validation is successful here since processing time is non-deterministic + windowedTable.toDataStream[Row] + } + + @Test + def testProcessingTimeSlidingGroupWindowOverCount(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env.fromCollection(data) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Slide over 2.rows every 1.rows) + .select('string, 'int.count) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("Hello world,1", "Hello world,2", "Hello,1", "Hello,2", "Hi,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testEventTimeSlidingGroupWindowOverTime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Slide over 8.milli every 10.milli on 'rowtime) + .select('string, 'int.count) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("Hello world,1", "Hello,2", "Hi,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testEventTimeSlidingGroupWindowOverCount(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Slide over 2.rows every 1.rows on 'rowtime) + .select('string, 'int.count) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("Hello world,1", "Hello world,2", "Hello,1", "Hello,2", "Hi,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testEventTimeSessionGroupWindowOverTime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Session withGap 7.milli on 'rowtime) + .select('string, 'int.count) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env.fromCollection(data) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + + // we only test if validation is successful here since processing time is non-deterministic + windowedTable.toDataStream[Row] + } + + @Test + def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val data = new mutable.MutableList[(Long, Int, String)] + data.+=((1L, 1, "Hi")) + data.+=((2L, 2, "Hello")) + data.+=((4L, 2, "Hello")) + data.+=((8L, 3, "Hello world")) + data.+=((16L, 3, "Hello world")) + val stream = env.fromCollection(data) + + val table = stream.toTable(tEnv, 'long, 'int, 'string) + + val windowedTable = table + .window(Tumble over 2.rows) + .select('int.count) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("2", "2") --- End diff -- Shouldn't this be `"2", "2", "2"`? > Add group-windows for streaming tables > --------------------------------------- > > Key: FLINK-4691 > URL: https://issues.apache.org/jira/browse/FLINK-4691 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Timo Walther > > Add Tumble, Slide, Session group-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > > Implementation of group-windows on streaming tables. This includes > implementing the API of group-windows, the logical validation for > group-windows, and the definition of the “rowtime” and “systemtime” keywords. > Group-windows on batch tables won’t be initially supported and will throw an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)