I am working on a program using Flink SQL. I initially developed it in a
Docker Compose environment where I run Flink and Kafka, and everything
worked correctly. However, when I deployed it to another environment, it
stopped working. The difference I noticed in the new environment, which
runs on a single node on localhost, is that parallelism.default was set
to 4 instead of 1.

The query is:

  WITH user_zones_state AS (
    SELECT
        eez.code,
        ez.zoneIds,
        eez.siteId,
        eez.ts
    FROM user_event AS eez
    LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
    ON eez.siteId = ez.siteId AND eez.sector = ez.sector
  )
  SELECT
    code,
    ARRAY_REMOVE(
        IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]),
        -1
    ) AS prev_zoneIds,
    ts
  FROM user_zones_state

The query returns no results and throws no errors. The problem seems to
come from the following expression:

  ARRAY_REMOVE(
      IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]),
      -1
  ) AS prev_zoneIds

If I remove this expression, the query works in both environments and
returns data.

I thought the issue might be related to it being a stateful function, so I
configured RocksDB as the state backend. However, in the Docker Compose
environment I did not configure any state backend, and it worked fine. In
the new environment, the query only started working when I reduced the
parallelism to 1, and I don’t understand why. With parallelism 4 it has
only occasionally returned data.
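
A minimal sketch of switching the parallelism per session to reproduce both
behaviors, assuming the query is submitted through the Flink SQL client
(the value can also come from the cluster's Flink configuration file):

  -- Assumption: run in the Flink SQL client before submitting the query.
  -- With this setting the query returns nothing, or data only occasionally.
  SET 'parallelism.default' = '4';

  -- With this setting the query returns data.
  SET 'parallelism.default' = '1';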

The source I use is Kafka topics with varying numbers of partitions,
ranging from 1 to 4, although I don’t think that’s related to the issue.
