I have been re-checking the query from my previous message in the Flink SQL client while simultaneously running an INSERT into a Kafka topic (with 4 partitions) from a second Flink SQL client session. For testing purposes, I configured 8 task slots and a default parallelism of 4.
When I run the query, it shows as "RUNNING" in the Flink UI, leaving 4 slots free. After running the INSERT into the Kafka topic, I can confirm with a Kafka consumer that the data has been written. However, the query does not update its results. When I inspect the Kafka source task in the DAG, it displays "Records Received: 0". Why might the query not be reading the records even though it is running? The table configuration includes the option you mentioned earlier:

CREATE TABLE user (
  ....
) WITH (
  'connector' = 'kafka',
  'topic' = 'avro.users-flink',  -- 4 partitions
  'properties.bootstrap.servers' = 'xxx',
  'properties.group.id' = 'presenceFlinkGroup',
  'properties.auto.offset.reset' = 'latest',
  'scan.startup.mode' = 'group-offsets',
  'key.format' = 'raw',
  'key.fields' = 'code',
  'value.format' = 'avro',
  'value.fields-include' = 'ALL',
  'scan.watermark.idle-timeout' = '1000',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.sasl.jaas.config' = 'xx',
  'properties.ssl.truststore.location' = 'xx',
  'properties.ssl.truststore.password' = 'xx'
);

On Fri, 20 Dec 2024 at 6:49, Xuyang (<xyzhong...@163.com>) wrote:
> Hi,
>
> In fact, if I'm not mistaken, this issue is likely related to the number
> of Kafka partitions being less than the parallelism.
>
>
> *Speculated Reason*: The over aggregation based on event time (which in
> your SQL is represented by `ORDER BY ts`) is a watermark-driven operator.
> It only emits accumulated data once it receives watermarks. I suspect that
> in your scenario, since the number of Kafka partitions is less than the
> parallelism of 4, some of the source subtasks never produce a watermark.
> This leaves the over aggregation unable to advance its watermark, because
> it takes the minimum watermark across all input subtasks to move its
> event-time timers forward, so no data is output.
>
>
> *Verification*: In the Flink UI, you should be able to see the watermark
> progress of the over aggregation. Please check whether the watermark is
> not progressing (or is perhaps stuck at Long.MIN_VALUE).
>
>
> *Solutions*:
>
> 1. (Recommended) Set `table.exec.source.idle-timeout`[1] so that source
> subtasks that receive no data within the timeout are marked idle and no
> longer hold back watermark alignment.
>
> 2. Ensure that the parallelism matches the number of Kafka partitions.
>
>
> Looking forward to hearing whether these solutions have been effective.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-12-20 00:19:11, "Guillermo Ortiz Fernández" <guillermo.ortiz.f...@gmail.com> wrote:
> > I am working on a program using Flink SQL. I initially developed it in a
> > Docker Compose environment where I run Flink and Kafka, and everything
> > worked correctly. However, when I deployed it to another environment, it
> > stopped working. The difference I noticed in the new environment, which
> > runs on a single node on localhost, is that instead of
> > parallelism.default: 1, it was set to 4.
> >
> > The query is:
> >
> > WITH user_zones_state AS (
> >   SELECT
> >     eez.code,
> >     ez.zoneIds,
> >     eez.siteId,
> >     eez.ts
> >   FROM user_event AS eez
> >   LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
> >     ON eez.siteId = ez.siteId AND eez.sector = ez.sector
> > )
> > SELECT
> >   code,
> >   ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
> >   ts
> > FROM user_zones_state
> >
> > The query does not return any results, nor does it throw any errors. The
> > problem seems to come from the following expression:
> >
> > ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds
> >
> > If I remove this expression, it works in both environments and returns
> > data.
> >
> > I thought the issue might be related to it being a stateful function, so I
> > configured RocksDB as the state backend. However, in the Docker Compose
> > environment I did not configure any state backend, and *it worked fine. It
> > only started working in the new environment when I reduced the parallelism
> > to 1, and I don’t understand why.* With a parallelism of 4, it has
> > occasionally returned data.
> >
> > The sources I use are Kafka topics with varying numbers of partitions,
> > ranging from 1 to 4, although I don’t think that’s related to the issue.
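For reference, the two workarounds from Xuyang's reply can be applied per session in the Flink SQL client before the query is submitted. This is a minimal sketch assuming Flink 1.13 or later (for the quoted `SET 'key' = 'value'` syntax). The 1 s timeout is only an illustrative value. The parallelism of 1 is an assumption based on the smallest topic read by the query having a single partition, as described in the original message.

```sql
-- Workaround 1 (recommended in the reply): mark a source subtask as idle
-- after it has received no records for 1 s (illustrative value), so it no
-- longer holds back the minimum watermark seen by the over aggregation.
SET 'table.exec.source.idle-timeout' = '1 s';

-- Workaround 2: give every source subtask at least one partition by keeping
-- the parallelism at or below the partition count of the smallest topic
-- (1 here, since the topics have between 1 and 4 partitions).
SET 'parallelism.default' = '1';
```

Both settings only affect jobs submitted afterwards from the same SQL client session; a job that is already running keeps the configuration it was started with.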