Yang Li created FLINK-36377:
-------------------------------

Summary: Support the use of the LAST_VALUE aggregate function on ROW type data
Key: FLINK-36377
URL: https://issues.apache.org/jira/browse/FLINK-36377
Project: Flink
Issue Type: Improvement
Components: Runtime / State Backends
Reporter: Yang Li
h2. Introduction

In Flink, after a {{GROUP BY}}, users often apply {{LAST_VALUE}} to the remaining fields so that every selected field has a corresponding aggregate function. Because {{LAST_VALUE}} currently does not support the {{ROW}} type, users have to apply {{LAST_VALUE}} to each field separately, as shown below.

{code:sql}
SELECT
  LAST_VALUE(bool_a) AS last_bool_a,
  LAST_VALUE(int2_b) AS last_int2_b,
  LAST_VALUE(int4_c) AS last_int4_c,
  LAST_VALUE(int8_d) AS last_int8_d,
  LAST_VALUE(float4_e) AS last_float4_e,
  LAST_VALUE(float4_f) AS last_float4_f,
  LAST_VALUE(numeric_g) AS last_numeric_g,
  LAST_VALUE(text_m) AS last_text_m,
  LAST_VALUE(varchar_p) AS last_varchar_p,
  date_h
FROM source_table
GROUP BY date_h
{code}

If the upstream operator produces a retract stream, this approach leads to redundant {{StateMap}} traversals. To support retraction, Flink's internal {{LastValueWithRetractAggFunction}} stores all historical data associated with the primary key. When the current last value is retracted, the function traverses all keys of {{orderToValue}}, a {{MapView}} that maps timestamps to data and is stored as a {{StateMap}}. The more {{LAST_VALUE}} functions a query contains, the more RocksDB read and write operations it performs.

Therefore, I propose supporting the {{ROW}} type in {{LAST_VALUE}}, so that all fields can be covered by a single {{LAST_VALUE}} call:

{code:sql}
SELECT
  LAST_VALUE(
    ROW(
      bool_a, int2_b, int4_c, int8_d, float4_e,
      float4_f, numeric_g, text_m, varchar_p
    )
  ) AS row_data,
  date_h
FROM source_table
GROUP BY date_h
{code}

The experiments indicate that applying the {{ROW}} type to the {{LAST_VALUE}} function improves the processing speed of retract streams but has only a limited effect on append-only streams.

h2. Evaluation

Job throughput was compared with and without the {{ROW}} type in the {{LAST_VALUE}} function, in both retract and append-only scenarios.

h3. Retraction

A deduplication operator is used to convert the append-only stream generated by datagen into a retract stream.

h4. LAST_VALUE with Separated Fields

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=N2UxY2VkZjAwOWM0M2M4ODYxNzMwZjA3ZDYxNTYzOTVfbElkb2JxUkwzY3hHNEZ4TXZYNUxHaEVQdVk3M25mcWZfVG9rZW46Ym94azRqS3VCeXpQSFl1YjVhaVF0NVBMcHhjXzE3MjczMzU0NjE6MTcyNzMzOTA2MV9WNA|width=903,height=236!

h4. LAST_VALUE with ROW

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=MTNkOTI3OTRhYmU3ZmYxMzE4NDhkOWU1NDkxYzZmNTBfNUtKVGp0b0lOd1MyUVpDbnR0SWJ4aGxQd3QwNTZmZXdfVG9rZW46Ym94azR1UTVZUGZFdnFXMktBdWdHMjd6cHVmXzE3MjczMzU0NjE6MTcyNzMzOTA2MV9WNA|width=894,height=233!

h4. Summary

The {{ROW}} job achieves notably higher throughput than the separated-field job (ROW 4817 vs. Mean 1808). Flame graph analysis shows that applying the {{ROW}} type to the {{LAST_VALUE}} function reduces the cost of the aggregate function's {{accumulate}} calls: the CPU usage of {{accumulate}} is 20.02% with {{ROW}} versus 66.98% with separated fields. {{LastValueWithRetractAccumulator}} stores its {{MapView}} in {{MapState}}, so every update of a {{LastValueWithRetractAccumulator}} requires reading from or writing to RocksDB.

h3. AppendOnly

h4. LAST_VALUE with Separated Fields

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=OGExYjk0NTQ3NDNmZDQ4OTI5ODhiZTliM2QzODM5YmVfcE1LQTZSRElaRkZpNmhsRkxndzhSRGxQdFJrTFdZQ0lfVG9rZW46Ym94azRVYWlsWU5UTmJ6VHhRTmRaTnJHRHBmXzE3MjczMzU0NjE6MTcyNzMzOTA2MV9WNA|width=870,height=228!

h4. LAST_VALUE with ROW

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=N2JkZjFiMDZjODU4MmEyNGNmNjgzNzQyYzM4YmYzZTlfNUt3VVc1ZmZHeFpoSWRvdm1HYU0yRldxcGtLM1MyY09fVG9rZW46Ym94azRqcTZvNW04NU1KTXpoQUZEa0g4R29mXzE3MjczMzU0NjE6MTcyNzMzOTA2MV9WNA|width=864,height=230!

h4. Summary

The two jobs show little difference in throughput (ROW 13411 vs. Mean 10673).
Further examination of the flame graphs of both jobs shows that the bottleneck in both lies in reading from {{RocksDBValueState}}, which is called by {{GroupFunction}}. Using {{ROW}} aggregation does not significantly optimize this part. I suspect this is because Flink stores the data of multiple accumulators in a single {{RowData}}, so every call to {{accState.value()}} reads all accumulators at once. Therefore, the {{ROW}} optimization may not be very effective here.

h2. Conclusion

# Using the {{ROW}} type for {{LAST_VALUE}} aggregation can improve the processing speed of retract streams, with an effect proportional to the number of fields contained in the {{ROW}}.
# For append-only streams, using the {{ROW}} type for {{LAST_VALUE}} aggregation yields only limited improvement, because it does not significantly speed up state backend reads.
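The retraction cost described in the Introduction can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not Flink's {{LastValueWithRetractAggFunction}}: the class {{RetractLastValue}}, the methods {{runSeparate}}/{{runRow}}/{{row}} and the {{stateAccesses}} counter are invented for illustration, a {{HashMap}} stands in for the {{MapState}}-backed {{orderToValue}} view, and every lookup, insert or scanned key is counted as one state access. It compares nine per-field accumulators with a single accumulator over a nine-field row when the newest of 1,000 rows of one key is retracted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; NOT Flink's LastValueWithRetractAggFunction.
public class LastValueRetractSketch {
    static long stateAccesses = 0; // simulated MapState accesses: lookups, inserts, scanned keys

    static final class RetractLastValue<T> {
        private final Map<Long, List<T>> orderToValue = new HashMap<>(); // order -> values
        private Long lastOrder = null;
        private T lastValue = null;

        void accumulate(T value, long order) {
            stateAccesses++;
            orderToValue.computeIfAbsent(order, k -> new ArrayList<>()).add(value);
            if (lastOrder == null || order >= lastOrder) {
                lastOrder = order;
                lastValue = value;
            }
        }

        void retract(T value, long order) {
            stateAccesses++;
            List<T> values = orderToValue.get(order);
            if (values == null || !values.remove(value)) {
                return; // value was never accumulated
            }
            boolean wasLast = lastOrder != null && order == lastOrder;
            if (values.isEmpty()) {
                orderToValue.remove(order);
                if (wasLast) {
                    rescan(); // the current last value is gone: scan all remaining keys
                }
            } else if (wasLast) {
                lastValue = values.get(values.size() - 1);
            }
        }

        // Visits every remaining key to find the new maximum order.
        private void rescan() {
            lastOrder = null;
            lastValue = null;
            for (Map.Entry<Long, List<T>> e : orderToValue.entrySet()) {
                stateAccesses++;
                if (lastOrder == null || e.getKey() > lastOrder) {
                    lastOrder = e.getKey();
                    List<T> vs = e.getValue();
                    lastValue = vs.get(vs.size() - 1);
                }
            }
        }

        T getValue() {
            return lastValue;
        }
    }

    // One accumulator per column, like LAST_VALUE(a), LAST_VALUE(b), ...
    static long runSeparate(int fields, int history) {
        stateAccesses = 0;
        List<RetractLastValue<Integer>> accs = new ArrayList<>();
        for (int f = 0; f < fields; f++) {
            accs.add(new RetractLastValue<>());
        }
        for (int order = 1; order <= history; order++) {
            for (int f = 0; f < fields; f++) {
                accs.get(f).accumulate(order * 10 + f, order);
            }
        }
        for (int f = 0; f < fields; f++) {
            accs.get(f).retract(history * 10 + f, history); // retract the newest row
        }
        return stateAccesses;
    }

    // A single accumulator over the whole row, like LAST_VALUE(ROW(a, b, ...)).
    static long runRow(int fields, int history) {
        stateAccesses = 0;
        RetractLastValue<List<Integer>> acc = new RetractLastValue<>();
        for (int order = 1; order <= history; order++) {
            acc.accumulate(row(order, fields), order);
        }
        acc.retract(row(history, fields), history); // retract the newest row
        return stateAccesses;
    }

    static List<Integer> row(int order, int fields) {
        List<Integer> r = new ArrayList<>();
        for (int f = 0; f < fields; f++) {
            r.add(order * 10 + f);
        }
        return r;
    }

    public static void main(String[] args) {
        System.out.println("separate: " + runSeparate(9, 1000) + ", row: " + runRow(9, 1000));
    }
}
```

With these parameters, {{runSeparate(9, 1000)}} returns 18000 and {{runRow(9, 1000)}} returns 2000. In this model each separate accumulator repeats the same full scan of {{orderToValue}}, so the cost grows linearly with the number of {{LAST_VALUE}} calls, which is consistent with the lower {{accumulate}} CPU share observed for the {{ROW}} job in the retraction experiment.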
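The suspected reason for the small gain on append-only streams can be modeled in a similar way. This sketch assumes, as the flame-graph analysis suspects, that all accumulators of a group key are kept together in one row-valued state entry; {{CountingValueState}}, {{run}} and the group key string are hypothetical stand-ins (not Flink APIs), and append-only {{LAST_VALUE}} is simplified to "keep the newest value".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the suspected behavior: all accumulators of a group key
// live in ONE row-valued state entry. Not Flink's actual aggregation code.
public class AppendOnlyStateSketch {

    // Stand-in for a per-key value state (e.g. RocksDB-backed); counts reads and writes.
    static final class CountingValueState {
        private final Map<String, Object[]> store = new HashMap<>();
        long reads = 0;
        long writes = 0;

        Object[] value(String key) {
            reads++;
            return store.get(key);
        }

        void update(String key, Object[] accumulators) {
            writes++;
            store.put(key, accumulators);
        }
    }

    // separateCalls = true models one LAST_VALUE per field; false models LAST_VALUE(ROW(...)).
    // Returns {stateReads, stateWrites, accumulateCalls} for `records` rows of one group key.
    static long[] run(boolean separateCalls, int fields, int records) {
        CountingValueState accState = new CountingValueState();
        String groupKey = "date_h=0"; // a single hypothetical group key
        long accumulateCalls = 0;
        for (int i = 0; i < records; i++) {
            Object[] row = new Object[fields];
            for (int f = 0; f < fields; f++) {
                row[f] = i * 10 + f;
            }
            Object[] accs = accState.value(groupKey); // one read returns every accumulator
            if (accs == null) {
                accs = new Object[separateCalls ? fields : 1];
            }
            if (separateCalls) {
                for (int f = 0; f < fields; f++) {
                    accs[f] = row[f]; // simplified append-only LAST_VALUE: keep newest value
                    accumulateCalls++;
                }
            } else {
                accs[0] = row; // the whole row is one accumulator value
                accumulateCalls++;
            }
            accState.update(groupKey, accs); // one write stores every accumulator
        }
        return new long[] {accState.reads, accState.writes, accumulateCalls};
    }

    public static void main(String[] args) {
        long[] separate = run(true, 9, 1000);
        long[] row = run(false, 9, 1000);
        System.out.printf("separate: reads=%d writes=%d accumulate=%d%n", separate[0], separate[1], separate[2]);
        System.out.printf("row:      reads=%d writes=%d accumulate=%d%n", row[0], row[1], row[2]);
    }
}
```

For 9 fields and 1,000 records, both variants perform 1,000 state reads and 1,000 state writes; only the number of accumulate calls differs (9,000 vs. 1,000). If state access dominates the cost, as the flame graphs suggest, folding the fields into a {{ROW}} leaves the main cost unchanged in this model.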