Re: How can we read checkpoint data for debugging state

2025-02-20 Thread Xuyang
Hi, 

FYI, hoping this helps: FLIP-496: SQL connector for keyed savepoint data [1]




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data




--

Best!
Xuyang




At 2025-02-21 12:11:02, "Sachin Mittal"  wrote:

Hi,

So I have a flink application which stores state (RocksDB state backend) in S3 
with the following directory structure:
s3://{bucket}/flink-checkpoints/{job-id}/
    + --shared/
    + --taskowned/
    + --chk-/


I have my job pipeline defined like:
final DataStream events = env.fromSource(..., "Src");


SingleOutputStreamOperator statsData =
    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        .name("Proc");


statsData
    .addSink(new MySink<>(...))
    .name("Sink");


env.execute("Exec");


The MyStatsProcessor has keyed states defined as:
state1 =
    getRuntimeContext().getState(
        new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
state2 =
    getRuntimeContext().getState(
        new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));




So my question is: how can I read checkpoint state? I see there is an API for this, 
flink-state-processor-api.
Can I use it here, and if so, how do I instantiate it:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint =
    SavepointReader.read(env, "s3://{bucket}/flink-checkpoints/{job-id}", );
Is this the correct way to use this API for reading a checkpoint ?


Please note I have also enabled:
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://{bucket}/changelog
dstl.dfs.compression.enabled: 'true'


Now, say I am able to create the reader: how do I inspect a particular keyed state?
I see a function called readKeyedState, but I am unsure what uid I need to pass to 
read a particular state.
Would something like this work:


DataStream keyedState =
    savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());
And now in my KeyedState class, I can access state1 and state2.


Would this work?


Please let me know if I am on the right track, or whether reading checkpointed states 
from an external application for debugging is simply not possible.


Thanks
Sachin


Re: FlinkSQL LAG, strange behavior?

2025-02-20 Thread Guillermo Ortiz Fernández
Another option would be to add an extra field like:

ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY *ts,
row_number*), ARRAY[-1]), -1) AS prev_zoneIds,

But it isn't possible either.

On Thu, Feb 20, 2025 at 11:48 AM Guillermo Ortiz Fernández <
guillermo.ortiz.f...@gmail.com> wrote:

> I tried to create a additional field based on ROW_NUMBER but it can't be
> used in LAG function.
>
> The idea was:
> WITH ranked AS (
> SELECT
> msisdn,
> eventTimestamp,
> zoneIds,
> ts,
> TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY
> ts), 3) AS ts_ranked
> FROM example_2
> )
> SELECT
> msisdn,
> eventTimestamp,
> ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY
> ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
> ts
> FROM ranked;
>
> RANKED subselect:
>   msisdn     eventTimestamp  zoneIds  ts                       row_num  ts_ranked
>   673944959  173999638       [1]      2025-02-19 21:19:40.000  1        1970-01-01 01:00:00.001
>   673944959  173999638       [1]      2025-02-19 21:19:40.000  2        1970-01-01 01:00:00.002
>
> *But [ERROR] Could not execute SQL statement. Reason:*
> *org.apache.flink.table.api.TableException: OVER windows' ordering in
> stream mode must be defined on a time attribute. *
>
> I guess just could use "ts" field because if it's the temporal field?
>
>
> On Thu, Feb 20, 2025 at 10:41 AM Alessio Bernesco Làvore <
> alessio.berne...@gmail.com> wrote:
>
>> Hello,
>> don't know if it's strictly related, but using a TIMESTAMP_LTZ vs.
>> TIMESTAMP ts i had some not understandable behaviors.
>>
>> Changing the ts to TIMESTAMP (not LTZ) the identical query works as
>> expected.
>> So, first all, we changed all the parsing to TIMESTAMP to exclude this
>> kind of problems.
>>
>> If someone is interested i can provide the exact query to reproduce the
>> problem we figured out.
>>
>> Greetings,
>> Alessio
>>
>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
>> guillermo.ortiz.f...@gmail.com> wrote:
>>
>>> I have created a table that reads from a Kafka topic. What I want to do
>>> is order the data by eventTime and add a new field that represents the
>>> previous value using the LAG function.
>>>
>>> The problem arises when two records have exactly the same eventTime,
>>> which produces a "strange" behavior.
>>>
>>>
>>> CREATE TABLE example (
>>> eventTimestamp BIGINT NOT NULL,
>>> msisdn INT NOT NULL,
>>> zoneIds ARRAY NOT NULL,
>>> ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>> `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>> WATERMARK FOR ts AS ts
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = 'example-offset',
>>> 'properties.bootstrap.servers' = '',
>>> 'properties.auto.offset.reset' = 'latest',
>>> 'scan.startup.mode' = 'latest-offset',
>>> 'key.format' = 'raw',
>>> 'key.fields' = 'msisdn',
>>> 'value.format' = 'avro',
>>> 'value.fields-include' = 'ALL',
>>> 'scan.watermark.idle-timeout' = '1000',
>>>
>>> );
>>>
>>>
>>> INSERT INTO example (eventTimestamp, msisdn, zoneIds)VALUES
>>> (173999638, 673944959, ARRAY[1]),
>>> (173999638, 673944959, ARRAY[1]);
>>>
>>>
>>> SELECT
>>> msisdn,
>>> eventTimestamp,
>>> ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY 
>>> ts, kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>>> tsFROM example;
>>>
>>> *Actual Result:*
>>> msisdneventTimestampzoneIdsprev_zoneIdsts
>>> 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000
>>> 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000*Expected
>>> Result:*
>>> msisdneventTimestampzoneIdsprev_zoneIdsts
>>> 673944959 173999638 [1] [ ] 2025-02-19 21:19:40.000
>>> 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000
>>> --
>>> *Is this behavior normal?*
>>>
>>> I am trying to achieve the expected behavior by including the metadata
>>> of the offset in the example table and adding it to the OVER clause in
>>> the LAG function. However, it seems that Flink does not allow ordering
>>> by more than one column:
>>>
>>>
>>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, 
>>> kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>>>
>>> Results in:
>>>
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.flink.table.api.TableException: The window can only be ordered 
>>> by a single time column.
>>>
>>> --
>>>
>>> Would you happen to know how to achieve the expected result?
>>>
>>


Question about large windows and RocksDB state backend

2025-02-20 Thread Gabriele Mencagli

Dear Flink Community,

I am working with an application developed using the DataStream API. The 
application has a window operator with a large temporal window. 
Due to the processing done, we are using the ProcessWindowFunction 
implementation as we need to explicitly buffer all the tuples falling 
within the boundaries of each triggered window.


We are interested in running the application with the RocksDB embedded 
state backend to reduce the memory footprint. We would like to know if 
there is information available about how windows are stored and 
represented in RocksDB: whether windows are separate objects in 
RocksDB, whether tuples are grouped by key, whether they are replicated 
in the case of overlapping windows, and so forth.


We understand that this question can be answered by manually inspecting 
the source code. However, we would be very grateful if someone could 
share their knowledge on this topic and suggest any relevant documents 
available online. Unfortunately, the documentation on Flink's website 
does not provide such low-level details.


Thanks a lot,

Gabriele


Re: FlinkSQL LAG, strange behavior?

2025-02-20 Thread Alessio Bernesco Làvore
Hello,
I don't know if it's strictly related, but using a TIMESTAMP_LTZ vs.
TIMESTAMP ts I ran into some behaviors I could not understand.

Changing the ts to TIMESTAMP (not LTZ), the identical query works as
expected.
So, first of all, we changed all the parsing to TIMESTAMP to rule out this
kind of problem.

If someone is interested, I can provide the exact query to reproduce the
problem we figured out.

Greetings,
Alessio

On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
guillermo.ortiz.f...@gmail.com> wrote:

> I have created a table that reads from a Kafka topic. What I want to do is
> order the data by eventTime and add a new field that represents the
> previous value using the LAG function.
>
> The problem arises when two records have exactly the same eventTime,
> which produces a "strange" behavior.
>
>
> CREATE TABLE example (
> eventTimestamp BIGINT NOT NULL,
> msisdn INT NOT NULL,
> zoneIds ARRAY NOT NULL,
> ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
> `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
> WATERMARK FOR ts AS ts
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'example-offset',
> 'properties.bootstrap.servers' = '',
> 'properties.auto.offset.reset' = 'latest',
> 'scan.startup.mode' = 'latest-offset',
> 'key.format' = 'raw',
> 'key.fields' = 'msisdn',
> 'value.format' = 'avro',
> 'value.fields-include' = 'ALL',
> 'scan.watermark.idle-timeout' = '1000',
>
> );
>
>
> INSERT INTO example (eventTimestamp, msisdn, zoneIds)VALUES
> (173999638, 673944959, ARRAY[1]),
> (173999638, 673944959, ARRAY[1]);
>
>
> SELECT
> msisdn,
> eventTimestamp,
> ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY 
> ts, kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
> tsFROM example;
>
> *Actual Result:*
> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
> 673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
> 673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
>
> *Expected Result:*
> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
> 673944959  173999638       [1]      [ ]           2025-02-19 21:19:40.000
> 673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
> --
> *Is this behavior normal?*
>
> I am trying to achieve the expected behavior by including the metadata of
> the offset in the example table and adding it to the OVER clause in the
> LAG function. However, it seems that Flink does not allow ordering by
> more than one column:
>
>
> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, 
> kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>
> Results in:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: The window can only be ordered by 
> a single time column.
>
> --
>
> Would you happen to know how to achieve the expected result?
>


Re: FlinkSQL LAG, strange behavior?

2025-02-20 Thread Guillermo Ortiz Fernández
I tried to create an additional field based on ROW_NUMBER, but it can't be
used in the LAG function.

The idea was:
WITH ranked AS (
SELECT
msisdn,
eventTimestamp,
zoneIds,
ts,
TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY
ts), 3) AS ts_ranked
FROM example_2
)
SELECT
msisdn,
eventTimestamp,
ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY
ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
ts
FROM ranked;

RANKED subselect:
  msisdn     eventTimestamp  zoneIds  ts                       row_num  ts_ranked
  673944959  173999638       [1]      2025-02-19 21:19:40.000  1        1970-01-01 01:00:00.001
  673944959  173999638       [1]      2025-02-19 21:19:40.000  2        1970-01-01 01:00:00.002

*But [ERROR] Could not execute SQL statement. Reason:*
*org.apache.flink.table.api.TableException: OVER windows' ordering in
stream mode must be defined on a time attribute. *

I guess I can only use the "ts" field, because it's the time attribute?


On Thu, Feb 20, 2025 at 10:41 AM Alessio Bernesco Làvore <
alessio.berne...@gmail.com> wrote:

> Hello,
> don't know if it's strictly related, but using a TIMESTAMP_LTZ vs.
> TIMESTAMP ts i had some not understandable behaviors.
>
> Changing the ts to TIMESTAMP (not LTZ) the identical query works as
> expected.
> So, first all, we changed all the parsing to TIMESTAMP to exclude this
> kind of problems.
>
> If someone is interested i can provide the exact query to reproduce the
> problem we figured out.
>
> Greetings,
> Alessio
>
> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
> guillermo.ortiz.f...@gmail.com> wrote:
>
>> I have created a table that reads from a Kafka topic. What I want to do
>> is order the data by eventTime and add a new field that represents the
>> previous value using the LAG function.
>>
>> The problem arises when two records have exactly the same eventTime,
>> which produces a "strange" behavior.
>>
>>
>> CREATE TABLE example (
>> eventTimestamp BIGINT NOT NULL,
>> msisdn INT NOT NULL,
>> zoneIds ARRAY NOT NULL,
>> ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>> `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>> WATERMARK FOR ts AS ts
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'example-offset',
>> 'properties.bootstrap.servers' = '',
>> 'properties.auto.offset.reset' = 'latest',
>> 'scan.startup.mode' = 'latest-offset',
>> 'key.format' = 'raw',
>> 'key.fields' = 'msisdn',
>> 'value.format' = 'avro',
>> 'value.fields-include' = 'ALL',
>> 'scan.watermark.idle-timeout' = '1000',
>>
>> );
>>
>>
>> INSERT INTO example (eventTimestamp, msisdn, zoneIds)VALUES
>> (173999638, 673944959, ARRAY[1]),
>> (173999638, 673944959, ARRAY[1]);
>>
>>
>> SELECT
>> msisdn,
>> eventTimestamp,
>> ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY 
>> ts, kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>> tsFROM example;
>>
>> *Actual Result:*
>> msisdneventTimestampzoneIdsprev_zoneIdsts
>> 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000
>> 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000*Expected Result:*
>> msisdneventTimestampzoneIdsprev_zoneIdsts
>> 673944959 173999638 [1] [ ] 2025-02-19 21:19:40.000
>> 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000
>> --
>> *Is this behavior normal?*
>>
>> I am trying to achieve the expected behavior by including the metadata of
>> the offset in the example table and adding it to the OVER clause in the
>> LAG function. However, it seems that Flink does not allow ordering by
>> more than one column:
>>
>>
>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, 
>> kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>>
>> Results in:
>>
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.TableException: The window can only be ordered by 
>> a single time column.
>>
>> --
>>
>> Would you happen to know how to achieve the expected result?
>>
>


FlinkSQL LAG, strange behavior?

2025-02-20 Thread Guillermo Ortiz Fernández
I have created a table that reads from a Kafka topic. What I want to do is
order the data by eventTime and add a new field that represents the
previous value using the LAG function.

The problem arises when two records have exactly the same eventTime, which
produces a "strange" behavior.


CREATE TABLE example (
eventTimestamp BIGINT NOT NULL,
msisdn INT NOT NULL,
zoneIds ARRAY NOT NULL,
ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
`kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'kafka',
'topic' = 'example-offset',
'properties.bootstrap.servers' = '',
'properties.auto.offset.reset' = 'latest',
'scan.startup.mode' = 'latest-offset',
'key.format' = 'raw',
'key.fields' = 'msisdn',
'value.format' = 'avro',
'value.fields-include' = 'ALL',
'scan.watermark.idle-timeout' = '1000',

);


INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
    (173999638, 673944959, ARRAY[1]),
    (173999638, 673944959, ARRAY[1]);


SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn
        ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM example;

*Actual Result:*
msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000

*Expected Result:*
msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
673944959  173999638       [1]      [ ]           2025-02-19 21:19:40.000
673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
--
*Is this behavior normal?*

I am trying to achieve the expected behavior by including the metadata of
the offset in the example table and adding it to the OVER clause in the LAG
function. However, it seems that Flink does not allow ordering by more than
one column:


ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY
ts, kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,

Results in:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The window can only be
ordered by a single time column.

--

Would you happen to know how to achieve the expected result?


Re: Re: Performance problem with FlinkSQL

2025-02-20 Thread Xuyang
Hi, 

There can be various reasons for performance issues, such as:

1. Bottlenecks in the Kafka connector when reading data

2. Time-consuming filter conditions

3. Bottlenecks in the Kafka connector when writing data

To further diagnose the problem, you can try:

1. Set the Flink configuration `pipeline.operator-chaining.enabled` to false. 
This will allow you to see the individual vertices (source -> calc -> sink) in 
the Flink UI. Check the entire topology to see if any vertex is experiencing 
backpressure (a configuration sketch follows below).

2. If there is no backpressure, it is likely that the bottleneck lies in data 
reading. Otherwise, the operator following the last one experiencing 
backpressure is likely the bottleneck (for example, if the source is under 
backpressure, the calc operator could be the bottleneck, indicating that the 
filtering logic is CPU-intensive).

3. 

```

and the process has been running for several days without failures, although 
with high CPU and memory usage.

```

If CPU utilization is high, you can also further analyze the CPU flame graph to 
see what tasks are being processed by that node and share it here.
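
For reference, a minimal configuration sketch for the two diagnostics above (standard configuration keys; `rest.flamegraph.enabled` for the UI flame graphs is my assumption, please verify it against your Flink version):

```
# Disable operator chaining so source / calc / sink appear as separate vertices in the UI.
pipeline.operator-chaining.enabled: 'false'

# Enable on-demand CPU flame graphs per vertex in the web UI.
rest.flamegraph.enabled: 'true'
```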




Hope this is helpful!




--

Best!
Xuyang




At 2025-02-20 22:03:19, "Guillermo Ortiz Fernández" 
 wrote:

I have been configuring the Flink cluster (application mode) to process the 
Kafka data volume. The current configuration consists of 16 pods, each with 
12GB of memory and 2 CPUs. Each TaskManager has 4 slots.

All processing is done using Flink SQL, and since Kafka topics may contain 
out-of-order data, a 5-minute watermark is applied. In previous configurations, 
we allocated between 4-8GB per pod, but this caused memory errors that led to 
process crashes.

Currently, this process is a migration from Kafka Streams, which is running in 
parallel, consuming exactly the same data on the same infrastructure, but using 
significantly fewer resources than Flink. Approximately, Flink is consuming 10x 
more memory and 8x more CPU than Kafka Streams.

I am unsure whether this is due to Flink inherently consuming more resources, 
whether using SQL results in poorly optimized auto-generated code, or if there 
is a configuration issue. The metrics do not show any backpressure or errors, 
and the process has been running for several days without failures, although 
with high CPU and memory usage.

Any ideas on how to diagnose or optimize this to reduce resource consumption?



On Thu, Jan 30, 2025 at 10:02 AM Pedro Mázala () wrote:

> The average output rate needed to avoid lag after filtering messages should 
> be around 60K messages per second. I’ve been testing different configurations 
> of parallelism, slots and pods (everything runs on Kubernetes), but I’m far 
> from achieving those numbers.

How are you partitioning your query? Do you see any backpressure happening on 
the Flink UI?


> In the latest configuration, I used 20 pods, a parallelism of 120, with 4 
> slots per taskmanager.


Were all tasks working properly, or did you have idle ones?


> Additionally, setting parallelism to 120 creates hundreds of subtasks for the 
> smaller topics, which don’t do much but still consume minimal resources even 
> if idle.


With the Table API I'm not sure if you can choose parallelism per "task"; 
with the DataStream API I'm sure you can do it. 
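
For illustration, a hedged DataStream sketch (the source, function and sink names below are hypothetical placeholders, not from your job):

```java
// Per-operator parallelism in the DataStream API: operators can override the
// pipeline-wide default set on the environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(12);                         // default used by the small topics

env.fromSource(bigTopicSource, WatermarkStrategy.noWatermarks(), "big-topic")
        .setParallelism(120)                    // only the 500-partition topic gets the high parallelism
        .map(new NormalizeFn())
        .setParallelism(120)
        .sinkTo(outputSink)
        .setParallelism(60);                    // sink can run with fewer subtasks
```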






Att,
Pedro Mázala
+31 (06) 3819 3814
Be awesome




On Wed, 29 Jan 2025 at 22:06, Guillermo Ortiz Fernández 
 wrote:

After the last check, it uses about 200-400 millicores and 2.2 GB per pod.


On Wed, Jan 29, 2025 at 9:41 PM Guillermo Ortiz Fernández 
() wrote:


I have a job entirely written in Flink SQL. The first part of the program 
processes 10 input topics and generates one output topic with normalized 
messages and some filtering applied (really easy, some where by fields and 
substring). Nine of the topics produce between hundreds and thousands of 
messages per second, with an average of 4–10 partitions each. The other topic 
produces 150K messages per second and has 500 partitions. They are unioned to 
the output topic.

The average output rate needed to avoid lag after filtering messages should be 
around 60K messages per second. I’ve been testing different configurations of 
parallelism, slots and pods (everything runs on Kubernetes), but I’m far from 
achieving those numbers.

In the latest configuration, I used 20 pods, a parallelism of 120, with 4 slots 
per taskmanager. With this setup, I achieve approximately 20K messages per 
second, but I’m unable to consume the largest topic at the rate messages are 
being produced. Additionally, setting parallelism to 120 creates hundreds of 
subtasks for the smaller topics, which don’t do much but still consume minimal 
resources even if idle.

I started trying with parallelism 12 and got about 1000-3000 messages per 
second.

When I check the use of cpu and memory to the pods and don't see any problem 
and they are far from the limit, each taskmanager has 4gb and 2cpus and they 
are never close to u

How can we read checkpoint data for debugging state

2025-02-20 Thread Sachin Mittal
Hi,
So I have a flink application which stores state (RocksDB state backend) in
S3 with the following directory structure:

s3://{bucket}/flink-checkpoints/{job-id}/
    + --shared/
    + --taskowned/
    + --chk-/


I have my job pipeline defined like:

final DataStream events = env.fromSource(..., "Src");

SingleOutputStreamOperator statsData =
events
.keyBy(new MyKeySelector())
.process(new MyStatsProcessor(), Types.POJO(StatsData.class))
.name("Proc");

statsData
.addSink(new MySink<>(...))
.name("Sink");

env.execute("Exec");


The MyStatsProcessor has keyed states defined as:

state1 =
getRuntimeContext().getState(new
ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
state2 =
getRuntimeContext().getState(new
ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));


So my question is: how can I read checkpoint state? I see there is an API for
this, flink-state-processor-api.
Can I use it here, and if so, how do I instantiate it:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint =
    SavepointReader.read(env, "s3://{bucket}/flink-checkpoints/{job-id}", );

Is this the correct way to use this API for reading a checkpoint?
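
(For concreteness, the fuller call I have in mind is below — the chk-100 suffix is just a made-up checkpoint id, and passing an explicit EmbeddedRocksDBStateBackend is my assumption, so please correct me if that is wrong:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Point the reader at one concrete checkpoint directory (the one containing the _metadata file);
// "chk-100" below is a placeholder id, and the backend should match what the job uses.
SavepointReader savepoint =
    SavepointReader.read(env, "s3://{bucket}/flink-checkpoints/{job-id}/chk-100",
        new EmbeddedRocksDBStateBackend(true));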

Please note I have also enabled:
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://{bucket}/changelog
dstl.dfs.compression.enabled: 'true'

Now, say I am able to create the reader: how do I inspect a particular
keyed state?
I see a function called readKeyedState, but I am unsure what uid I
need to pass to read a particular state.
Would something like this work:

DataStream keyedState = savepoint.readKeyedState(
OperatorIdentifier.forUid("Proc"), new ReaderFunction());
And now in my KeyedState class, I can access state1 and state2.

Would this work?

Please let me know if I am on the right track, or whether reading
checkpointed states from an external application for debugging is simply
not possible.

Thanks
Sachin


Re: How can we read checkpoint data for debugging state

2025-02-20 Thread Sachin Mittal
Hi,
I am working on Flink 1.19.1, so I guess I cannot use the SQL connector, as
that's to be released in 2.1.0.
If I directly try to use the flink-state-processor-api, my very first
question is: how do I know the uid for each keyed state?
Is it the step name?
as in

events
.keyBy(new MyKeySelector())
.process(new MyStatsProcessor(), Types.POJO(StatsData.class))
.name("Proc");

If now I want to access keyed states for MyStatsProcessor then I need to
access it using:

savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new
ReaderFunction());

Is this the right way to access it?
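
Concretely, the reader function I have in mind is sketched below (key type assumed to be
String for illustration, and KeyedState is my own POJO with key/one/two fields). I also
assume forUid() matches an explicit .uid(...) rather than .name(...), which is part of
what I am unsure about:

// Sketch only: the descriptors must use the same names and types as in MyStatsProcessor.
public class ReaderFunction extends KeyedStateReaderFunction<String, KeyedState> {

    private ValueState<StateOne> state1;
    private ValueState<StateTwo> state2;

    @Override
    public void open(Configuration parameters) {
        state1 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
        state2 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<KeyedState> out) throws Exception {
        KeyedState result = new KeyedState();   // key/one/two are illustrative field names
        result.key = key;
        result.one = state1.value();
        result.two = state2.value();
        out.collect(result);
    }
}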

Thanks
Sachin


On Fri, Feb 21, 2025 at 11:09 AM Xuyang  wrote:

> Hi,
>
> FYI that hopes helpful: FLIP-496: SQL connector for keyed savepoint data[1]
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>
>
> --
> Best!
> Xuyang
>
>
> At 2025-02-21 12:11:02, "Sachin Mittal"  wrote:
>
> Hi,
> So I have a flink application which stores state (RocksDB state backend)
> in S3 with the following directory structure:
>
> s3://{bucket}/flink-checkpoints//{job-id}|+ --shared/ 
>+ --taskowned/+ --chk-/
>
>
> I have my job pipeline defined like:
>
> final DataStream events = env.fromSource(..., "Src");
>
> SingleOutputStreamOperator statsData =
> events
> .keyBy(new MyKeySelector())
> .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
> .name("Proc");
>
> statsData
> .addSink(new MySink<>(...))
> .name("Sink");
>
> env.execute("Exec");
>
>
> The MyStatsProcessor has keyed states defined as:
>
> state1 =
> getRuntimeContext().getState(new ValueStateDescriptor<>("state1", 
> Types.POJO(StateOne.class)));
> state2 =
> getRuntimeContext().getState(new ValueStateDescriptor<>("state2", 
> Types.POJO(StateTwo.class)));
>
>
> So my question is how can I read any checkpoint state. I see this API 
> flink-state-processor-api.
> Can I use the same here, if so how do I instantiate it:
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();SavepointReader 
> savepoint = SavepointReader.read(env, 
> "s3://{bucket}/flink-checkpoints/{job-id}", );
>
> Is this the correct way to use this API for reading a checkpoint ?
>
> Please note I have also enabled:
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3://{bucket}/changelog
> dstl.dfs.compression.enabled: 'true'
>
> Now after say I am able to create the reader how do I inspect a particular
> keyed state.
> I see a function called readKeyedState but I am unsure as to what uuid I
> need to pass to read a particular state?
> Would something like this work:
>
> DataStream keyedState = savepoint.readKeyedState(
> OperatorIdentifier.forUid("Proc"), new ReaderFunction());
> And now in my KeyedState class, I can access state1 and state2.
>
> Would this work?
>
> Please let me know if I am on the right track or this is something not
> possible to read checkpointed states via any external application for
> debugging.
>
> Thanks
> Sachin
>


SASL_SSL Kafka sink

2025-02-20 Thread nick toker
Hello,

We are encountering a connection issue with our Kafka sink when using AWS
MSK IAM authentication.  While our Kafka source connects successfully, the
sink fails to establish a connection.

Here's how we're building the sink:

```java
KafkaSinkBuilder builder = KafkaSink.builder()
    .setBootstrapServers(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG))
    .setRecordSerializer(new FlinkKafkaRecordSerializationSchema<>(outputConfig))
    .setKafkaProducerConfig(producerProps)
    .setDeliveryGuarantee(deliveryGuarantee)
    .setTransactionalIdPrefix(getTransactionalIdPrefix(outputConfig, jobName));
```

The `producerProps` include the following configuration for IAM
authentication:

```
{
  security.protocol=SASL_SSL,
  sasl.mechanism=AWS_MSK_IAM,
  sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;,

sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
}
```

The sink connection fails with the following error:

```
2025-02-20 13:57:35,299 WARN  org.apache.kafka.clients.NetworkClient ... -
[AdminClient clientId=pdfOrchestrator-enumerator-admin-client] Error
connecting to node kafka-central-1.amazonaws.com:9098 (id: -1 rack: null)
java.io.IOException: Channel could not be created for socket
java.nio.channels.SocketChannel[closed]
...
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to
configure SaslClientAuthenticator
...
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException:
Failed to create SaslClient with mechanism AWS_MSK_IAM
```

This error suggests a problem with the SASL/IAM authentication
configuration for the sink.  We have already verified the following:

* **Endpoint:** We are using port 9098, which is correct for MSK with IAM
authentication.  We have also confirmed that the source connection uses the
same port.
* **Dependencies:** The `aws-msk-iam-auth` dependency is included in our
project.

We suspect the issue might be related to IAM role configuration or missing
AWS region information. Could you please provide guidance on how to further
troubleshoot this?  Specifically, we would appreciate information on:

* Best practices for configuring IAM roles for Flink jobs connecting to MSK
with IAM authentication.
* How to explicitly set the AWS region for the Kafka client in this context (a
sketch of what we mean is included below).
* Any other common pitfalls related to MSK IAM authentication with Flink.
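
For reference, this is the kind of explicit configuration we are wondering about (the awsRoleArn / awsStsRegion parameter names are taken from the aws-msk-iam-auth documentation as we understand it; the ARN and region values below are placeholders):

```
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# Placeholder role ARN and region; awsDebugCreds=true adds credential-resolution logging.
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::111122223333:role/example-msk-writer" awsStsRegion="eu-central-1" awsDebugCreds=true;
```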

Thank you for your assistance.


Re: FlinkSQL LAG, strange behavior?

2025-02-20 Thread Guillermo Ortiz Fernández
And another option I tried is:

WITH ranked AS (
  select *
  FROM (
SELECT
msisdn,
eventTimestamp,
zoneIds,
ts,
ROW_NUMBER() OVER (PARTITION BY msisdn,ts ORDER BY ts) AS rownum
FROM example)
  WHERE rownum = 1
)
SELECT
msisdn,
eventTimestamp,
ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY
ts), ARRAY[-1]), -1) AS prev_zoneIds,
ts
FROM ranked;

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalOverAggregate
doesn't support consuming update and delete changes which is produced by
node Deduplicate(keep=[FirstRow], key=[msisdn, ts], order=[ROWTIME])

On Thu, Feb 20, 2025 at 12:00 PM Guillermo Ortiz Fernández <
guillermo.ortiz.f...@gmail.com> wrote:

> Another option would be add an extra field like:
>
> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY 
> *ts,
> row_number*), ARRAY[-1]), -1) AS prev_zoneIds,
>
> But it isn't possible either.
>
> On Thu, Feb 20, 2025 at 11:48 AM Guillermo Ortiz Fernández <
> guillermo.ortiz.f...@gmail.com> wrote:
>
>> I tried to create a additional field based on ROW_NUMBER but it can't be
>> used in LAG function.
>>
>> The idea was:
>> WITH ranked AS (
>> SELECT
>> msisdn,
>> eventTimestamp,
>> zoneIds,
>> ts,
>> TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY
>> ts), 3) AS ts_ranked
>> FROM example_2
>> )
>> SELECT
>> msisdn,
>> eventTimestamp,
>> ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER
>> BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
>> ts
>> FROM ranked;
>>
>> RANKED subselect:
>>   msisdn   eventTimestampzoneIds
>>  ts  row_num   ts_ranked
>>673944959173999638[1]
>> 2025-02-19 21:19:40.0001 1970-01-01 01:00:00.001
>>673944959173999638[1]
>> 2025-02-19 21:19:40.0002 1970-01-01 01:00:00.002
>>
>> *But [ERROR] Could not execute SQL statement. Reason:*
>> *org.apache.flink.table.api.TableException: OVER windows' ordering in
>> stream mode must be defined on a time attribute. *
>>
>> I guess just could use "ts" field because if it's the temporal field?
>>
>>
>> On Thu, Feb 20, 2025 at 10:41 AM Alessio Bernesco Làvore <
>> alessio.berne...@gmail.com> wrote:
>>
>>> Hello,
>>> don't know if it's strictly related, but using a TIMESTAMP_LTZ vs.
>>> TIMESTAMP ts i had some not understandable behaviors.
>>>
>>> Changing the ts to TIMESTAMP (not LTZ) the identical query works as
>>> expected.
>>> So, first all, we changed all the parsing to TIMESTAMP to exclude this
>>> kind of problems.
>>>
>>> If someone is interested i can provide the exact query to reproduce the
>>> problem we figured out.
>>>
>>> Greetings,
>>> Alessio
>>>
>>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
>>> guillermo.ortiz.f...@gmail.com> wrote:
>>>
 I have created a table that reads from a Kafka topic. What I want to do
 is order the data by eventTime and add a new field that represents the
 previous value using the LAG function.

 The problem arises when two records have exactly the same eventTime,
 which produces a "strange" behavior.


 CREATE TABLE example (
 eventTimestamp BIGINT NOT NULL,
 msisdn INT NOT NULL,
 zoneIds ARRAY NOT NULL,
 ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
 `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
 WATERMARK FOR ts AS ts
 ) WITH (
 'connector' = 'kafka',
 'topic' = 'example-offset',
 'properties.bootstrap.servers' = '',
 'properties.auto.offset.reset' = 'latest',
 'scan.startup.mode' = 'latest-offset',
 'key.format' = 'raw',
 'key.fields' = 'msisdn',
 'value.format' = 'avro',
 'value.fields-include' = 'ALL',
 'scan.watermark.idle-timeout' = '1000',

 );


 INSERT INTO example (eventTimestamp, msisdn, zoneIds)VALUES
 (173999638, 673944959, ARRAY[1]),
 (173999638, 673944959, ARRAY[1]);


 SELECT
 msisdn,
 eventTimestamp,
 ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
 ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY 
 ts, kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
 tsFROM example;

 *Actual Result:*
 msisdneventTimestampzoneIdsprev_zoneIdsts
 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000
 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000*Expected
 Result:*
 msisdneventTimestampzoneIdsprev_zoneIdsts
 67394495

Re: Performance problem with FlinkSQL

2025-02-20 Thread Guillermo Ortiz Fernández
I have been configuring the Flink cluster (application mode) to process the
Kafka data volume. The current configuration consists of 16 pods, each
with *12GB
of memory and 2 CPUs*. Each TaskManager has *4 slots*.
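
(For reference, the corresponding Flink settings are roughly the following — standard configuration keys, with `kubernetes.taskmanager.cpu` assuming a native Kubernetes application-mode deployment:)

taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 12g
kubernetes.taskmanager.cpu: 2.0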

All processing is done using *Flink SQL*, and since Kafka topics may
contain out-of-order data, a *5-minute watermark* is applied. In previous
configurations, we allocated between *4-8GB per pod*, but this caused
memory errors that led to process crashes.

Currently, this process is a *migration from Kafka Streams*, which is
running in parallel, consuming exactly the same data on the same
infrastructure, but using *significantly fewer resources than Flink*.
Approximately, Flink is consuming *10x more memory* and *8x more CPU* than
Kafka Streams.

I am unsure whether this is due to Flink inherently consuming more
resources, whether using SQL results in poorly optimized auto-generated
code, or if there is a configuration issue. The *metrics do not show any
backpressure or errors*, and the process has been running for several days
without failures, although with *high CPU and memory usage*.

Any ideas on how to diagnose or optimize this to reduce resource
consumption?

On Thu, Jan 30, 2025 at 10:02 AM Pedro Mázala () wrote:

> > The average output rate needed to avoid lag after filtering messages
> should be around 60K messages per second. I’ve been testing different
> configurations of parallelism, slots and pods (everything runs on
> Kubernetes), but I’m far from achieving those numbers.
>
> How are you partitioning your query? Do you see any backpressure happening
> on the Flink UI?
>
> > In the latest configuration, I used 20 pods, a parallelism of 120, with
> 4 slots per taskmanager.
>
> Were all tasks working properly or you had idleness?
>
> > Additionally, setting parallelism to 120 creates hundreds of subtasks
> for the smaller topics, which don’t do much but still consume minimal
> resources even if idle.
>
> On the table API I'm not sure if you can choose parallelism per "task"
> DataStream I'm sure you can do it.
>
>
>
> Att,
> Pedro Mázala
> +31 (06) 3819 3814
> Be awesome
>
>
> On Wed, 29 Jan 2025 at 22:06, Guillermo Ortiz Fernández <
> guillermo.ortiz.f...@gmail.com> wrote:
>
>> After last checking it uses about 200-400 millicores each pod and 2.2Gb.
>>
>> On Wed, Jan 29, 2025 at 9:41 PM Guillermo Ortiz Fernández <
>> guillermo.ortiz.f...@gmail.com> wrote:
>>
>>> I have a job entirely written in Flink SQL. The first part of the
>>> program processes 10 input topics and generates one output topic with
>>> normalized messages and some filtering applied (really easy, some where by
>>> fields and substring). Nine of the topics produce between hundreds and
>>> thousands of messages per second, with an average of 4–10 partitions each.
>>> The other topic produces 150K messages per second and has 500 partitions.
>>> They are unioned to the output topic.
>>>
>>> The average output rate needed to avoid lag after filtering messages
>>> should be around 60K messages per second. I’ve been testing different
>>> configurations of parallelism, slots and pods (everything runs on
>>> Kubernetes), but I’m far from achieving those numbers.
>>>
>>> In the latest configuration, I used 20 pods, a parallelism of 120, with
>>> 4 slots per taskmanager. With this setup, I achieve approximately 20K
>>> messages per second, but I’m unable to consume the largest topic at the
>>> rate messages are being produced. Additionally, setting parallelism to 120
>>> creates hundreds of subtasks for the smaller topics, which don’t do much
>>> but still consume minimal resources even if idle.
>>>
>>> I started trying with parallelism 12 and got about 1000-3000 messages
>>> per second.
>>>
>>> When I check the use of cpu and memory to the pods and don't see any
>>> problem and they are far from the limit, each taskmanager has 4gb and
>>> 2cpus and they are never close to using the cpu.
>>>
>>> It’s a requirement to do everything 100% in SQL. How can I improve the
>>> throughput rate? Should I be concerned about the hundreds of subtasks
>>> created for the smaller topics?
>>>
>>