Piotr Nowojski created FLINK-37213:
--------------------------------------

Summary: Improve performance of unbounded OVER aggregations
Key: FLINK-37213
URL: https://issues.apache.org/jira/browse/FLINK-37213
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 2.0-preview
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
Unbounded OVER aggregations can be painfully slow. For example, queries like:

{code:sql}
SELECT rowtime, b, c,
  min(c) OVER (PARTITION BY b ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
  max(c) OVER (PARTITION BY b ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM T;

-- or the RANGE equivalent:
SELECT rowtime, b, c,
  min(c) OVER (PARTITION BY b ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
  max(c) OVER (PARTITION BY b ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM T;
{code}

On each processed watermark, regardless of whether the query will produce any output, the operator has to sort the whole buffered state.

For example, we had a production query with a key space of ~50 000 000 elements, which we expected to process around 1M records/h (~270 records/s), with a new watermark arriving every ~200 ms. Even with a parallelism of 50, Flink SQL was not able to handle this load, averaging ~2 records/s/operator.

The problem was that on each watermark (so nominally every 200 ms), a timer fired for each key, sorted all of that key's buffered records and checked whether any buffered record has {{$rowtime <= watermark}}. Given the frequency of watermarks and the number of keys, that was extremely rare. The end result was that a single parallel instance of {{RowTimeRowsUnboundedPrecedingFunction}} was firing ~20 000 timers/s while processing only ~2 records/s.

I'm proposing to add a new version of {{RowTimeRowsUnboundedPrecedingFunction}} and {{RowTimeRangeUnboundedPrecedingFunction}} that would work more like {{StreamExecTemporalSort}}: register a timer for each encountered {{$rowtime}} value, and use the timers to access the buffered records in ascending order of {{$rowtime}}.
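The proposed timer-per-{{$rowtime}} approach can be sketched roughly as follows. This is a simplified, single-key model under stated assumptions, not Flink code: the class and its methods ({{UnboundedOverSketch}}, {{processElement}}, {{advanceWatermark}}) are hypothetical, rows are reduced to the single column {{c}}, and a {{java.util.TreeMap}} stands in for both the {{MapState<Long, List<RowData>> inputState}} buffer and the event-time timer queue. Each distinct {{$rowtime}} therefore corresponds to exactly one "timer", and advancing the watermark visits only entries with {{$rowtime <= watermark}}, in ascending order, without re-sorting the rest of the buffer.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of the proposed operator; NOT Flink code.
// The TreeMap plays the role of both the MapState<Long, List<RowData>> buffer and the
// event-time timer queue: one entry per distinct rowtime == one registered timer.
final class UnboundedOverSketch {
    private final boolean rangeSemantics; // false = ROWS, true = RANGE
    private final TreeMap<Long, List<Long>> inputState = new TreeMap<>(); // rowtime -> values of c
    private Long min = null; // running MIN(c) over UNBOUNDED PRECEDING
    private Long max = null; // running MAX(c) over UNBOUNDED PRECEDING
    private long currentWatermark = Long.MIN_VALUE;
    long timersFired = 0;
    final List<String> output = new ArrayList<>(); // "rowtime,c,min,max"

    UnboundedOverSketch(boolean rangeSemantics) {
        this.rangeSemantics = rangeSemantics;
    }

    // Buffer the row under its rowtime. A second row with the same rowtime reuses the
    // existing entry, i.e. no additional timer is created for it.
    void processElement(long rowtime, long c) {
        if (rowtime <= currentWatermark) {
            return; // late row: simply dropped in this sketch
        }
        inputState.computeIfAbsent(rowtime, t -> new ArrayList<>()).add(c);
    }

    // Fire only the timers with rowtime <= watermark, in ascending rowtime order.
    // Each firing touches a single buffer entry; nothing else is sorted or scanned.
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!inputState.isEmpty() && inputState.firstKey() <= currentWatermark) {
            Map.Entry<Long, List<Long>> entry = inputState.pollFirstEntry();
            timersFired++;
            if (rangeSemantics) {
                // RANGE: all peers sharing this rowtime see the same aggregate.
                for (long c : entry.getValue()) {
                    accumulate(c);
                }
                for (long c : entry.getValue()) {
                    emit(entry.getKey(), c);
                }
            } else {
                // ROWS: each row sees all earlier rows plus itself.
                for (long c : entry.getValue()) {
                    accumulate(c);
                    emit(entry.getKey(), c);
                }
            }
        }
    }

    private void accumulate(long c) {
        min = (min == null) ? c : Math.min(min, c);
        max = (max == null) ? c : Math.max(max, c);
    }

    private void emit(long rowtime, long c) {
        output.add(rowtime + "," + c + "," + min + "," + max);
    }
}
{code}

In this model a watermark that does not reach the smallest buffered {{$rowtime}} costs a single {{firstKey()}} comparison and fires nothing, which is the behaviour the proposal is after. In Flink itself, registering the same event-time timer twice for one key and timestamp results in a single timer, which is what would keep one-timer-per-{{$rowtime}} cheap to maintain.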
This way we would avoid:
* unnecessary firing of timers: a timer would fire only if there are records to be processed
* sorting all of the state on each timer: instead, each timer only needs to access a single key in the {{MapState<Long, List<RowData>> inputState}} buffer.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)