I have been reviewing the previous query I ran in the flink sql-client
while simultaneously running an INSERT into a Kafka topic (with 4
partitions) from another flink sql-client. For testing purposes, I set up 8
task slots and a default parallelism of 4.

When I run the query, it shows as "RUNNING" in the Flink UI, leaving 4 slots
free. After running the INSERT into the Kafka topic, I can confirm, using a
Kafka consumer, that the data has been inserted. However, the query does
not update its results. When I inspect the specific Kafka source task in
the DAG, it displays "Records Received: 0."

Why might the query not be reading the records even though it is running?
The table configuration includes the option you mentioned earlier.


CREATE TABLE user (
....) WITH (
 'connector' = 'kafka',
 'topic' = 'avro.users-flink',  -- 4 partitions
 'properties.bootstrap.servers' = 'xxx',
 'properties.group.id' = 'presenceFlinkGroup',
 'properties.auto.offset.reset' = 'latest',
 'scan.startup.mode' = 'group-offsets',
 'key.format' = 'raw',
 'key.fields' = 'code',
 'value.format' = 'avro',
 'value.fields-include' = 'ALL',
 'scan.watermark.idle-timeout' = '1000',
 'properties.security.protocol' = 'SASL_SSL',
 'properties.sasl.mechanism' = 'GSSAPI',
 'properties.sasl.kerberos.service.name' = 'kafka',
 'properties.sasl.jaas.config' = 'xx',
 'properties.ssl.truststore.location' = 'xx',
 'properties.ssl.truststore.password' = 'xx');
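
For comparison, the table above uses the per-table
'scan.watermark.idle-timeout' option. A minimal sketch of the session-level
alternatives from the earlier reply, run in the sql-client before submitting
the query, might look like the following. The 5 s timeout is an illustrative
value assumed here, not one taken from this thread, and the parallelism of 1
assumes the smallest topic has a single partition.

```sql
-- Option 1: mark a source subtask as idle if it emits no records for the
-- given interval, so downstream operators stop waiting for its watermark.
-- The '5 s' value is an assumption chosen for illustration.
SET 'table.exec.source.idle-timeout' = '5 s';

-- Option 2 (alternative): keep the parallelism no higher than the partition
-- count of the smallest topic, so every source subtask owns a partition.
-- Assumes that topic has 1 partition.
SET 'parallelism.default' = '1';
```

Only one of the two settings should be needed; the first keeps the higher
parallelism for the rest of the job.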

On Fri, 20 Dec 2024 at 6:49, Xuyang (<xyzhong...@163.com>) wrote:

> Hi,
>
> In fact, if I'm not mistaken, this issue likely relates to the situation
> where the number of Kafka partitions is less than parallelism.
>
>
> *Speculated Reason*: The over aggregation based on event time (which in
> your SQL is represented as `ORDER BY ts`) is a watermark-driven operator.
> It only outputs accumulated data once it receives watermarks. I suspect
> that in your scenario, since the number of Kafka partitions is less than
> the parallelism of 4, some of the source subtasks never produce a
> watermark. This prevents the over aggregation from advancing its watermark,
> because it takes the minimum watermark across all input subtasks to push
> its event-time timers forward, so no data is output.
>
>
> *Verification*: In the Flink UI, you should be able to see the watermark
> progress of the over aggregation. Please check whether the watermark is not
> progressing (or perhaps it is stuck at Long.MIN_VALUE).
>
>
> *Solutions*:
>
> 1. (Recommended) Set `table.exec.source.idle-timeout`[1] to ignore
> alignment for subtasks that have not emitted a watermark after a certain
> timeout.
>
> 2. Ensure that the parallelism matches the number of Kafka
> partitions.
>
>
> Looking forward to hearing your feedback on whether these solutions have
> been effective.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2024-12-20 00:19:11, "Guillermo Ortiz Fernández" <
> guillermo.ortiz.f...@gmail.com> wrote:
>
> I am working on a program using Flink SQL. I initially developed it in a
> Docker Compose environment where I run Flink and Kafka, and everything
> worked correctly. However, when I deployed it to another environment, it
> stopped working. The difference I noticed in the new environment, which
> only runs on a single node in localhost, is that instead of having
> parallelism.default: 1, it was set to 4.
>
> The query is:
>
>   WITH user_zones_state AS (
>     SELECT
>         eez.code,
>         ez.zoneIds,
>         eez.siteId,
>         eez.ts
>     FROM user_event AS eez
>     LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
>     ON eez.siteId = ez.siteId AND eez.sector = ez.sector
>   )
>   SELECT
>     code,
>     ARRAY_REMOVE(
>       IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]),
>       -1) AS prev_zoneIds,
>     ts
>   FROM user_zones_state
>
> The query does not return any results, nor does it throw any errors. The
> problem seems to come from the following expression:
>
>   ARRAY_REMOVE(
>     IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts), ARRAY[-1]),
>     -1) AS prev_zoneIds
>
> If I remove this expression, it works in both environments and returns data.
>
> I thought the issue might be related to it being a stateful function, so I
> configured it to use RocksDB for state storage. However, in the Docker
> Compose environment, I did not configure any state backend, and *it
> worked fine. It only started working in the new environment when I reduced
> the parallelism to 1, and I don’t understand why.* With a parallelism of 4,
> it has occasionally returned data.
>
> The source I use is Kafka topics with varying numbers of partitions,
> ranging from 1 to 4, although I don’t think that’s related to the issue.
>
>
