Motivation

There is user demand for support of SQL window functions.
Description

Window functions compute a value over a set of rows that are related to the current row. Some window functions resemble aggregate functions, but unlike aggregates, they return a value for each row rather than a single result per group.

Public API

A SQL window function expression has the following syntax:

    <aggregate or special window function call> OVER (
        [PARTITION BY <expr1>, <expr2>, ...]
        [ORDER BY <expr1> [ASC/DESC], <expr2> [ASC/DESC], ...]
        [<frame>])

frame is defined as:

    [<mode> BETWEEN <bound> AND <bound>]

bound can be specified as:

    CURRENT ROW
    UNBOUNDED PRECEDING/FOLLOWING
    <expr> PRECEDING/FOLLOWING

A frame mode can be one of three types:

- ROWS: Defines the frame by an exact number of rows, similar to the LIMIT clause. For example, ROWS 3 PRECEDING includes the 3 previous rows and the current row.
- RANGE: Defines the frame by a logical range of sort key values. For example, with an ascending sort, RANGE 3 PRECEDING includes rows whose values lie in [current value - 3, current value], which always contains the current row. This works only for types with well-defined ranges, such as numeric and temporal types.
- GROUPS: Defines the frame by a number of distinct values. For example, GROUPS 3 PRECEDING includes rows with the 3 previous distinct values and the current row.

Note: GROUPS mode is not supported in Calcite versions up to and including 1.40.0 and is not considered further.

In RANGE mode, rows with equal sort key (ORDER BY) values are called peer rows. The peer index refers to the position of a row among peers (rows with identical sort keys).

Implementation Options

The proposed algorithm for window function evaluation includes the following steps:

1. Divide rows into partitions based on the PARTITION BY clause.
2. Sort rows within each partition according to the ORDER BY clause.
3. For each row, determine the frame according to the frame clause.
4. Apply the <aggregate or special window function> to the frame.
5. Append the computed result as a new field in the row.
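As a rough illustration of the five steps above, here is a minimal Java sketch for one concrete case: SUM over a ROWS frame from N PRECEDING to CURRENT ROW. The class name WindowStepsSketch, the {key, ord, val} row layout, and the method signature are hypothetical and are not Ignite or Calcite API. The sketch folds steps 1 and 2 into a single sort on (partition key, order key) and recomputes the sum for every frame, which favors clarity over speed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Illustrative only: names and row layout are hypothetical, not Ignite or Calcite API. */
final class WindowStepsSketch {
    /**
     * Evaluates SUM(val) OVER (PARTITION BY key ORDER BY ord ROWS BETWEEN n PRECEDING AND CURRENT ROW).
     * Each input row is {key, ord, val}; each output row is {key, ord, val, sum}.
     */
    static List<long[]> sumOverRows(List<long[]> input, int preceding) {
        // Steps 1-2: one sort on (partition key, order key) makes partitions contiguous and ordered.
        List<long[]> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.<long[]>comparingLong(r -> r[0]).thenComparingLong(r -> r[1]));

        List<long[]> out = new ArrayList<>();
        int partStart = 0;

        for (int i = 0; i < sorted.size(); i++) {
            if (i > 0 && sorted.get(i)[0] != sorted.get(i - 1)[0])
                partStart = i; // the partition key changed, so a new partition begins here

            // Step 3: the frame is [i - preceding, i], clamped to the partition start.
            int frameStart = Math.max(partStart, i - preceding);

            // Step 4: apply the aggregate to the rows of the frame.
            long sum = 0;
            for (int j = frameStart; j <= i; j++)
                sum += sorted.get(j)[2];

            // Step 5: append the result as a new field of the row.
            long[] row = sorted.get(i);
            out.add(new long[] {row[0], row[1], row[2], sum});
        }

        return out;
    }

    public static void main(String[] args) {
        List<long[]> rows = Arrays.asList(
            new long[] {2, 1, 100}, new long[] {1, 2, 20}, new long[] {1, 1, 10}, new long[] {1, 3, 30});

        for (long[] r : sumOverRows(rows, 1))
            System.out.println(Arrays.toString(r));
    }
}
```

With the sample input and a frame of 1 PRECEDING, the running sums within partition 1 are 10, 30, and 50, and the single row of partition 2 gets 100.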
Two implementation strategies were considered:

- Hashing Window Operation: Uses a hash table to divide rows into partitions.
- Streaming Window Operation: Relies on pre-sorting of rows to create partitions.

Proposed Implementation

The Streaming Window Operation approach is recommended, as it can reuse existing sort operations. The sort for partitioning can be merged with the sort for ordering by performing a single sort on the PARTITION BY expressions followed by the ORDER BY expressions.

Advantages:

- Simplified implementation: there is no need to implement sorting within the WindowNode.
- Better performance: only one sort operation with O(N log N) complexity is needed, instead of a full sort plus a per-partition sort.
- The WindowNode needs to store only the current partition.
- For frames like ROWS/RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, results can be computed without storing the full partition:
  - For ROWS: Only one row needs to be kept.
  - For RANGE: Only peer rows of the current row need to be kept.

Note: This implementation does not prevent adding the Hashing Window Operation in the future, which may offer better performance with O(N + N log M) complexity.

Logical Planning

Window function expressions are extracted from LogicalProject into a LogicalWindow, in which window functions with identical frame definitions are grouped together. This is handled by Calcite’s standard rule CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW.

Constants used in window function calls are stored in LogicalWindow. Since the aggregation implementation in Ignite uses field indices only, these constants must be projected into the input of LogicalWindow and removed afterward.

Calcite plan pseudocode:

    LogicalWindow(..., const=[1], aggCall=COUNT($const0))

becomes:

    LogicalProject(exprs=[$1]) -- keeps only the result of the window function
      LogicalWindow(..., const=[], aggCall=COUNT($0))
        LogicalProject(exprs=[1])

To support this transformation, a new rule is proposed: ProjectWindowConstantsRule.
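The index bookkeeping behind this rewrite can be sketched in plain Java. This is a toy model, not the proposed ProjectWindowConstantsRule: the class ProjectConstantsSketch, its Rewritten holder, and the string form of expressions are hypothetical, and the sketch assumes that constants are appended after the pass-through input fields and that the window emits its input fields followed by one field per aggregate call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the constant-projection rewrite. Hypothetical types, not Calcite or Ignite classes. */
final class ProjectConstantsSketch {
    /** Outcome of the rewrite, with expressions kept as plain strings. */
    static final class Rewritten {
        /** Expressions of the projection placed under the window. */
        final List<String> inputProject = new ArrayList<>();

        /** Field reference that replaces each window constant in the agg calls. */
        final List<String> constantRefs = new ArrayList<>();

        /** Expressions of the projection placed above the window. */
        final List<String> outputProject = new ArrayList<>();
    }

    static Rewritten rewrite(int inputFields, List<String> constants, int aggCalls) {
        Rewritten r = new Rewritten();

        // Lower projection: pass every input field through, then append the constants as fields.
        for (int i = 0; i < inputFields; i++)
            r.inputProject.add("$" + i);

        for (int i = 0; i < constants.size(); i++) {
            r.inputProject.add(constants.get(i));
            r.constantRefs.add("$" + (inputFields + i)); // assumed position of constant i
        }

        // Assumption: the window emits its (widened) input fields, then one field per agg call.
        int widened = inputFields + constants.size();

        // Upper projection: original fields and agg results, skipping the appended constants.
        for (int i = 0; i < inputFields; i++)
            r.outputProject.add("$" + i);

        for (int i = 0; i < aggCalls; i++)
            r.outputProject.add("$" + (widened + i));

        return r;
    }

    public static void main(String[] args) {
        // Mirrors the pseudocode plan: no input fields, one constant, one COUNT call.
        Rewritten r = rewrite(0, Arrays.asList("1"), 1);
        System.out.println(r.inputProject + " " + r.constantRefs + " " + r.outputProject);
    }
}
```

For the plan in the pseudocode (no input fields, constant 1, a single COUNT), the sketch yields a lower projection of [1], an argument reference of $0, and an upper projection of [$1].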
A new heuristic phase, HEP_WINDOW_SPLIT, is introduced for applying these two rules (CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW and ProjectWindowConstantsRule). CoreRules.FILTER_WINDOW_TRANSPOSE and CoreRules.PROJECT_WINDOW_TRANSPOSE are added to HEP_FILTER_PUSH_DOWN and HEP_PROJECT_PUSH_DOWN.

In Calcite 1.38.0, CoreRules.PROJECT_WINDOW_TRANSPOSE has a bug: it does not check expressions in frame boundaries. Until the upgrade to 1.39.0 or above, a patched version of this rule is required.

Physical Planning

The rule converting LogicalWindow into a physical WindowNode operator creates one physical operator per expression group. It requires the input to be sorted by both PARTITION BY and ORDER BY expressions.

When deriving or passing through traits, the WindowNode uses these rules:

- For DistributionTrait, the proposed distribution must satisfy the partitioning (PARTITION BY) expressions.
- For CollationTrait, the sort order must begin with the partitioning expressions (in any order direction), followed by the ORDER BY expressions.
- Other traits are either passed through or derived without modification.

Execution

Window Calculation Algorithm:

1. Check whether the incoming row from the upstream node belongs to the current partition by comparing partitioning keys. If not, finalize and emit the results for the current partition, then initialize a new partition.
2. Add the row to the current partition.
3. If the frame can compute results immediately, compute and emit the result.

Frame Boundaries in ROWS Mode

For each row, compute its index in the partition and calculate:

- CURRENT ROW: current index
- UNBOUNDED PRECEDING: index 0
- UNBOUNDED FOLLOWING: partition size - 1
- N PRECEDING: current index - N
- N FOLLOWING: current index + N

Frame Boundaries in RANGE Mode

Transformations are applied to the sort expression value:

- CURRENT ROW: unchanged
- UNBOUNDED PRECEDING/FOLLOWING: returns null (special handling)
- N PRECEDING/FOLLOWING: supported only when a single ORDER BY expression is present.
The sign of N is adjusted according to the sort direction and whether the bound is PRECEDING or FOLLOWING. The effective boundary value is then computed using functions like ADD or DATEADD, depending on the data type of the sort expression and the value of N.

Determine the peer index of the current row (based on sort keys). If the peer index remains unchanged and the frame boundary expressions are constants, the previous frame bounds can be reused for efficiency. Otherwise, apply the projection and use binary search to locate the frame bounds.

Function Evaluation (General Case)

Compute the start and end indices of the frame, then:

- If the start and end haven't changed, reuse the current accumulator result.
- If only the end changed, incrementally add the new rows.
- If the start changed, reinitialize the accumulator with all rows in the new frame.

Further optimization is possible if the aggregation accumulators support removal of previously added rows.

Supported Functions

- Aggregate-based functions: COUNT, SUM, AVG, and other Ignite aggregates
- Navigation functions: ROW_NUMBER, RANK, DENSE_RANK, LEAD, LAG, FIRST_VALUE, LAST_VALUE
- Distribution functions: PERCENT_RANK, CUME_DIST, NTILE, NTH_VALUE

The set of functions can be expanded if required.

Functions that can be computed without accessing all rows in a partition:

- All aggregate functions (with wrappers)
- ROW_NUMBER
- RANK
- DENSE_RANK

Other functions, such as LEAD, LAG, and NTILE, require full or random access to the partition.

Risks and Assumptions

The proposed implementation requires a full sort of the input rows, which may consume significant memory. This can be mitigated by enhancing the sort operator to support:

- Partial in-memory sorting
- Writing sorted partitions to disk
- Merging sorted runs via N-way merge sort during result production

This technique is known as spilling: offloading intermediate sorted data to disk to handle memory pressure efficiently.
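The N-way merge of sorted runs mentioned above is commonly built on a min-heap that holds the head of every run. The following is a minimal sketch, assuming integer sort keys and using in-memory lists as stand-ins for runs that a real sort operator would read back from disk. The class SortedRunMergeSketch and its Cursor helper are hypothetical names, not part of Ignite.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative only: in-memory lists stand in for sorted runs spilled to disk. */
final class SortedRunMergeSketch {
    /** Current head of one run plus the iterator over the rest of that run. */
    private static final class Cursor implements Comparable<Cursor> {
        int head;
        final Iterator<Integer> rest;

        Cursor(Iterator<Integer> it) {
            rest = it;
            head = it.next();
        }

        @Override public int compareTo(Cursor o) {
            return Integer.compare(head, o.head);
        }
    }

    /** Merges already sorted runs into one sorted list using a min-heap of run heads. */
    static List<Integer> merge(List<List<Integer>> runs) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>();

        for (List<Integer> run : runs) {
            if (!run.isEmpty())
                heap.add(new Cursor(run.iterator()));
        }

        List<Integer> out = new ArrayList<>();

        while (!heap.isEmpty()) {
            Cursor c = heap.poll(); // the run with the smallest head
            out.add(c.head);

            if (c.rest.hasNext()) {
                c.head = c.rest.next();
                heap.add(c); // re-insert the run under its new head
            }
        }

        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
            Arrays.asList(1, 4, 7), Arrays.asList(2, 5), Arrays.asList(3, 6, 8));

        System.out.println(merge(runs));
    }
}
```

Only one element per run is resident in the heap at any time, so memory use grows with the number of runs rather than with the total number of rows.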
Discussion Links

- https://lists.apache.org/thread/hnys5g1nfbjxcbo59939th1pwo74t9np
- https://issues.apache.org/jira/browse/IGNITE-14777

Reference Links

- https://www.postgresql.org/docs/current/tutorial-window.html
- https://substrait.io/relations/physical_relations/#hashing-window-operation
- https://substrait.io/relations/physical_relations/#streaming-window-operation

Tickets

- IGNITE-14777

Note: This implementation can be ported to Apache Ignite 3, as the Calcite Engine implementation is similar across versions.