[ https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405482#comment-15405482 ]
ASF GitHub Bot commented on FLINK-3940:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2282#discussion_r73290760

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala ---
    @@ -71,11 +78,57 @@ class DataSetSort(
           partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
         }

    +    val offsetAndFetchDS = if (offset != null) {
    +      val offsetIndex = RexLiteral.intValue(offset)
    +      val fetchIndex = if (fetch != null) {
    +        RexLiteral.intValue(fetch) + offsetIndex
    +      } else {
    +        Int.MaxValue
    +      }
    +      if (currentParallelism != 1) {
    +        val partitionCount = partitionedDs.mapPartition(
    +          new MapPartitionFunction[Any, Int] {
    +            override def mapPartition(value: lang.Iterable[Any], out: Collector[Int]): Unit = {
    +              val iterator = value.iterator()
    +              var elementCount = 0
    +              while (iterator.hasNext) {
    +                elementCount += 1
    +                iterator -> iterator.next()
    +              }
    +              out.collect(elementCount)
    +            }
    +        }).collect().asScala
    --- End diff --

    I agree that the number of elements in every partition is necessary but triggering additional jobs during a plan construction phase is not what we want. It should be up to the user if a job plan is executed at all. Have you thought about using a broadcast variable as a side input of your filter function?


> Add support for ORDER BY OFFSET FETCH
> -------------------------------------
>
>                 Key: FLINK-3940
>                 URL: https://issues.apache.org/jira/browse/FLINK-3940
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.1.0
>            Reporter: Fabian Hueske
>            Assignee: GaoLun
>            Priority: Minor
>
> Currently only ORDER BY without OFFSET and FETCH are supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
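
As an illustration of the broadcast-variable suggestion in the review comment above: once every parallel instance of the filter can see the element count of every partition, it can work out the global position of its own first element locally, so no `collect()` is needed while the plan is being built. The following is only a plain-Scala simulation of that arithmetic with no Flink dependency; `OffsetFetchSketch`, `startIndices`, `filterPartition` and `offsetFetch` are hypothetical names, and partitions are modelled as in-memory sequences that are assumed to be globally sorted in partition order. In an actual Flink job the counts would presumably be attached with `withBroadcastSet` and read in a rich function through `getRuntimeContext.getBroadcastVariable`.

```scala
// Plain-Scala simulation (assumption: partitions are already globally sorted
// in partition order). All names here are hypothetical, not part of the PR.
object OffsetFetchSketch {

  // Global index of the first element of each partition, derived from the
  // per-partition counts (the data that would be broadcast in Flink).
  def startIndices(counts: Seq[Int]): Seq[Long] =
    counts.scanLeft(0L)(_ + _).init

  // Keep the elements of one partition whose global position is in [offset, end).
  def filterPartition[T](partition: Seq[T], start: Long, offset: Long, end: Long): Seq[T] =
    partition.zipWithIndex.collect {
      case (value, i) if start + i >= offset && start + i < end => value
    }

  // Apply OFFSET/FETCH across all partitions; each partition only needs the counts.
  def offsetFetch[T](partitions: Seq[Seq[T]], offset: Int, fetch: Option[Int]): Seq[T] = {
    // Long arithmetic avoids overflow when offset + fetch exceeds Int.MaxValue.
    val end: Long = fetch.map(f => offset.toLong + f).getOrElse(Long.MaxValue)
    val starts = startIndices(partitions.map(_.size))
    partitions.zip(starts).flatMap { case (p, s) => filterPartition(p, s, offset, end) }
  }

  def main(args: Array[String]): Unit = {
    val sorted = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6, 7, 8, 9))
    println(offsetFetch(sorted, offset = 2, fetch = Some(4))) // List(3, 4, 5, 6)
  }
}
```

Because each partition only needs the counts of the partitions before it, the counting step and the filtering step can both stay inside one job plan, and execution remains up to the user.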