It was my mistake: the number of task slots and the parallelism weren't right, and I was inserting the data from Flink SQL. The INSERT didn't have enough resources and failed, but no error was reported in the Flink SQL console.
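For reference, a minimal sketch of the settings involved, assuming a standalone cluster with `taskmanager.numberOfTaskSlots: 8` in flink-conf.yaml (the values are illustrative):

SET 'parallelism.default' = '4';
-- Each job needs slots proportional to its parallelism; with 8 slots and
-- parallelism 4, the query and the INSERT can run side by side. If the
-- INSERT cannot acquire enough slots it can fail without a visible error
-- in the sql-client console, so check the job status in the Flink UI.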
On Tue, Dec 24, 2024 at 3:03 AM, Xuyang (<xyzhong...@163.com>) wrote:

> Hi,
>
> Did you observe that the number of records received by the Kafka source
> task is 0? What about the number of records received by the downstream
> tasks of the Kafka source?
>
> If the number of records received and sent by all tasks in the entire
> topology is 0, it suggests that the issue lies with the Kafka source not
> reading any data, rather than with the stateful over agg with the
> expression "ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code
> ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds".
>
> --
> Best!
> Xuyang
>
> At 2024-12-20 19:22:15, "Guillermo Ortiz Fernández"
> <guillermo.ortiz.f...@gmail.com> wrote:
>
> I have been reviewing the previous query I ran in the Flink sql-client
> while simultaneously running an INSERT into a Kafka topic (with 4
> partitions) from another Flink sql-client. For testing purposes, I set up
> 8 task slots and a default parallelism of 4.
>
> When I run the query, it shows as "RUNNING" in the Flink UI, leaving 4
> slots free. After running the INSERT into the Kafka topic, I can confirm,
> using a Kafka consumer, that the data has been inserted. However, the
> query does not update its results. When I inspect the specific Kafka
> source task in the DAG, it displays "Records Received: 0."
>
> Why might the query not be reading the records even though it is running?
> The table configuration includes the option you mentioned earlier.
>
> CREATE TABLE `user` (
>     ....
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'avro.users-flink',  -- 4 partitions
>     'properties.bootstrap.servers' = 'xxx',
>     'properties.group.id' = 'presenceFlinkGroup',
>     'properties.auto.offset.reset' = 'latest',
>     'scan.startup.mode' = 'group-offsets',
>     'key.format' = 'raw',
>     'key.fields' = 'code',
>     'value.format' = 'avro',
>     'value.fields-include' = 'ALL',
>     'scan.watermark.idle-timeout' = '1000',
>     'properties.security.protocol' = 'SASL_SSL',
>     'properties.sasl.mechanism' = 'GSSAPI',
>     'properties.sasl.kerberos.service.name' = 'kafka',
>     'properties.sasl.jaas.config' = 'xx',
>     'properties.ssl.truststore.location' = 'xx',
>     'properties.ssl.truststore.password' = 'xx'
> );
>
> On Fri, Dec 20, 2024 at 6:49 AM, Xuyang (<xyzhong...@163.com>) wrote:
>
>> Hi,
>>
>> In fact, if I'm not mistaken, this issue likely relates to the situation
>> where the number of Kafka partitions is less than the parallelism.
>>
>> *Speculated reason*: The over agg based on event time (represented in
>> your SQL by `ORDER BY ts`) is a watermark-driven operator; it only emits
>> accumulated data once it receives watermarks. I suspect that in your
>> scenario, since the number of Kafka partitions is less than the
>> parallelism of 4, some of the source subtasks never produce a watermark.
>> The over agg is then unable to advance its watermark, because it needs
>> the minimum watermark across all input subtasks to push its event-time
>> timers forward, so no data is output.
>>
>> *Verification*: In the Flink UI, you should be able to see the watermark
>> progress of the over agg. Please check whether the watermark is not
>> progressing (or perhaps is stuck at Long.MIN_VALUE).
>>
>> *Solutions*:
>>
>> 1. (Recommended) Set `table.exec.source.idle-timeout`[1] so that
>> subtasks that have not emitted a watermark within a certain timeout are
>> ignored during watermark alignment (see the sketch after this list).
>>
>> 2. Ensure that the parallelism matches the number of Kafka partitions.
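>>
>> A minimal sketch of solution 1 from the SQL client (the timeout value is
>> illustrative; tune it to how often your partitions receive data):
>>
>> SET 'table.exec.source.idle-timeout' = '5 s';
>> -- Source subtasks that emit no watermark within 5 s are marked idle and
>> -- excluded from watermark alignment, letting the over agg advance.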
>> Looking forward to hearing your feedback on whether these solutions have
>> been effective.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout
>>
>> --
>> Best!
>> Xuyang
>>
>> At 2024-12-20 00:19:11, "Guillermo Ortiz Fernández"
>> <guillermo.ortiz.f...@gmail.com> wrote:
>>
>> I am working on a program using Flink SQL. I initially developed it in a
>> Docker Compose environment where I run Flink and Kafka, and everything
>> worked correctly. However, when I deployed it to another environment, it
>> stopped working. The difference I noticed in the new environment, which
>> runs on a single node on localhost, is that instead of having
>> parallelism.default: 1, it was set to 4.
>>
>> The query is:
>>
>> WITH user_zones_state AS (
>>     SELECT
>>         eez.code,
>>         ez.zoneIds,
>>         eez.siteId,
>>         eez.ts
>>     FROM user_event AS eez
>>     LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
>>         ON eez.siteId = ez.siteId AND eez.sector = ez.sector
>> )
>> SELECT
>>     code,
>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER
>>         BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
>>     ts
>> FROM user_zones_state;
>>
>> The query does not return any results, nor does it throw any errors. The
>> problem seems to come from the following expression:
>>
>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY
>> ts), ARRAY[-1]), -1) AS prev_zoneIds
>>
>> If I remove this expression, the query works in both environments and
>> returns data.
>>
>> I thought the issue might be related to it being a stateful operation,
>> so I configured RocksDB as the state backend. However, in the Docker
>> Compose environment I did not configure any state backend and *it worked
>> fine. It only started working in the new environment when I reduced the
>> parallelism to 1, and I don't understand why.* With parallelism 4, it
>> has occasionally returned data.
>>
>> The sources I use are Kafka topics with varying numbers of partitions,
>> ranging from 1 to 4, although I don't think that's related to the issue.
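>>
>> To make the expression concrete, a hypothetical trace for one partition
>> key (code = 'A'), rows ordered by ts:
>>
>> -- row 1: zoneIds = [1, 2] -> LAG returns NULL   -> IFNULL gives ARRAY[-1] -> ARRAY_REMOVE gives []
>> -- row 2: zoneIds = [3]    -> LAG returns [1, 2] -> IFNULL passes it through -> result is [1, 2]
>> -- i.e. prev_zoneIds is the previous row's zoneIds for each code, or an
>> -- empty array for the first row of that code.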