Xingcan Cui created FLINK-34926:
-----------------------------------

             Summary: Adaptive auto parallelism doesn't work for a query
                 Key: FLINK-34926
                 URL: https://issues.apache.org/jira/browse/FLINK-34926
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.18.1
            Reporter: Xingcan Cui
         Attachments: image_720.png
We have the following query running in batch mode.

{code:sql}
WITH FEATURE_INCLUSION AS (
    SELECT
        insertion_id, -- Not unique
        features -- Array<Row<key, value>>
    FROM
        features_table
),
TOTAL AS (
    SELECT
        COUNT(DISTINCT insertion_id) total_id
    FROM
        FEATURE_INCLUSION
),
FEATURE_INCLUSION_COUNTS AS (
    SELECT
        `key`,
        COUNT(DISTINCT insertion_id) AS id_count
    FROM
        FEATURE_INCLUSION,
        UNNEST(features) AS t (`key`, `value`)
    WHERE
        TRUE
    GROUP BY
        `key`
),
RESULTS AS (
    SELECT
        `key`
    FROM
        FEATURE_INCLUSION_COUNTS,
        TOTAL
    WHERE
        (1.0 * id_count) / total_id > 0.1
)
SELECT
    JSON_ARRAYAGG(`key`) AS feature_ids
FROM
    RESULTS
{code}

The parallelism that Flink adaptively derived for the following operator was always 1.

{code:java}
[37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, insertion_id])
+- [38]:LocalHashAggregate(groupBy=[key], select=[key, Partial_COUNT(insertion_id) AS count$0])
{code}

If we disable `execution.batch.adaptive.auto-parallelism.enabled` and manually set `parallelism.default` to a value greater than one, the job works as expected.

The screenshot of the full job graph is attached.

!image_720.png!
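For reference, the workaround can be sketched as Flink SQL client statements. This is a minimal sketch, assuming the query is submitted through the SQL client and that these options are honored when set per session; depending on the deployment, they may need to go into the cluster configuration instead. The parallelism value `4` is an arbitrary example, not the value used in our job.

{code:sql}
-- Run the query in batch mode, as in the report
SET 'execution.runtime-mode' = 'batch';
-- Workaround: turn off adaptive auto-parallelism for batch jobs
SET 'execution.batch.adaptive.auto-parallelism.enabled' = 'false';
-- Fixed default parallelism; 4 is an arbitrary example value (assumption)
SET 'parallelism.default' = '4';
{code}

With these settings, the aggregation operators above use the fixed default parallelism instead of the adaptively derived value.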