Hi,

If I'm not mistaken, this issue is likely related to the number of Kafka partitions being smaller than the parallelism.
Speculated reason:
The over aggregation based on event time (expressed in your SQL by `ORDER BY ts`) is a watermark-driven operator: it only emits accumulated data once it receives watermarks. I suspect that in your scenario, because some topics have fewer Kafka partitions than the parallelism of 4, some source subtasks are not assigned any partition and therefore never produce watermarks. The over aggregation then cannot advance its watermark, because it takes the minimum watermark across all of its input subtasks to move its event-time timers forward. As a result, no data is output.

Verification:
In the Flink UI, you should be able to see the watermark progress of the over aggregation operator. Please check whether its watermark is not advancing (for example, stuck at Long.MIN_VALUE).

Solutions:
1. (Recommended) Set `table.exec.source.idle-timeout` [1], so that a source subtask that receives no records within the timeout is marked as idle and no longer holds back the watermark of downstream operators.
2. Ensure that the source parallelism matches the number of Kafka partitions.

Looking forward to hearing whether these solutions work for you.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout

--
Best!
Xuyang

At 2024-12-20 00:19:11, "Guillermo Ortiz Fernández" <guillermo.ortiz.f...@gmail.com> wrote:

I am working on a program using Flink SQL. I initially developed it in a Docker Compose environment where I run Flink and Kafka, and everything worked correctly. However, when I deployed it to another environment, it stopped working. The difference I noticed in the new environment, which runs on a single node on localhost, is that parallelism.default is set to 4 instead of 1.
The query is:

WITH user_zones_state AS (
    SELECT eez.code, ez.zoneIds, eez.siteId, eez.ts
    FROM user_event AS eez
    LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
        ON eez.siteId = ez.siteId AND eez.sector = ez.sector
)
SELECT
    code,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM user_zones_state

The query does not return any results, nor does it throw any errors. The problem seems to come from the following expression:

ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds

If I remove this expression, the query works in both environments and returns data. I thought the issue might be related to it being a stateful function, so I configured RocksDB as the state backend. However, in the Docker Compose environment I did not configure any state backend, and it worked fine. In the new environment it only started working when I reduced the parallelism to 1, and I don’t understand why. With a parallelism of 4 it has occasionally returned data. The sources are Kafka topics with varying numbers of partitions, ranging from 1 to 4, although I don’t think that’s related to the issue.
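For reference, the two solutions suggested in the reply above could be applied in the Flink SQL client roughly as follows. This is a minimal sketch, assuming Flink 1.13 or later (where `SET` takes quoted keys) and that the statements run in the same session before the query is submitted; the 10-second timeout is an arbitrary example value, not a recommendation.

```sql
-- Solution 1 (sketch): a source subtask that receives no records for
-- 10 seconds is marked as idle, so downstream operators such as the over
-- aggregation stop waiting for its watermark.
-- '10 s' is an assumed example value; tune it to the topics' traffic.
SET 'table.exec.source.idle-timeout' = '10 s';

-- Solution 2 (alternative sketch): keep the parallelism no larger than the
-- smallest partition count among the source topics (1 in the setup
-- described above), so every source subtask is assigned a partition.
-- SET 'parallelism.default' = '1';

-- Both options only affect jobs submitted after the SET statements,
-- so re-run the original query afterwards.
```

Solution 1 keeps the parallelism of 4 for the rest of the pipeline, which is presumably why the reply recommends it over lowering the parallelism globally.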