[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904367#comment-15904367 ]
ASF GitHub Bot commented on FLINK-5658: --------------------------------------- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105323083 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + StreamITCase.testResults = mutable.MutableList() + + val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + + val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + + def getCurrentWatermark: Watermark = new Watermark(1300000L) + + def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 1400000 + }).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("T1", t1) + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected1 = mutable.MutableList( + "1,1,1", "2,2,2", "3,2,5") + val expected2 = mutable.MutableList( + "1,1,1", "2,2,5", "3,2,3") + 
assertTrue(expected1.equals(StreamITCase.testResults.sorted) || + expected2.equals(StreamITCase.testResults.sorted)) + } + + /** test sliding event-time unbounded window without partition by **/ + @Test + def testWithoutPartition(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + StreamITCase.testResults = mutable.MutableList() + + val sqlQuery = "SELECT SUM(a) " + + "over (order by rowtime() range between unbounded preceding and current row) from T1" + + val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + + def getCurrentWatermark: Watermark = new Watermark(1300000L) + + def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 1400000 + }).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("T1", t1) + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + assertEquals(Some("6"), StreamITCase.testResults.sorted.get(StreamITCase.testResults.size - 1)) + } + + /** test sliding event-time unbounded window with later record **/ + @Test + def testWithLater(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.getConfig.setAutoWatermarkInterval(10000); + StreamITCase.testResults = mutable.MutableList() + + val sqlQuery = "SELECT d, SUM(a) " + + "over (order by rowtime() range between unbounded preceding and current row) from T1" + + val t1 = StreamTestData.getSmall4TupleEventTimeDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Small4Tuple] { + var cur = 1200000L; + + def getCurrentWatermark: Watermark = new Watermark({cur += 10; cur;}) + + def 
extractTimestamp(element: Small4Tuple, previousElementTimestamp: Long): Long = + element.d --- End diff -- Can we set the value of `cur` based on the data? > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > ------------------------------------------------------------------------ > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER RANGE aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)