Hi,

Did you observe that the number of records received by the Kafka source task is 
0? What about the number of records received by the downstream tasks of the 
Kafka source? 

If every task in the topology shows 0 records received and 0 records sent, 
that suggests the Kafka source is not reading any data, rather than a problem 
with the stateful over aggregation in the expression 
"ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), 
ARRAY[-1]), -1) AS prev_zoneIds".
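
One way to check the source in isolation is to query the Kafka-backed table 
directly, without the join or the over aggregation. A minimal sketch, assuming 
`user_event` is the table whose source task shows 0 records (substitute the 
table you are actually inspecting):

  -- Assumption: user_event is the Kafka table under test.
  -- If this also returns nothing while new records are being produced,
  -- the problem is in the source rather than in the LAG expression.
  SELECT code, siteId, ts
  FROM user_event;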




--

    Best!
    Xuyang




At 2024-12-20 19:22:15, "Guillermo Ortiz Fernández" 
<guillermo.ortiz.f...@gmail.com> wrote:

I have been reviewing the previous query I ran in the flink sql-client while 
simultaneously running an INSERT into a Kafka topic (with 4 partitions) from 
another flink sql-client. For testing purposes, I set up 8 task slots and a 
default parallelism of 4.

When I run the query, it shows as "RUNNING" in Flink UI, leaving 4 slots free. 
After running the INSERT into the Kafka topic, I can confirm, using a Kafka 
consumer, that the data has been inserted. However, the query does not update 
its results. When I inspect the specific Kafka source task in the DAG, it 
displays "Records Received: 0."

Why might the query not be reading the records even though it is running? The 
table configuration includes the option you mentioned earlier.




CREATE TABLE user (
....
) WITH (
 'connector' = 'kafka',
 'topic' = 'avro.users-flink',  -- 4 partitions
 'properties.bootstrap.servers' = 'xxx',
 'properties.group.id' = 'presenceFlinkGroup',
 'properties.auto.offset.reset' = 'latest',
 'scan.startup.mode' = 'group-offsets',
 'key.format' = 'raw',
 'key.fields' = 'code',
 'value.format' = 'avro',
 'value.fields-include' = 'ALL',
 'scan.watermark.idle-timeout' = '1000',
 'properties.security.protocol' = 'SASL_SSL',
 'properties.sasl.mechanism' = 'GSSAPI',
 'properties.sasl.kerberos.service.name' = 'kafka',
 'properties.sasl.jaas.config' = 'xx',
 'properties.ssl.truststore.location' = 'xx',
 'properties.ssl.truststore.password' = 'xx');



On Fri, 20 Dec 2024 at 6:49, Xuyang (<xyzhong...@163.com>) wrote:


Hi, 

If I'm not mistaken, this issue is likely caused by the number of Kafka 
partitions being lower than the parallelism. 




Suspected cause: The event-time over aggregation (`ORDER BY ts` in your SQL) 
is a watermark-driven operator. It emits accumulated rows only when its 
watermark advances. I suspect that in your scenario, because the number of 
Kafka partitions is lower than the parallelism of 4, some source subtasks are 
assigned no partition and never emit a watermark. The over aggregation takes 
the minimum watermark across all of its input subtasks, so its watermark 
cannot advance, its event-time timers never fire, and no data is output. 




Verification: In the Flink UI, you should be able to see the watermark of the 
over aggregation operator. Please check whether it is failing to advance (it 
may be stuck at Long.MIN_VALUE). 




Solutions: 

1. (Recommended) Set `table.exec.source.idle-timeout`[1] so that watermark 
alignment ignores source subtasks that have not emitted a watermark within the 
timeout. 

2. Ensure that the parallelism matches the number of Kafka partitions.
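
In the SQL client, the two options above could look roughly like this, using 
the quoted SET syntax of recent Flink versions. The values are assumptions for 
illustration: '5 s' is an arbitrary timeout, and 1 is the smallest partition 
count among the source topics you described.

  -- Option 1: treat a source subtask as idle after 5 s without records
  SET 'table.exec.source.idle-timeout' = '5 s';

  -- Option 2: do not run more source subtasks than there are partitions
  SET 'parallelism.default' = '1';

Either statement has to be executed in the same session before the query is 
submitted.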




Looking forward to hearing your feedback on whether these solutions have been 
effective.




[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout




--

    Best!
    Xuyang




At 2024-12-20 00:19:11, "Guillermo Ortiz Fernández" 
<guillermo.ortiz.f...@gmail.com> wrote:

I am working on a program using Flink SQL. I initially developed it in a Docker 
Compose environment where I run Flink and Kafka, and everything worked 
correctly. However, when I deployed it to another environment, it stopped 
working. The difference I noticed in the new environment, which only runs on a 
single node in localhost, is that instead of having parallelism.default: 1, it 
was set to 4.

The query is:

  WITH user_zones_state AS (
    SELECT
        eez.code,
        ez.zoneIds,
        eez.siteId,
        eez.ts
    FROM user_event AS eez
    LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
    ON eez.siteId = ez.siteId AND eez.sector = ez.sector
  )
  SELECT
    code,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), 
ARRAY[-1]), -1)  AS prev_zoneIds,
    ts
  FROM user_zones_state  

The query does not return any results, nor does it throw any errors. The 
problem seems to come from the following function:

ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), 
ARRAY[-1]), -1) AS prev_zoneIds




If I remove this function, it works in both environments and returns data.

I thought the issue might be related to it being a stateful function, so I 
configured it to use RocksDB for state storage. However, in the Docker Compose 
environment, I did not configure any state backend, and it worked fine. It only 
started working in the new environment when I reduced the parallelism to 1, and 
I don’t understand why. When parallelism is 4 it has occasionally returned data.

The source I use is Kafka topics with varying numbers of partitions, ranging 
from 1 to 4, although I don’t think that’s related to the issue.
