Motivation

There is user demand for support of SQL window functions.

Description

Window functions allow computations over a set of rows that are related to the 
current row.
Some window functions resemble aggregate functions, but unlike aggregates, they 
return a value for each row rather than producing a single result for a group.

Public API

A SQL window function expression has the following syntax:

<aggregate or special window function call> OVER ([PARTITION BY <expr1>, 
<expr2>, ...] [ORDER BY <expr1> [ASC/DESC], <expr2> [ASC/DESC], ...] [<frame>])
  
frame is defined as:

[<mode> BETWEEN <bound> AND <bound>]
  
bound can be specified as:

CURRENT ROW
UNBOUNDED PRECEDING/FOLLOWING
<expr> PRECEDING/FOLLOWING
  
A frame mode can be one of three types:

ROWS: Defines the frame by an exact number of rows, similar to the LIMIT 
clause. For example, ROWS 3 PRECEDING includes the 3 previous rows and the 
current row.
RANGE: Defines the frame based on a logical value range. For example, RANGE 3 
PRECEDING includes rows with values in [current value - 3, current value], and 
the current row. This works only for types with well-defined ranges, such as 
numeric and temporal types.
GROUPS: Defines the frame by a number of distinct values. For example, GROUPS 3 
PRECEDING includes rows with the 3 previous distinct values and the current row.
Note: GROUPS mode is not supported in Calcite versions up to and including 
1.40.0 and is not considered further.
In RANGE mode, rows with equal sort key values (ORDER BY) are called peer rows.
Peer index refers to the position of a row among peers (rows with identical 
sort keys).

Implementation Options

The proposed algorithm for window function evaluation includes the following 
steps:

Divide rows into partitions based on the PARTITION BY clause.
Sort rows within each partition according to the ORDER BY clause.
For each row, determine the frame according to the frame clause.
Apply the <aggregate or special window function> to the frame.
Append the computed result as a new field in the row.
Two implementation strategies were considered:

Hashing Window Operation: Uses a hash table to divide rows into partitions.
Streaming Window Operation: Relies on pre-sorting of rows to create partitions.
Proposed Implementation

The Streaming Window Operation approach is recommended, as it can reuse 
existing sort operations.
We can merge the sort for partitioning with the sort for ordering by performing 
a single sort using both PARTITION BY and ORDER BY expressions.

Advantages:

Simplified implementation—no need to implement sorting within the WindowNode.
Better performance with only one sort operation with O(N log N) complexity, 
instead of a full sort plus per-partition sort.
The WindowNode needs to store only the current partition.
For frames like ROWS/RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, results 
can be computed without storing the full partition:
For ROWS: Only one row needs to be kept.
For RANGE: Only peer rows of the current row need to be kept.
Note: This implementation does not prevent adding Hashing Window Operation in 
the future, which may offer better performance with O(N + N log M) complexity.

Logical Planning

Window function expressions are extracted from LogicalProject and form 
LogicalWindow with window function grouping based on identical frame 
definitions.
This is handled by Calcite’s standard rule: 
CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW.

Constants used in window function calls are stored in LogicalWindow.
Since the aggregation implementation in Ignite uses field indices only, these 
constants must be projected into the input of LogicalWindow and removed 
afterward.

Calcite plan pseudocode:

LogicalWindow(..., const=[1], aggCall=COUNT($const0))
  
becomes:

LogicalProject(exprs=[$1]) -- keeps only the result of the window function
  LogicalWindow(..., const=[], aggCall=COUNT($0))
    LogicalProject(exprs=[1])
  
To support this transformation, a new rule is proposed: 
ProjectWindowConstantsRule.

A new heuristic phase HEP_WINDOW_SPLIT is introduced for applying these two 
rules.

CoreRules.FILTER_WINDOW_TRANSPOSE and CoreRules.PROJECT_WINDOW_TRANSPOSE are 
added to HEP_FILTER_PUSH_DOWN and HEP_PROJECT_PUSH_DOWN.

In Calcite 1.38.0, there is a bug in CoreRules.PROJECT_WINDOW_TRANSPOSE — it 
doesn't check expressions in frame boundaries.
Until upgrading to 1.39.0 or above, a patched version of this rule is required.
Physical Planning

The rule converting LogicalWindow into a physical WindowNode operator creates 
one physical operator per expression group.
It requires the input to be sorted by both PARTITION BY and ORDER BY 
expressions.

When deriving or passing through traits, the WindowNode uses these rules:

For DistributionTrait, the proposed distribution must satisfy the partitioning 
(PARTITION BY) expressions.
For CollationTrait, the sort order must begin with partitioning expressions (in 
any order direction), followed by the ORDER BY expressions.
Other traits are either passed through or derived without modification.
Execution

Window Calculation Algorithm:

Check if the incoming row from downstream belongs to the current partition by 
comparing partitioning keys.
If not, finalize and emit the results for the current partition, then 
initialize a new partition.
Add the row to the current partition.
If the frame can compute results immediately, compute and emit the result.
Frame Boundaries in ROWS Mode

For each row, compute its index in the partition and calculate:

CURRENT ROW: current index
UNBOUNDED PRECEDING: index 0
UNBOUNDED FOLLOWING: partition size - 1
N PRECEDING: current index - N
N FOLLOWING: current index + N
Frame Boundaries in RANGE Mode

Transformations are applied to the sort expression value:

CURRENT ROW: unchanged
UNBOUNDED PRECEDING/FOLLOWING: returns null (special handling)
N PRECEDING/FOLLOWING: supported only when a single ORDER BY expression is 
present.
The sign of N is adjusted according to the sort direction and whether it is 
PRECEDING or FOLLOWING. The effective boundary value is then computed using 
functions like ADD or DATEADD, depending on the data type of the sort 
expression and the value of N.
Determine the peer index of the current row (based on sort keys).
If the peer index remains unchanged and the frame boundary expressions are 
constants, the previous frame bounds can be reused for efficiency.
Otherwise, apply projection and use binary search to locate the frame bounds.

Function Evaluation (General Case)

Compute start and end indices within frame.
If the start and end haven't changed, reuse the current accumulator result.
If only the end changed, incrementally add new rows.
If the start changed, reinitialize the accumulator with all rows in the new 
frame.
Further optimization is possible if the aggregation accumulators support 
removal of previously added rows.
Supported Functions

Aggregate-based functions: 
COUNT, SUM, AVG, and other Ignite aggregates
Navigation functions:
ROW_NUMBER, RANK, DENSE_RANK
LEAD, LAG, FIRST_VALUE, LAST_VALUE
Distribution functions:
PERCENT_RANK, CUME_DIST, NTILE, NTH_VALUE
The set of functions can be expanded if required.

Functions that can be computed without accessing all rows in a partition:

All aggregate functions (with wrappers)
ROW_NUMBER
RANK
DENSE_RANK
Other functions like LEAD, LAG, NTILE, etc., require full or random access to 
the partition.

Risks and Assumptions

The proposed implementation requires full sort of input rows, which may consume 
significant memory.
This can be mitigated by enhancing the sort operator to support:

Partial in-memory sorting
Writing sorted partitions to disk
Merging sorted runs via N-way merge sort during result production
This technique is known as spilling — offloading intermediate sorted data to 
disk to handle memory pressure efficiently.
Discussion Links

https://lists.apache.org/thread/hnys5g1nfbjxcbo59939th1pwo74t9np
https://issues.apache.org/jira/browse/IGNITE-14777
Reference Links

https://www.postgresql.org/docs/current/tutorial-window.html
https://substrait.io/relations/physical_relations/#hashing-window-operation
https://substrait.io/relations/physical_relations/#streaming-window-operation
Tickets

IGNITE-14777

Note: This implementation can be ported to Apache Ignite 3, as the Calcite 
Engine implementation is similar across versions.

Reply via email to