[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977684#comment-15977684 ]
ASF GitHub Bot commented on FLINK-6228: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112570917 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class OverWindowITCase extends StreamingWithStateTestBase { + + @Test + def testProcTimeUnBoundedPartitionedRowOver(): Unit = { + + val data = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (3L, 3, "Hello"), + (4L, 4, "Hello"), + (5L, 5, "Hello"), + (6L, 6, "Hello"), + (7L, 7, "Hello World"), + (8L, 8, "Hello World"), + (20L, 20, "Hello World")) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'a, 'b, 'c) + + val windowedTable = table + .window( + Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w) + .select('c, 'b.count over 'w as 'mycount) + .select('c, 'mycount) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq( + "Hello World,1", "Hello World,2", "Hello World,3", + "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6") + assertEquals(expected.sorted, 
StreamITCase.testResults.sorted) + } + + @Test + def testRowTimeUnBoundedPartitionedRangeOver(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setStateBackend(getStateBackend) + StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear + env.setParallelism(1) + + val data = Seq( + Left(14000005L, (1, 1L, "Hi")), + Left(14000000L, (2, 1L, "Hello")), + Left(14000002L, (1, 1L, "Hello")), + Left(14000002L, (1, 2L, "Hello")), + Left(14000002L, (1, 3L, "Hello world")), + Left(14000003L, (2, 2L, "Hello world")), + Left(14000003L, (2, 3L, "Hello world")), + Right(14000020L), + Left(14000021L, (1, 4L, "Hello world")), + Left(14000022L, (1, 5L, "Hello world")), + Left(14000022L, (1, 6L, "Hello world")), + Left(14000022L, (1, 7L, "Hello world")), + Left(14000023L, (2, 4L, "Hello world")), + Left(14000023L, (2, 5L, "Hello world")), + Right(14000030L) + ) + val table = env + .addSource(new RowTimeSourceFunction[(Int, Long, String)](data)) + .toTable(tEnv).as('a, 'b, 'c) + + val windowedTable = table + .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following + CURRENT_RANGE as 'w) + .select( + 'a, 'b, 'c, + 'b.sum over 'w, + 'b.count over 'w, + 'b.avg over 'w, + 'b.max over 'w, + 'b.min over 'w) + + val result = windowedTable.toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,1,Hello,6,3,2,3,1", + "1,2,Hello,6,3,2,3,1", + "1,3,Hello world,6,3,2,3,1", + "1,1,Hi,7,4,1,3,1", + "2,1,Hello,1,1,1,1,1", + "2,2,Hello world,6,3,2,3,1", + "2,3,Hello world,6,3,2,3,1", + "1,4,Hello world,11,5,2,4,1", + "1,5,Hello world,29,8,3,7,1", + "1,6,Hello world,29,8,3,7,1", + "1,7,Hello world,29,8,3,7,1", + "2,4,Hello world,15,5,3,5,1", + "2,5,Hello world,15,5,3,5,1" + ) + + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + 
@Test + def testProcTimeBoundedPartitionedRangeOver(): Unit = { + + val data = List( + (1, 1L, 0, "Hallo", 1L), + (2, 2L, 1, "Hallo Welt", 2L), + (2, 3L, 2, "Hallo Welt wie", 1L), + (3, 4L, 3, "Hallo Welt wie gehts?", 2L), + (3, 5L, 4, "ABC", 2L), + (3, 6L, 5, "BCD", 3L), + (4, 7L, 6, "CDE", 2L), + (4, 8L, 7, "DEF", 1L), + (4, 9L, 8, "EFG", 1L), + (4, 10L, 9, "FGH", 2L), + (5, 11L, 10, "GHI", 1L), + (5, 12L, 11, "HIJ", 3L), + (5, 13L, 12, "IJK", 3L), + (5, 14L, 13, "JKL", 2L), + (5, 15L, 14, "KLM", 2L)) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setParallelism(1) + StreamITCase.testResults = mutable.MutableList() + + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + + val windowedTable = table + .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w) + .select('a, 'c.sum over 'w, 'c.min over 'w) + val result = windowedTable.toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,3,1", + "3,3,3", + "3,7,3", + "3,12,3", + "4,6,6", + "4,13,6", + "4,21,6", + "4,30,6", + "5,10,10", + "5,21,10", + "5,33,10", + "5,46,10", + "5,60,10") + + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testRowTimeBoundedPartitionedRowOver(): Unit = { --- End diff -- Add a test for rowtime bounded range as well? > Integrating the OVER windows in the Table API > --------------------------------------------- > > Key: FLINK-6228 > URL: https://issues.apache.org/jira/browse/FLINK-6228 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: sunjincheng > Assignee: sunjincheng > > Syntax: > {code} > table > .overWindows( > (Rows|Range [ partitionBy value_expression , ... 
[ n ]] [ orderBy > order_by_expression] > (preceding > UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW) > [following > UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW] > as alias,...[n]) > ) > .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n]) > {code} > Implementation restrictions: > * All OVER clauses in the same SELECT clause must be exactly the same. > * The PARTITION BY clause is optional (no partitioning results in > single-threaded execution). > * The ORDER BY clause: before > [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] is implemented, > orderBy may only reference 'rowtime / 'proctime (for streams) or a 'specific-time-field (for > batch). > * FOLLOWING is not supported. > User interface design document [See | > https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#] -- This message was sent by Atlassian JIRA (v6.3.15#6346)