Hi,

Did you observe that the number of records received by the Kafka source task is 0? What about the number of records received by the downstream tasks of the Kafka source?

If the number of records received and sent by all tasks in the entire topology is 0, it suggests that the issue lies with the Kafka source not reading any data, rather than being related to the stateful over agg with expression "ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds".

--
Best!
Xuyang

At 2024-12-20 19:22:15, "Guillermo Ortiz Fernández" <guillermo.ortiz.f...@gmail.com> wrote:

I have been reviewing the previous query I ran in the flink sql-client while simultaneously running an INSERT into a Kafka topic (with 4 partitions) from another flink sql-client. For testing purposes, I set up 8 task slots and a default parallelism of 4. When I run the query, it shows as "RUNNING" in Flink UI, leaving 4 slots free.

After running the INSERT into the Kafka topic, I can confirm, using a Kafka consumer, that the data has been inserted. However, the query does not update its results. When I inspect the specific Kafka source task in the DAG, it displays "Records Received: 0."

Why might the query not be reading the records even though it is running? The table configuration includes the option you mentioned earlier.

CREATE TABLE user ( ....)
WITH (
  'connector' = 'kafka',
  'topic' = 'avro.users-flink', -- 4 partitions
  'properties.bootstrap.servers' = 'xxx',
  'properties.group.id' = 'presenceFlinkGroup',
  'properties.auto.offset.reset' = 'latest',
  'scan.startup.mode' = 'group-offsets',
  'key.format' = 'raw',
  'key.fields' = 'code',
  'value.format' = 'avro',
  'value.fields-include' = 'ALL',
  'scan.watermark.idle-timeout' = '1000',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.sasl.jaas.config' = 'xx',
  'properties.ssl.truststore.location' = 'xx',
  'properties.ssl.truststore.password' = 'xx');

On Fri, 20 Dec 2024 at 6:49, Xuyang (<xyzhong...@163.com>) wrote:

Hi,

In fact, if I'm not mistaken, this issue likely relates to the situation where the number of Kafka partitions is less than the parallelism.

Speculated Reason: The over agg based on event time (which in your SQL is represented as `ORDER BY ts`) is a watermark-driven operator. It only outputs accumulated data once it receives watermarks. I suspect that in your scenario, since the number of Kafka partitions is less than the parallelism of 4, some of the source subtasks will not produce watermarks. This leaves the over agg unable to advance its watermark, because it takes the minimum watermark across all input subtasks to push its event-time timers forward, so no data is output.

Verification: In the Flink UI, you should be able to see the watermark progress of the over agg. Please check whether the watermark is not progressing (or perhaps it is stuck at Long.MIN_VALUE).

Solutions:
1. (Recommended) Set `table.exec.source.idle-timeout`[1] to ignore alignment for subtasks that have not emitted a watermark after a certain timeout.
2. Ensure that the parallelism matches the number of Kafka partitions.

Looking forward to hearing your feedback on whether these solutions have been effective.
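The two suggestions above could be tried from the Flink SQL client roughly as follows. This is only a sketch: it assumes a Flink version that accepts the quoted SET syntax (1.13 or later), and the timeout of 5 s is an illustrative value, not one taken from this thread. Either setting alone may be enough; they do not need to be combined.

```sql
-- Option 1: mark a source subtask as idle when it receives no records for the
-- given time, so downstream operators stop waiting for its watermark.
-- '5 s' is an assumed example value; the default of 0 ms disables idleness detection.
SET 'table.exec.source.idle-timeout' = '5 s';

-- Option 2: keep the parallelism no higher than the partition count of the
-- source topics, so that every source subtask is assigned at least one partition.
-- The value 1 matches the setup reported as working in this thread.
SET 'parallelism.default' = '1';
```

Both statements need to be run in the same SQL client session before the query is submitted, since they only affect jobs started afterwards.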
[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout

--
Best!
Xuyang

At 2024-12-20 00:19:11, "Guillermo Ortiz Fernández" <guillermo.ortiz.f...@gmail.com> wrote:

I am working on a program using Flink SQL. I initially developed it in a Docker Compose environment where I run Flink and Kafka, and everything worked correctly. However, when I deployed it to another environment, it stopped working. The difference I noticed in the new environment, which only runs on a single node in localhost, is that instead of having parallelism.default: 1, it was set to 4.

The query is:

WITH user_zones_state AS (
  SELECT eez.code, ez.zoneIds, eez.siteId, eez.ts
  FROM user_event AS eez
  LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
    ON eez.siteId = ez.siteId AND eez.sector = ez.sector
)
SELECT
  code,
  ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
  ts
FROM user_zones_state

The query does not return any results, nor does it throw any errors. The problem seems to come from the following function:

ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds

If I remove this function, it works in both environments and returns data. I thought the issue might be related to it being a stateful function, so I configured it to use RocksDB for state storage. However, in the Docker Compose environment, I did not configure any state backend, and it worked fine. It only started working in the new environment when I reduced the parallelism to 1, and I don’t understand why. When parallelism is 4 it has occasionally returned data.

The source I use is Kafka topics with varying numbers of partitions, ranging from 1 to 4, although I don’t think that’s related to the issue.