It was my mistake: the number of task slots and the parallelism weren't
right, and I was inserting the data from Flink SQL. The INSERT job didn't
have enough resources and failed, but the sql-client console didn't report
any error.
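For anyone who hits the same symptom: the sql-client submits INSERT
statements as detached jobs, so a later job failure does not surface in the
console. A sketch of how to check from the same session (SHOW JOBS is
available in recent Flink versions, 1.17+; verify against your release):

```sql
-- List jobs on the cluster together with their status.
-- A failed INSERT job shows up here even though the console stayed silent.
SHOW JOBS;
```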

On Tue, Dec 24, 2024 at 3:03 AM, Xuyang (<xyzhong...@163.com>) wrote:

> Hi,
>
> Did you observe that the number of records received by the Kafka source
> task is 0? What about the number of records received by the downstream
> tasks of the Kafka source?
>
> If the number of records received and sent by all tasks in the entire
> topology is 0, it suggests that the issue lies with the Kafka source not
> reading any data, rather than being related to the stateful over agg with
> expression "ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code
> ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds".
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2024-12-20 19:22:15, "Guillermo Ortiz Fernández" <
> guillermo.ortiz.f...@gmail.com> wrote:
>
> I have been reviewing the previous query I ran in the flink sql-client
> while simultaneously running an INSERT into a Kafka topic (with 4
> partitions) from another flink sql-client. For testing purposes, I set up 8
> task slots and a default parallelism of 4.
>
> When I run the query, it shows as "RUNNING" in the Flink UI, leaving 4 slots
> free. After running the INSERT into the Kafka topic, I can confirm, using
> a Kafka consumer, that the data has been inserted. However, the query does
> not update its results. When I inspect the specific Kafka source task in
> the DAG, it displays "Records Received: 0."
>
> Why might the query not be reading the records even though it is running?
> The table configuration includes the option you mentioned earlier.
>
>
> CREATE TABLE user (
> ....)
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'avro.users-flink',  -- 4 partitions
>  'properties.bootstrap.servers' = 'xxx',
>  'properties.group.id' = 'presenceFlinkGroup',
>  'properties.auto.offset.reset' = 'latest',
>  'scan.startup.mode' = 'group-offsets',
>  'key.format' = 'raw',
>  'key.fields' = 'code',
>  'value.format' = 'avro',
>  'value.fields-include' = 'ALL',
>  'scan.watermark.idle-timeout' = '1000',
>  'properties.security.protocol' = 'SASL_SSL',
>  'properties.sasl.mechanism' = 'GSSAPI',
>  'properties.sasl.kerberos.service.name' = 'kafka',
>  'properties.sasl.jaas.config' = 'xx',
>  'properties.ssl.truststore.location' = 'xx',
>  'properties.ssl.truststore.password' = 'xx');
>
El vie, 20 dic 2024 a las 6:49, Xuyang (<xyzhong...@163.com>) escribió:
On Fri, Dec 20, 2024 at 6:49 AM, Xuyang (<xyzhong...@163.com>) wrote:
>
>> Hi,
>>
>> In fact, if I'm not mistaken, this issue likely relates to the number of
>> Kafka partitions being less than the parallelism.
>>
>>
>> *Speculated Reason*: The over agg based on event time (which in your SQL
>> is represented as `ORDER BY ts`) is a watermark-driven operator. It only
>> outputs accumulated data once it receives watermarks. I suspect that in
>> your scenario, since the number of Kafka partitions is less than the
>> parallelism of 4, some of the source subtasks never produce watermarks.
>> This leaves the over agg unable to advance its watermark, because it
>> needs the minimum watermark across all input subtasks to move its
>> event-time timer forward, so no data is output.
>>
>>
>> *Verification*: In the Flink UI, you should be able to see the watermark
>> progress of the over agg. Please check whether the watermark is not
>> progressing (or perhaps stuck at Long.MIN_VALUE).
>>
>>
>> *Solutions*:
>>
>> 1. (Recommended) Set `table.exec.source.idle-timeout`[1] to ignore
>> alignment for subtasks that have not emitted a watermark after a certain
>> timeout.
>>
>> 2. Ensure that the parallelism matches the number of Kafka
>> partitions.
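>>
>> As a sketch in the sql-client (the 30 s timeout is just an example value
>> to tune, not a value from the docs):
>>
>> ```sql
>> -- Solution 1: stop waiting for watermarks from idle source subtasks.
>> SET 'table.exec.source.idle-timeout' = '30 s';
>>
>> -- Solution 2: match the default parallelism to the topic's 4 partitions.
>> SET 'parallelism.default' = '4';
>> ```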
>>
>>
>> Looking forward to hearing your feedback on whether these solutions have
>> been effective.
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout
>>
>>
>> --
>>     Best!
>>     Xuyang
>>
>>
>> At 2024-12-20 00:19:11, "Guillermo Ortiz Fernández" <
>> guillermo.ortiz.f...@gmail.com> wrote:
>>
>> I am working on a program using Flink SQL. I initially developed it in a
>> Docker Compose environment where I run Flink and Kafka, and everything
>> worked correctly. However, when I deployed it to another environment, it
>> stopped working. The difference I noticed in the new environment, which
>> runs on a single localhost node, is that parallelism.default was set to 4
>> instead of 1.
>>
>> The query is:
>>
>>   WITH user_zones_state AS (
>>     SELECT
>>         eez.code,
>>         ez.zoneIds,
>>         eez.siteId,
>>         eez.ts
>>     FROM user_event AS eez
>>     LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
>>     ON eez.siteId = ez.siteId AND eez.sector = ez.sector
>>   )
>>   SELECT
>>     code,
>>     ARRAY_REMOVE(
>>       IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts),
>>              ARRAY[-1]),
>>       -1) AS prev_zoneIds,
>>     ts
>>   FROM user_zones_state
>>
>> The query does not return any results, nor does it throw any errors. The
>> problem seems to come from the following function:
>>
>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), 
>> ARRAY[-1]), -1) AS prev_zoneIds
>>
>> If I remove this function, it works in both environments and returns data.
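>>
>> To make the semantics concrete, here is a sketch of what the expression
>> produces per row (the sample rows are made up; in streaming mode the
>> ORDER BY column must be a time attribute, so run this in batch mode):
>>
>> ```sql
>> -- For one partition (code = 'A'), ordered by ts:
>> --   ts=1, zoneIds=[10, 20]:
>> --     LAG(...) is NULL -> IFNULL gives ARRAY[-1]
>> --     ARRAY_REMOVE(ARRAY[-1], -1) -> prev_zoneIds = []
>> --   ts=2, zoneIds=[30]:
>> --     LAG(...) gives [10, 20], which contains no -1
>> --     -> prev_zoneIds = [10, 20]
>> SELECT
>>   code,
>>   ts,
>>   ARRAY_REMOVE(
>>     IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts),
>>            ARRAY[-1]),
>>     -1) AS prev_zoneIds
>> FROM (VALUES ('A', 1, ARRAY[10, 20]),
>>              ('A', 2, ARRAY[30])) AS t(code, ts, zoneIds);
>> ```
>>
>> In effect, prev_zoneIds is the previous row's zoneIds within each code,
>> or an empty array for the first row.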
>>
>> I thought the issue might be related to it being a stateful function, so
>> I configured RocksDB as the state backend. However, in the Docker Compose
>> environment I did not configure any state backend and it worked fine. In
>> the new environment it only started working when I reduced the
>> parallelism to 1, and I don't understand why. With parallelism 4 it has
>> only occasionally returned data.
>>
>> The source I use is Kafka topics with varying numbers of partitions,
>> ranging from 1 to 4, although I don’t think that’s related to the issue.
>>
>>
