Piotr Nowojski created FLINK-37213:
--------------------------------------

             Summary: Improve performance of unbounded OVER aggregations
                 Key: FLINK-37213
                 URL: https://issues.apache.org/jira/browse/FLINK-37213
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Runtime
    Affects Versions: 2.0-preview
            Reporter: Piotr Nowojski
            Assignee: Piotr Nowojski


Unbounded over aggregations can be painfully slow.

For example, queries like:

{code:sql}

SELECT
  rowtime,
  b,
  c,
  min(c) OVER (PARTITION BY b ORDER BY rowtime
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
  max(c) OVER (PARTITION BY b ORDER BY rowtime
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM T

-- or RANGE equivalent:

SELECT
  rowtime,
  b,
  c,
  min(c) OVER (PARTITION BY b ORDER BY rowtime
    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
  max(c) OVER (PARTITION BY b ORDER BY rowtime
    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM T
{code}

On each processed watermark, such queries need to sort their whole state, 
regardless of whether any output will be produced.

For example, we had a production query with a key space of ~50 000 000 
elements that we expected to process around 1M records/h (~270 records/s), 
with a new watermark arriving every ~200 ms. Even with a parallelism of 50, 
Flink SQL was not able to handle this load, averaging ~2 records/s/operator.

The problem was that on each watermark, so nominally every 200 ms, a timer was 
firing for each key, sorting all buffered records and checking whether any 
buffered record had {{$rowtime <= watermark}}. Given the frequency of 
watermarks and the number of keys, that was extremely rare. The end result was 
that a single parallel instance of the 
{{RowTimeRowsUnboundedPrecedingFunction}} was firing ~20 000 timers/s while 
processing only ~2 records/s.
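
The cost pattern described above can be illustrated with a small simulation. 
This is only a sketch in plain Java with no Flink dependencies: the class 
{{PerKeyTimerSketch}} and all of its members are hypothetical names, and it 
assumes that every key with buffered records gets one timer firing per 
watermark. It is not the code of {{RowTimeRowsUnboundedPrecedingFunction}}.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Flink code: one timer per key on every watermark.
final class PerKeyTimerSketch {
    // key -> rowtimes of the records currently buffered for that key
    private final Map<String, List<Long>> buffered = new HashMap<>();
    long timersFired = 0;
    long recordsEmitted = 0;

    void processElement(String key, long rowtime) {
        buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(rowtime);
    }

    void onWatermark(long watermark) {
        for (List<Long> rowtimes : buffered.values()) {
            timersFired++;              // assumption: each key with buffered data fires
            Collections.sort(rowtimes); // sort everything buffered for this key
            int ready = 0;
            while (ready < rowtimes.size() && rowtimes.get(ready) <= watermark) {
                ready++;
            }
            recordsEmitted += ready;            // usually 0 when watermarks are frequent
            rowtimes.subList(0, ready).clear(); // drop the records that were emitted
        }
        // keys with nothing left buffered no longer need a timer
        buffered.values().removeIf(List::isEmpty);
    }
}
```

In this sketch, three keys that each buffer one record with rowtime 1000 cause 
27 timer firings (and 27 sorts) across nine watermarks from 100 to 900 without 
emitting a single record.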

I'm proposing to add a new version of {{RowTimeRowsUnboundedPrecedingFunction}} 
and {{RowTimeRangeUnboundedPrecedingFunction}} that would work more like 
{{StreamExecTemporalSort}}: registering a timer for each encountered 
{{$rowtime}} value, and using timers to access the buffered records in 
ascending order of {{$rowtime}}. This way we would avoid:
* unnecessary firing of timers - a timer would fire only if there are 
records to be processed
* sorting all of the state on each timer - we would just access a single key 
in the {{MapState<Long, List<RowData>> inputState}} buffer.
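
The proposed timer-per-{{$rowtime}} approach could look roughly like the 
sketch below. It is plain Java with no Flink dependencies, and every name in 
it ({{PerRowtimeTimerSketch}}, {{timers}}, {{emitted}}) is hypothetical. A 
{{TreeMap}} stands in for the timer service, on the assumption that timers 
fire in ascending timestamp order and are deduplicated per key and timestamp. 
A nested map stands in for the keyed {{MapState<Long, List<RowData>> 
inputState}}, and rows are plain strings. The actual aggregation (updating 
the min/max accumulators) is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical simulation, not Flink code: one timer per (key, rowtime) pair.
final class PerRowtimeTimerSketch {
    // key -> (rowtime -> rows); stand-in for the keyed MapState buffer
    private final Map<String, Map<Long, List<String>>> inputState = new HashMap<>();
    // timestamp -> keys with a timer at that timestamp; stand-in for the timer service
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    long timersFired = 0;
    final List<String> emitted = new ArrayList<>();

    void processElement(String key, long rowtime, String row) {
        inputState.computeIfAbsent(key, k -> new HashMap<>())
                .computeIfAbsent(rowtime, t -> new ArrayList<>())
                .add(row);
        // registering the same (key, rowtime) twice keeps a single timer
        timers.computeIfAbsent(rowtime, t -> new TreeSet<>()).add(key);
    }

    void onWatermark(long watermark) {
        // fire only the timers that are due, in ascending rowtime order
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                timersFired++;
                onTimer(key, due.getKey());
            }
        }
    }

    private void onTimer(String key, long rowtime) {
        Map<Long, List<String>> perKey = inputState.get(key);
        // a single lookup by rowtime; nothing is sorted here
        List<String> rows = perKey.remove(rowtime);
        emitted.addAll(rows); // a real operator would update accumulators first
        if (perKey.isEmpty()) {
            inputState.remove(key);
        }
    }
}
```

With this structure, watermarks that do not pass any buffered {{$rowtime}} 
fire no timers at all, and each firing touches exactly one map entry. The 
sketch does not address late records or state cleanup, which a real 
implementation would need to handle.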



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
