Xingcan Cui created FLINK-34926:
-----------------------------------

             Summary: Adaptive auto parallelism doesn't work for a query
                 Key: FLINK-34926
                 URL: https://issues.apache.org/jira/browse/FLINK-34926
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.18.1
            Reporter: Xingcan Cui
         Attachments: image_720.png

We have the following query running in batch mode.

 
{code:sql}
WITH FEATURE_INCLUSION AS (
    SELECT
        insertion_id, -- Not unique
        features -- Array<Row<key, value>>
    FROM
        features_table
),
TOTAL AS (
    SELECT
        COUNT(DISTINCT insertion_id) total_id
    FROM
        FEATURE_INCLUSION
),
FEATURE_INCLUSION_COUNTS AS (
    SELECT
        `key`,
        COUNT(DISTINCT insertion_id) AS id_count
    FROM
        FEATURE_INCLUSION,
        UNNEST(features) as t (`key`, `value`)
    WHERE
        TRUE
    GROUP BY
        `key`
),
RESULTS AS (
    SELECT
        `key`
    FROM
        FEATURE_INCLUSION_COUNTS,
        TOTAL
    WHERE
        (1.0 * id_count) / total_id > 0.1
)
SELECT
    JSON_ARRAYAGG(`key`) AS feature_ids
FROM
    RESULTS{code}
The parallelism that Flink adaptively set for the following operators was always 1:
{code:java}
[37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, insertion_id])
+- [38]:LocalHashAggregate(groupBy=[key], select=[key, Partial_COUNT(insertion_id) AS count$0]){code}
If we disable `execution.batch.adaptive.auto-parallelism.enabled` and manually set `parallelism.default` to a value greater than 1, the job runs with the configured parallelism as expected.
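A minimal sketch of this workaround as Flink SQL client statements, assuming the query is submitted through the SQL client. The value 4 for `parallelism.default` is an arbitrary illustration, not the value used in our job:

{code:sql}
-- Run the query as a batch job.
SET 'execution.runtime-mode' = 'batch';
-- Turn off adaptive parallelism inference for batch jobs.
SET 'execution.batch.adaptive.auto-parallelism.enabled' = 'false';
-- Use a fixed parallelism instead (4 is an illustrative value).
SET 'parallelism.default' = '4';
{code}

With these settings, every operator uses the fixed parallelism rather than one derived from the input data volume, so the operators above are no longer limited to 1.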

A screenshot of the full job graph is attached: !image_720.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
