Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3649#discussion_r109072158 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -204,21 +183,145 @@ class UnboundedEventTimeOverProcessFunction( * If timestamps arrive in order (as in case of using the RocksDB state backend) this is just * an append with O(1). */ - private def insertToSortedList(recordTimeStamp: Long) = { + private def insertToSortedList(recordTimestamp: Long) = { val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size) var continue = true while (listIterator.hasPrevious && continue) { val timestamp = listIterator.previous - if (recordTimeStamp >= timestamp) { + if (recordTimestamp >= timestamp) { listIterator.next - listIterator.add(recordTimeStamp) + listIterator.add(recordTimestamp) continue = false } } if (continue) { - sortedTimestamps.addFirst(recordTimeStamp) + sortedTimestamps.addFirst(recordTimestamp) } } + /** + * Process the same timestamp datas, the mechanism is different between + * rows and range window. + */ + def processElementsWithSameTimestamp( + curRowList: JList[Row], + lastAccumulator: Row, + out: Collector[Row]): Unit + +} + +/** + * A ProcessFunction to support unbounded ROWS window. + * With the ROWS option you define on a physical level how many rows are included in your window frame --- End diff -- Thanks for reminding me; I will pay attention to this next time.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---