[ https://issues.apache.org/jira/browse/FLINK-39589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39589:
-----------------------------------
    Labels: pull-request-available rocksdb window  (was: rocksdb window)

> Window TVF aggregation emits incorrect results with RocksDB state backend
> -------------------------------------------------------------------------
>
>                 Key: FLINK-39589
>                 URL: https://issues.apache.org/jira/browse/FLINK-39589
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.1
>            Reporter: Kanchi  Masalia
>            Priority: Minor
>              Labels: pull-request-available, rocksdb, window
>
> Window TVF aggregation (TUMBLE/HOP/CUMULATE) silently produces incorrect
> results when using the RocksDB state backend. For each window, only one key
> is emitted with the correct aggregate; the rows for all other keys are
> emitted with count=0 and the wrong key. The legacy GROUP BY TUMBLE() syntax
> works correctly with the same backend and data.
> h3. Reproduction
> Using the datagen connector with 5 distinct keys:
> {code:sql}
> CREATE TEMPORARY TABLE datagen_test (
>   event_userid BIGINT,
>   ts BIGINT,
>   event_time AS TO_TIMESTAMP_LTZ(ts, 3),
>   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '100000',
>   'fields.event_userid.min' = '1',
>   'fields.event_userid.max' = '5',
>   'fields.ts.kind' = 'sequence',
>   'fields.ts.start' = '1771345700000',
>   'fields.ts.end'   = '1771346600000'
> );
> -- Broken (TVF syntax + RocksDB):
> SELECT event_userid, window_start, window_end, COUNT(*) AS cnt
> FROM TABLE(
>   TUMBLE(TABLE datagen_test, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
> GROUP BY event_userid, window_start, window_end;
> -- Works (legacy syntax + RocksDB):
> SELECT event_userid,
>        TUMBLE_START(event_time, INTERVAL '1' MINUTE),
>        TUMBLE_END(event_time, INTERVAL '1' MINUTE),
>        COUNT(*) AS cnt
> FROM datagen_test
> GROUP BY event_userid, TUMBLE(event_time, INTERVAL '1' MINUTE);
> {code}
> In the TVF output, all rows carry the same key and most have cnt=0.
> The legacy output shows 5 distinct keys per window with the correct counts
> (~12,000 each).
> Switching to the hashmap state backend avoids the issue:
> {code:sql}
> SET 'state.backend.type' = 'hashmap';
> {code}
> h3. Root Cause
> RecordsWindowBuffer.flush() reuses a single mutable WindowKey object when
> requiresCopy=false (the RocksDB path). TimerHeapInternalTimer captures
> references to this object rather than copies, so when the timers fire, they
> all resolve to the last key written into it; each timer then reads and
> clears the state of that same key.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
