I am working on a program using Flink SQL. I initially developed it in a Docker Compose environment where I run Flink and Kafka, and everything worked correctly. However, when I deployed it to another environment, it stopped working. The new environment runs on a single node on localhost, and the only difference I noticed is that parallelism.default is set to 4 instead of 1.
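For reference, the difference between the two environments comes down to one setting. A minimal sketch of switching it per session, assuming the job is submitted through the Flink SQL client (an assumption; the cluster-wide equivalent is the parallelism.default entry in the Flink configuration file):

```sql
-- Assumption: executed in the Flink SQL client before submitting the query.
-- Value used in the Docker Compose environment, where the query returns data:
SET 'parallelism.default' = '1';

-- Value found in the new environment, where the query returns nothing:
-- SET 'parallelism.default' = '4';
```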
The query is:

    WITH user_zones_state AS (
        SELECT eez.code, ez.zoneIds, eez.siteId, eez.ts
        FROM user_event AS eez
        LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
            ON eez.siteId = ez.siteId AND eez.sector = ez.sector
    )
    SELECT
        code,
        ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
        ts
    FROM user_zones_state

The query does not return any results, nor does it throw any errors. The problem seems to come from the following expression:

    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds

If I remove this expression, the query works in both environments and returns data.

I thought the issue might be related to it being a stateful function, so I configured RocksDB for state storage. However, in the Docker Compose environment I did not configure any state backend, and it worked fine. It only started working in the new environment when I reduced the parallelism to 1, and I don't understand why. With parallelism 4, it has occasionally returned data.

The sources are Kafka topics with varying numbers of partitions, ranging from 1 to 4, although I don't think that's related to the issue.