You are right... Currently, the Flink SQL Client submits DML statements
asynchronously to the Flink cluster,
which means it is not possible to determine the final success or failure of the
job directly from the console.
--
Best!
Xuyang
在 2024-12-26 20:38:56,"Guillermo Ortiz Fernández"
<[email protected]> 写道:
it was my mistake, number and taskslot and parallelism wasn't right and I was
inserting data from FlinkSQL. The insert didn't have enough resources and
failed but from flinksql console wasn't reported any error.
El mar, 24 dic 2024 a las 3:03, Xuyang (<[email protected]>) escribió:
Hi,
Did you observe that the number of records received by the Kafka source task is
0? What about the number of records received by the downstream tasks of the
Kafka source?
If the number of records received and sent by all tasks in the entire topology
is 0, it suggests that the issue lies with the Kafka source not reading any
data, rather than being related to the stateful over agg with expression
"ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts),
ARRAY[-1]), -1) AS prev_zoneIds.".
--
Best!
Xuyang
At 2024-12-20 19:22:15, "Guillermo Ortiz Fernández"
<[email protected]> wrote:
I have been reviewing the previous query I ran in the flink sql-client while
simultaneously running an INSERT into a Kafka topic (with 4 partitions) from
another flink sql-client. For testing purposes, I set up 8 task slots and a
default parallelism of 4.
When I run the query, it shows as "RUNNING" in Flink UI, leaving 4 slots free.
After running the INSERT into the Kafka topic, I can confirm, using a Kafka
consumer, that the data has been inserted. However, the query does not update
its results. When I inspect the specific Kafka source task in the DAG, it
displays "Records Received: 0."
Why might the query not be reading the records even though it is running? The
table configuration includes the option you mentioned earlier.
CREATE TABLE user (
....)
'connector' = 'kafka',
'topic' = 'avro.users-flink', <- 4 partitions
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'presenceFlinkGroup',
'properties.auto.offset.reset' = 'latest',
'scan.startup.mode' = 'group-offsets',
'key.format' = 'raw',
'key.fields' = 'code',
'value.format' = 'avro',
'value.fields-include' = 'ALL',
'scan.watermark.idle-timeout' = '1000',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'GSSAPI',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.sasl.jaas.config' = 'xx',
'properties.ssl.truststore.location' = 'xx',
'properties.ssl.truststore.password' = 'xx');
El vie, 20 dic 2024 a las 6:49, Xuyang (<[email protected]>) escribió:
Hi,
In fact, if I'm not mistaken, this issue likely relates to the situation where
the number of Kafka partitions is less than parallelism.
Speculated Reason: The over agg based on event time (which in your SQL is
represented as `ORDER BY ts`) is a watermark-driven operator. It only outputs
accumulated data once it receives watermarks. I suspect that in your scenario,
since the number of Kafka partitions is less than parallelism 4, some of the
source subtasks will not produce watermark. This leads to over agg being unable
to advance its watermark, as it requires the minimum watermark from all input
subtask to push the watermark event timer forward, resulting in no data being
output.
Verification: In the Flink UI, you can be able to see the watermark progress of
over agg. Please check if the watermark is not progressing (or perhaps it is
stuck at Long.Min).
Solutions:
1. (Recommended) Set `table.exec.source.idle-timeout`[1] to ignore alignment
for subtasks that have not emitted a watermark after a certain timeout.
2. Ensure that the concurrency level matches the number of Kafka partitions.
Looking forward to hearing your feedback on whether these solutions have been
effective.
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout
--
Best!
Xuyang
At 2024-12-20 00:19:11, "Guillermo Ortiz Fernández"
<[email protected]> wrote:
I am working on a program using Flink SQL. I initially developed it in a Docker
Compose environment where I run Flink and Kafka, and everything worked
correctly. However, when I deployed it to another environment, it stopped
working. The difference I noticed in the new environment, which only runs on a
single node in localhost, is that instead of having parallelism.default: 1, it
was set to 4.
The query is:
WITH user_zones_state AS (
SELECT
eez.code,
ez.zoneIds,
eez.siteId,
eez.ts
FROM user_event AS eez
LEFT JOIN user_zones_relation FOR SYSTEM_TIME AS OF eez.ts AS ez
ON eez.siteId = ez.siteId AND eez.sector = ez.sector
)
SELECT
code,
ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts),
ARRAY[-1]), -1) AS prev_zoneIds,
ts
FROM user_zones_state
The query does not return any results, nor does it throw any errors. The
problem seems to come from the following function:
ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY code ORDER BY ts),
ARRAY[-1]), -1) AS prev_zoneIds
If I remove this function, it works in both environments and returns data.
I thought the issue might be related to it being a stateful function, so I
configured it to use RocksDB for state storage. However, in the Docker Compose
environment, I did not configure any state backend, and it worked fine. It only
started working in the new environment when I reduced the parallelism to 1, and
I don’t understand why. When parallelism is 4 it has occasionally returned data.
The source I use is Kafka topics with varying numbers of partitions, ranging
from 1 to 4, although I don’t think that’s related to the issue.