[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010840#comment-16010840 ]
ASF GitHub Bot commented on FLINK-6075: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3889#discussion_r116475426 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel } +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream } +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow } +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo } +import org.apache.flink.types.Row +import org.apache.calcite.sql.SqlAggFunction +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.calcite.rel.core.Sort +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.flink.table.runtime.aggregate.SortUtil._ +import 
org.apache.calcite.rel.RelCollation +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.plan.schema.RowSchema + +/** + * Flink RelNode which matches along with Sort Rule. + * + */ +class DataStreamSort( + sortCollation: RelCollation, + sortOffset: RexNode, + sortFetch: RexNode, + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputNode: RelNode, + schema: RowSchema, + inputSchema: RowSchema, + description: String) + extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel { + + override def deriveRowType(): RelDataType = schema.logicalType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataStreamSort( + sortCollation, + sortOffset, + sortFetch, + cluster, + traitSet, + inputs.get(0), + schema, + inputSchema, + description) + } + + override def toString: String = { + s"Sort(by: ($SortUtil.getSortFieldToString(sortCollation, rowRelDataType))," + + " offset: $SortUtil.getOffsetToString(sortOffset)," + + " fetch: $SortUtil.getFetchToString(sortFetch, sortOffset))" + } + + override def explainTerms(pw: RelWriter) : RelWriter = { + + //need to identify time between others order fields. Time needs to be first sort element + checkTimeOrder() + + super.explainTerms(pw) + .item("orderBy", SortUtil.getSortFieldToString(sortCollation, schema.logicalType)) + .item("offset", SortUtil.getOffsetToString(sortOffset)) + .item("fetch", SortUtil.getFetchToString(sortFetch, sortOffset)) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + + val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) + + //need to identify time between others order fields. 
Time needs to be first sort element + val timeType = SortUtil.getTimeType(sortCollation, schema.logicalType) + + //time ordering needs to be ascending + if (SortUtil.getTimeDirection(sortCollation) != Direction.ASCENDING) { + throw new TableException("SQL/Table supports only ascending time ordering") + } + + val execCfg = tableEnv.execEnv.getConfig + + //enable to extend for other types of aggregates that will not be implemented in a window + timeType match { + case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) => + (sortOffset,sortFetch) match { + case (o: Any, f: Any) => // offset and fetch needs retraction + throw new TableException("SQL/Table does not support sort with offset and fetch") + case (_, f: Any) => // offset needs retraction + throw new TableException("SQL/Table does not support sort with fetch") + case (o: Any, _) => // fetch needs retraction + throw new TableException("SQL/Table does not support sort with offset") + case _ => createSortProcTime(inputDS, execCfg) //sort can be done without retraction + } + case _ if FlinkTypeFactory.isRowtimeIndicatorType(timeType) => + (sortOffset,sortFetch) match { + case (o: Any, f: Any) => // offset and fetch needs retraction + throw new TableException("SQL/Table does not support sort with offset and fetch") + case (_, f: Any) => // offset needs retraction + throw new TableException("SQL/Table does not support sort with fetch") + case (o: Any, _) => // fetch needs retraction + throw new TableException("SQL/Table does not support sort with offset") + case _ => createSortRowTime(inputDS, execCfg) //sort can be done without retraction + } + case _ => + throw new TableException("SQL/Table needs to have sort on time as first sort element") + } + + } + + /** + * Create Sort logic based on processing time + */ + def createSortProcTime( + inputDS: DataStream[CRow], + execCfg: ExecutionConfig): DataStream[CRow] = { + + val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo) + + //if the order has secondary 
sorting fields in addition to the proctime + if( SortUtil.getSortFieldIndexList(sortCollation).size > 1) { + + val processFunction = SortUtil.createProcTimeSortFunction(sortCollation, + inputSchema.logicalType, inputSchema.physicalTypeInfo, execCfg) + + inputDS + .keyBy(new NullByteKeySelector[CRow]) + .process(processFunction).setParallelism(1).setMaxParallelism(1) + .returns(returnTypeInfo) + .asInstanceOf[DataStream[CRow]] + } else { + //if the order is done only on proctime we only need to forward the elements + inputDS.keyBy(new NullByteKeySelector[CRow]) + .map(new IdentityRowMap()) + .setParallelism(1).setMaxParallelism(1) + .returns(returnTypeInfo) + .asInstanceOf[DataStream[CRow]] + } + } + + /** + * Create Sort logic based on row time + */ + def createSortRowTime( + inputDS: DataStream[CRow], + execCfg: ExecutionConfig): DataStream[CRow] = { + + val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo) + + val processFunction = SortUtil.createRowTimeSortFunction(sortCollation, + inputSchema.logicalType, inputSchema.physicalTypeInfo, execCfg) + + inputDS + .keyBy(new NullByteKeySelector[CRow]) + .process(processFunction).setParallelism(1).setMaxParallelism(1) + .returns(returnTypeInfo) + .asInstanceOf[DataStream[CRow]] + + } + + /** + * Function is used to check at verification time if the SQL syntax is supported + */ + + def checkTimeOrder() = { --- End diff -- I think we should move the check for time being the first sorting column and ASCENDING only into the `DataStreamSortRule`. When we later add sorting which is not based on time, we might want to make a distinction in the rule. > Support Limit/Top(Sort) for Stream SQL > -------------------------------------- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. 
However, the design is the > same; only the processing function differs in terms of the output. Hence, the > design is the same for all of them. > Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > - All these SQL clauses are supported only over windows (bounded collections > of data). > - Each of the 3 operators will be supported with each of the ways of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort > operation would never trigger. If an SQL query is provided without > window bounds, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, in the case of working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out-of-order events and retraction/updates of > results. > **Functionality example** > We exemplify with the query below for all the 3 types of operators (sorting, > limit and top). 
Rowtime indicates when the HOP window will trigger – which > can be observed from the fact that outputs are generated only at those moments. > The HOP windows will trigger at every hour (fixed hour) and each event will > contribute to / be duplicated in 2 consecutive hour intervals. Proctime > indicates the processing time when a new event arrives in the system. Events > are of the type (a,b) with the ordering being applied on the b field. > `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) > ORDER BY b (LIMIT 2 / TOP 2 / [ASC/DESC])` > ||Rowtime|| Proctime|| Stream1|| Limit 2|| Top 2|| Sort > [ASC]|| > | |10:00:00 |(aaa, 11) | | | > | > | |10:05:00 |(aab, 7) | | | | > |10-11 |11:00:00 | | aab,aaa |aab,aaa | aab,aaa > | > | |11:03:00 |(aac,21) | | | | > > |11-12 |12:00:00 | | aab,aaa |aab,aaa | aab,aaa,aac| > | |12:10:00 |(abb,12) | | | | > > | |12:15:00 |(abb,12) | | | | > > |12-13 |13:00:00 | | abb,abb | abb,abb | > abb,abb,aac| > |...| > **Implementation option** > Considering that the SQL operators will be associated with window boundaries, > the functionality will be implemented within the logic of the window as > follows. > * Window assigner – selected based on the type of window used in SQL > (TUMBLING, SLIDING…) > * Evictor / Trigger – time or count evictor based on the definition of the > window boundaries > * Apply – window function that sorts data and selects the output to trigger > (based on LIMIT/TOP parameters). All data will be sorted at once and the result > emitted when the window is triggered. > An alternative implementation can be to use a fold window function to sort > the elements as they arrive, one at a time, followed by a flatMap to restrict > the number of outputs. > !sort.png! 
> **General logic of Sort/Limit/Top** > ``` > inputDataStream.window(new [Slide/Tumble][Time/Count]Window()) > //.trigger(new [Time/Count]Trigger()) – use default > //.evictor(new [Time/Count]Evictor()) – use default > .apply(SortAndFilter()); > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)