Re: How to read a savepoint fast without exploding the memory

2025-02-04 Thread Gabor Somogyi
What I could imagine is to create a normal Flink job,
use execution.state-recovery.path=/path/to/savepoint,
and set the operator UID on a custom-written operator, which opens the state
info for you.

The only drawback is that you must know the keyBy range... this can be
problematic, but if you can do it, it's a win :)
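For illustration, here is a minimal sketch of such a job (every name below is
hypothetical — the key range, state descriptor and operator UID must match the
job that produced the savepoint; treat it as a starting point, not a drop-in
implementation):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SavepointStateDumpJob {
    public static void main(String[] args) throws Exception {
        // Point the job at the savepoint so it restores that state on startup.
        Configuration conf = new Configuration();
        conf.setString("execution.state-recovery.path", "/path/to/savepoint");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // The source must cover the keyBy range of the original job,
        // otherwise some key groups are never visited.
        env.fromSequence(0, 1_000_000)           // hypothetical key range
            .keyBy(k -> k)                       // must match the original partitioning
            .process(new KeyedProcessFunction<Long, Long, String>() {
                private transient ValueState<String> state;

                @Override
                public void open(Configuration parameters) {
                    // Descriptor must match the one used by the original operator.
                    state = getRuntimeContext().getState(
                            new ValueStateDescriptor<>("my-state", String.class));
                }

                @Override
                public void processElement(Long element, Context ctx, Collector<String> out) throws Exception {
                    String value = state.value();  // restored from the savepoint
                    if (value != null) {
                        out.collect(ctx.getCurrentKey() + " -> " + value);
                    }
                }
            })
            .uid("main-operator-uid")            // must equal the UID stored in the savepoint
            .print();

        env.execute("savepoint-state-dump");
    }
}

Because the key range is enumerated explicitly, any key outside that range is
simply never visited — which is exactly the drawback mentioned above.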

G


On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin  wrote:

> Hi Gabor,
>
> I thought so. I was hoping for a way to read the savepoint in pages,
> instead of as a single blob up front which I think is what the hashmap
> does... we just want to be called for each entry and extract the bit we
> want in that scenario.
>
> Never mind
>
> Thank you for the insight. Saves me a lot of hunting for nothing.
>
> JM
>
> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi 
> wrote:
>
>> Hi Jean-Marc,
>>
>> We've already realized that the RocksDB approach is not reaching the
>> performance level it should. There is an open issue for it [1].
>> The hashmap-based approach has always required more memory. So if
>> the memory footprint is a hard requirement, then RocksDB is the only way now.
>>
>> Sad to say, but I can't suggest any nifty trick to make it better. All I
>> can promise is that I'm now measuring the performance of the RocksDB approach
>> and intend to eliminate the slowness. Since we don't know what exactly
>> causes the slowness, the new FRocksDB 8.10.0 may also be an improvement.
>>
>> All in all it will take some time to sort this out.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>
>> BR,
>> G
>>
>>
>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin 
>> wrote:
>>
>>> What would be the best approach to read a savepoint and minimise the
>>> memory consumption. We just need to transform it into something else for
>>> investigation.
>>>
>>> Our flink 1.20 streaming job is using HashMap backend, and is spread
>>> over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A
>>> savepoint can be 4-5Gb or more.
>>>
>>> The reader is more basic, using a Local Execution Environment. This is
>>> essentially what we are doing:
>>>
>>> StreamExecutionEnvironment env =
>>> LocalStreamEnvironment.getExecutionEnvironment();
>>> env.setParallelism(1);
>>>
>>> SavepointReader savepoint =
>>> SavepointReader.read(env, savepointLocation, new
>>> HashMapStateBackend());
>>> // SavepointReader.read(env, savepointLocation, new
>>> EmbeddedRocksDBStateBackend()); // Too slow
>>>
>>> DataStream
>>> mainOperatorState =
>>> savepoint.readKeyedState(
>>> MAIN_OPERATOR,
>>> new
>>> StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>
>>>
>>> CloseableIterator
>>> stateReader = mainOperatorState.executeAndCollect();
>>> stateReader.forEachRemaining(record -> { ...
>>>     /// extract what we need here
>>> });
>>>
>>>
>>> We tried two approaches:
>>> - One is to read the savepoint with a RocksDB backend. That works and is
>>> low on memory usage, but is very very slow. We noticed the iterator is
>>> available very early on, but it is slow...
>>> - The other is to read the savepoint with a HashMap backend. That is
>>> very fast, as expected. However the iterator apparently only returns once
>>> the whole savepoint has been loaded in the HashMap, so heavy memory
>>> consumption.
>>>
>>> Is there a better way to do that? or a way to tune it so that it does
>>> not consume all the memory ? or maybe reading it in parts...
>>>
>>> Thanks
>>>
>>> JM
>>>
>>


Re: How to read a savepoint fast without exploding the memory

2025-02-04 Thread Jean-Marc Paulin
That's a good idea. Sadly, I have no control over the keys.

I was going to patch Flink with the suggestion in FLINK-37109 first to see how
that goes. If that brings RocksDB performance into an acceptable range for us,
we might go that way. I really like the light memory consumption of RocksDB for
that kind of side job.

Thanks

JM

On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi 
wrote:

> What I could imagine is to create a normal Flink job,
> use execution.state-recovery.path=/path/to/savepoint,
> and set the operator UID on a custom-written operator, which opens the state
> info for you.
>
> The only drawback is that you must know the keyBy range... this can be
> problematic, but if you can do it, it's a win :)
>
> G
>
>
> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin  wrote:
>
>> Hi Gabor,
>>
>> I thought so. I was hoping for a way to read the savepoint in pages,
>> instead of as a single blob up front which I think is what the hashmap
>> does... we just want to be called for each entry and extract the bit we
>> want in that scenario.
>>
>> Never mind
>>
>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>
>> JM
>>
>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi 
>> wrote:
>>
>>> Hi Jean-Marc,
>>>
>>> We've already realized that the RocksDB approach is not reaching the
>>> performance level it should. There is an open issue for it [1].
>>> The hashmap-based approach has always required more memory. So
>>> if the memory footprint is a hard requirement, then RocksDB is the only way
>>> now.
>>>
>>> Sad to say, but I can't suggest any nifty trick to make it better. All I
>>> can promise is that I'm now measuring the performance of the RocksDB approach
>>> and intend to eliminate the slowness. Since we don't know what exactly
>>> causes the slowness, the new FRocksDB 8.10.0 may also be an improvement.
>>>
>>> All in all it will take some time to sort this out.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin 
>>> wrote:
>>>
 What would be the best approach to read a savepoint and minimise the
 memory consumption. We just need to transform it into something else for
 investigation.

 Our flink 1.20 streaming job is using HashMap backend, and is spread
 over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A
 savepoint can be 4-5Gb or more.

 The reader is more basic, using a Local Execution Environment. This is
 essentially what we are doing:

 StreamExecutionEnvironment env =
 LocalStreamEnvironment.getExecutionEnvironment();
 env.setParallelism(1);

 SavepointReader savepoint =
 SavepointReader.read(env, savepointLocation, new
 HashMapStateBackend());
 // SavepointReader.read(env, savepointLocation, new
 EmbeddedRocksDBStateBackend()); // Too slow

 DataStream
 mainOperatorState =
 savepoint.readKeyedState(
 MAIN_OPERATOR,
 new
 StateManagerJsonNodeReaderFunction<>(StateManager.class));


 CloseableIterator
 stateReader = mainOperatorState.executeAndCollect();
 stateReader.forEachRemaining(record -> { ...
     /// extract what we need here
 });


 We tried two approaches:
 - One is to read the savepoint with a RocksDB backend. That works and
 is low on memory usage, but is very very slow. We noticed the iterator is
 available very early on, but it is slow...
 - The other is to read the savepoint with a HashMap backend. That is
 very fast, as expected. However the iterator apparently only returns once
 the whole savepoint has been loaded in the HashMap, so heavy memory
 consumption.

 Is there a better way to do that? or a way to tune it so that it does
 not consume all the memory ? or maybe reading it in parts...

 Thanks

 JM

>>>


Re: How to read a savepoint fast without exploding the memory

2025-02-04 Thread Gabor Somogyi
Just to give an update: I've applied the mentioned patch and the execution
time decreased drastically (a 98.9% gain versus the ~22-minute run quoted below):

2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader
  [] - Execution time: PT14.690426S

I need to double-check what that would mean for correctness and all other
aspects.

G


On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi 
wrote:

> Please report back on how the patch behaves, including any side effects.
>
> Now I'm testing the state reading with the processor API vs the mentioned
> job where we control the keys.
> The difference is extreme, especially because the numbers are coming from
> reading a ~40 MB state file😅
>
> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader
>[] - Execution time: PT22M24.612954S
> ...
> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob 
>[] - Execution time: PT6.930659S
>
> Needless to say, the bigger number is the processor API.
>
> G
>
>
> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin  wrote:
>
>> That's a good idea. Sadly, I have no control over the keys.
>>
>> I was going to patch Flink with the suggestion in FLINK-37109 first to see
>> how that goes. If that brings RocksDB performance into an acceptable range
>> for us we might go that way. I really like the light memory consumption of
>> RocksDB for that kind of side job.
>>
>> Thanks
>>
>> JM
>>
>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi 
>> wrote:
>>
>>> What I could imagine is to create a normal Flink job,
>>> use execution.state-recovery.path=/path/to/savepoint,
>>> and set the operator UID on a custom-written operator, which opens the state
>>> info for you.
>>>
>>> The only drawback is that you must know the keyBy range... this can be
>>> problematic, but if you can do it, it's a win :)
>>>
>>> G
>>>
>>>
>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin 
>>> wrote:
>>>
 Hi Gabor,

 I thought so. I was hoping for a way to read the savepoint in pages,
 instead of as a single blob up front which I think is what the hashmap
 does... we just want to be called for each entry and extract the bit we
 want in that scenario.

 Never mind

 Thank you for the insight. Saves me a lot of hunting for nothing.

 JM

 On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
 gabor.g.somo...@gmail.com> wrote:

> Hi Jean-Marc,
>
> We've already realized that the RocksDB approach is not reaching the
> performance level it should. There is an open issue for it
> [1].
> The hashmap-based approach has always required more memory. So
> if the memory footprint is a hard requirement, then RocksDB is the only way
> now.
>
> Sad to say, but I can't suggest any nifty trick to make it better. All
> I can promise is that I'm now measuring the performance of the RocksDB approach
> and intend to eliminate the slowness. Since we don't know what
> exactly causes the slowness, the new FRocksDB 8.10.0 may also be an
> improvement.
>
> All in all it will take some time to sort this out.
>
> [1] https://issues.apache.org/jira/browse/FLINK-37109
>
> BR,
> G
>
>
> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin 
> wrote:
>
>> What would be the best approach to read a savepoint and minimise the
>> memory consumption. We just need to transform it into something else for
>> investigation.
>>
>> Our flink 1.20 streaming job is using HashMap backend, and is spread
>> over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A
>> savepoint can be 4-5Gb or more.
>>
>> The reader is more basic, using a Local Execution Environment. This is
>> essentially what we are doing:
>>
>> StreamExecutionEnvironment env =
>> LocalStreamEnvironment.getExecutionEnvironment();
>> env.setParallelism(1);
>>
>> SavepointReader savepoint =
>> SavepointReader.read(env, savepointLocation, new
>> HashMapStateBackend());
>> // SavepointReader.read(env, savepointLocation, new
>> EmbeddedRocksDBStateBackend()); // Too slow
>>
>>
>> DataStream
>> mainOperatorState =
>> savepoint.readKeyedState(
>> MAIN_OPERATOR,
>> new
>> StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>
>>
>> CloseableIterator
>> stateReader = mainOperatorState.executeAndCollect();
>> stateReader.forEachRemaining(record -> { ...
>>     /// extract what we need here
>> });
>>
>>
>> We tried two approaches:
>> - One is to read the savepoint with a RocksDB backend. That works and
>> is low on memory usage, but is very very slow. We noticed the iterator is
>> available very early on, but it is slow...
>> - The other is to read the savepoint with a HashMap backend. That is very
>> fast, as expected. However the iterator apparently only returns once the
>> whole savepoint has been loaded in the HashMap, so heavy memory consumption.

Re: Dead Letter Queue for FlinkSQL

2025-02-04 Thread Zhanghao Chen
You'll need to implement a custom sink for that.
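For what it's worth, a common workaround (this is only a sketch, not a built-in
Flink SQL feature; the validation rule and sink variables are hypothetical) is
to drop from the Table API to the DataStream API around the sink: check each
record yourself, route failures to a side output, and write that side output to
a dead-letter sink. Failures thrown inside the sink connector itself (e.g. a
constraint violation raised by the database at write time) still require a
custom/wrapped sink, as said above.

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public final class DlqRouting {

    // Side-output tag for records that should go to the dead-letter sink.
    private static final OutputTag<Row> DLQ = new OutputTag<Row>("dlq") {};

    /**
     * rows is e.g. tableEnv.toDataStream(tableEnv.sqlQuery("SELECT ...")).
     * mainSink / dlqSink are whatever connectors you use (Kafka, JDBC, ...).
     */
    public static void sinkWithDlq(DataStream<Row> rows, Sink<Row> mainSink, Sink<Row> dlqSink) {
        SingleOutputStreamOperator<Row> good = rows.process(new ProcessFunction<Row, Row>() {
            @Override
            public void processElement(Row row, Context ctx, Collector<Row> out) {
                try {
                    // Hypothetical constraint check, e.g. the key column must not be null.
                    if (row.getField(0) == null) {
                        throw new IllegalArgumentException("null key");
                    }
                    out.collect(row);         // healthy record, goes to the real sink
                } catch (Exception e) {
                    ctx.output(DLQ, row);     // dead-lettered record
                }
            }
        });
        good.sinkTo(mainSink);
        good.getSideOutput(DLQ).sinkTo(dlqSink);
    }
}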


Best,
Zhanghao Chen

From: Ilya Karpov 
Sent: Monday, February 3, 2025 18:30
To: user 
Subject: Dead Letter Queue for FlinkSQL

Hi there,

Because sink connectors can throw exceptions at runtime (for example, due to a
constraint violation), it's good practice to have a DLQ so that data
processing can continue. Is there any way in Flink SQL to implement/configure a DLQ?

Thanks in advance!


Unsubscribe

2025-02-04 Thread Mujahid Niaz
Unsubscribe


Re: Paimon missing CatalogFactory

2025-02-04 Thread Yanquan Lv
Hi, Dominik.

It seems that you only used Paimon's version number, but you actually need
the artifact that also carries the Flink version, like
org.apache.paimon:paimon-flink-1.19:1.0.0.

You can see the main difference from the content of the two links below:
[1] https://mvnrepository.com/artifact/org.apache.paimon/paimon-flink/1.0.0
[2]
https://mvnrepository.com/artifact/org.apache.paimon/paimon-flink-1.19/1.0.0
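If I read the build definition quoted below correctly, the corrected sbt
coordinates would look roughly like this (a sketch assuming Flink 1.19 and
Paimon 1.0.0 — adjust to your actual versions):

// Use the Flink-version-specific bundle recommended above instead of plain "paimon-flink".
"org.apache.paimon"   %   "paimon-flink-1.19" % "1.0.0",
"org.apache.paimon"   %   "paimon-s3"         % "1.0.0"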

 wrote on Friday, January 31, 2025 at 22:57:

> Hi Flink Community
>
>
>
> I’m currently trying to get Paimon running in our k8s environment with
> Flink SQL (v1.19.1). I added the dependencies to our Flink SQL-Runner as
> well as the lib folder of the docker image:
>
>
>
> "org.apache.paimon"   %   "paimon-flink"  % "1.0.0",
> "org.apache.paimon"   %   "paimon-s3" % "1.0.0"
>
>
>
> runRaw(s"curl --output-dir /opt/flink/lib/ -sLO 
> https://artifactory.company.com/artifactory/maven-remote/org/apache/paimon/paimon-flink/*$*paimonVersion/paimon-flink-*$*paimonVersion.jar
>  
> ")
> runRaw(s"curl --output-dir /opt/flink/lib/ -sLO 
> https://artifactory.company.com/artifactory/maven-remote/org/apache/paimon/paimon-s3/*$*paimonVersion/paimon-s3-*$*paimonVersion.jar
>  
> ")
>
>
>
> Then I try to create a Catalog like:
>
>
>
> tEnv.executeSql(
>   """CREATE CATALOG paimon WITH (
> | 'type'='paimon',
> | 'warehouse'='s3://tmp/paimon',
> | 's3.access-key'='',
> | 's3.secret-key'=''
> |);
> |""".stripMargin)
>
>
>
> Unfortunately, what I end up with is the following error message:
>
>
>
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'paimon' that implements
> 'org.apache.flink.table.factories.CatalogFactory' in the classpath.
>
>
>
> Available factory identifiers are:
>
>
>
> generic_in_memory
>
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:617)
>
> at
> org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:795)
>
> at
> org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:492)
>
>
>
>
>
> The error is the same when running in a TableEnvironment test or when
> running on k8s.
>
>
>
> Does anyone have an idea what is wrong here? Somehow it seems that my job
> can’t find the appropriate META-INF.services ->
> org.apache.flink.table.factories.Factory entry of the Paimon dependency?
>
>
>
> Kind Regards
>
> Dominik
>


Re: Flink High Availability Data Cleanup

2025-02-04 Thread Zhanghao Chen
Hi Yang,

When a job fails temporarily, e.g. due to a single-machine failure, Flink will
retain the HA metadata and try to recover. However, when the job has already
reached the terminal failed status (controlled by the restart strategy [1]),
Flink will delete all metadata and exit. In your case, you might want to revise
the restart strategy of the job to avoid entering the terminal failed status
too quickly.

The two options are apocryphal. Don't trust LLMs too much :)

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#restart-strategies
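For example, a more forgiving strategy for a standalone 1.17 deployment could
be set in flink-conf.yaml along these lines (a sketch — the values are
illustrative and should be tuned to your failure patterns):

# exponential-delay keeps retrying (it has no attempt limit in 1.17) instead of
# letting the job reach the terminal failed state after a burst of restarts.
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0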

Best,
Zhanghao Chen

From: Chen Yang via user 
Sent: Wednesday, February 5, 2025 7:17
To: user@flink.apache.org 
Cc: Vignesh Chandramohan 
Subject: Flink High Availability Data Cleanup

Hi Flink Community,

I'm running the Flink jobs (standalone mode) with high availability in 
Kubernetes (Flink version 1.17.2). The job is deployed with two job managers. I 
noticed that the leader job manager would delete the ConfigMap when the job 
failed and restarted. Thus the standby job manager couldn't recover the jobId 
and checkpoint from the ConfigMap. And the job started with a fresh state. 
While from the Flink docs 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up,
 it mentions that HA-related ConfigMaps would be retained and the job would
recover from the checkpoints stored in the ConfigMaps. Looks like Flink
doesn't work as described. Are there some configs to persist the ConfigMap when
the job fails or restarts?

During my search via Google and ChatGPT, it recommends the following 2 configs 
to keep the configmap during job cleanup. But I can't find any Flink docs 
mentioning these configurations nor in the Flink code. Please advise!

high-availability.cleanup-on-shutdown

or

kubernetes.jobmanager.cleanup-ha-metadata

Thanks,
Chen



--
Chen Yang
Software Engineer, Data Infrastructure

DoorDash.com


Re: Unsubscribe

2025-02-04 Thread Zhanghao Chen
Please send an email to user-unsubscr...@flink.apache.org if you want to
unsubscribe from user@flink.apache.org.

Best,
Zhanghao Chen

From: Mujahid Niaz 
Sent: Wednesday, February 5, 2025 9:29
Cc: user@flink.apache.org 
Subject: Unsubscribe

Unsubscribe


Re: How to read a savepoint fast without exploding the memory

2025-02-04 Thread Gabor Somogyi
Hi Jean-Marc,

We've already realized that the RocksDB approach is not reaching the
performance level it should. There is an open issue for it [1].
The hashmap-based approach has always required more memory. So if
the memory footprint is a hard requirement, then RocksDB is the only way now.

Sad to say, but I can't suggest any nifty trick to make it better. All I can
promise is that I'm now measuring the performance of the RocksDB approach
and intend to eliminate the slowness. Since we don't know what exactly
causes the slowness, the new FRocksDB 8.10.0 may also be an improvement.

All in all it will take some time to sort this out.

[1] https://issues.apache.org/jira/browse/FLINK-37109

BR,
G


On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin 
wrote:

> What would be the best approach to read a savepoint and minimise the
> memory consumption. We just need to transform it into something else for
> investigation.
>
> Our flink 1.20 streaming job is using HashMap backend, and is spread over
> 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A savepoint
> can be 4-5Gb or more.
>
> The reader is more basic, using a Local Execution Environment. This is
> essentially what we are doing:
>
> StreamExecutionEnvironment env =
> LocalStreamEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> SavepointReader savepoint =
> SavepointReader.read(env, savepointLocation, new
> HashMapStateBackend());
> // SavepointReader.read(env, savepointLocation, new
> EmbeddedRocksDBStateBackend()); // Too slow
>
> DataStream
> mainOperatorState =
> savepoint.readKeyedState(
> MAIN_OPERATOR,
> new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>
>
> CloseableIterator
> stateReader = mainOperatorState.executeAndCollect();
> stateReader.forEachRemaining(record -> { ...
>     /// extract what we need here
> });
>
>
> We tried two approaches:
> - One is to read the savepoint with a RocksDB backend. That works and is
> low on memory usage, but is very very slow. We noticed the iterator is
> available very early on, but it is slow...
> - The other is to read the savepoint with a HashMap backend. That is very
> fast, as expected. However the iterator apparently only returns once the
> whole savepoint has been loaded in the HashMap, so heavy memory consumption.
>
> Is there a better way to do that? or a way to tune it so that it does not
> consume all the memory ? or maybe reading it in parts...
>
> Thanks
>
> JM
>


Re: How to read a savepoint fast without exploding the memory

2025-02-04 Thread Gabor Somogyi
Please report back on how the patch behaves, including any side effects.

Now I'm testing the state reading with the processor API vs the mentioned
job where we control the keys.
The difference is extreme, especially because the numbers are coming from
reading a ~40 MB state file😅

2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader
  [] - Execution time: PT22M24.612954S
...
2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob
  [] - Execution time: PT6.930659S

Needless to say, the bigger number is the processor API.

G


On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin  wrote:

> That's a good idea. Sadly, I have no control over the keys.
>
> I was going to patch Flink with the suggestion in FLINK-37109 first to see
> how that goes. If that brings RocksDB performance into an acceptable range
> for us we might go that way. I really like the light memory consumption of
> RocksDB for that kind of side job.
>
> Thanks
>
> JM
>
> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi 
> wrote:
>
>> What I could imagine is to create a normal Flink job,
>> use execution.state-recovery.path=/path/to/savepoint,
>> and set the operator UID on a custom-written operator, which opens the state
>> info for you.
>>
>> The only drawback is that you must know the keyBy range... this can be
>> problematic, but if you can do it, it's a win :)
>>
>> G
>>
>>
>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin  wrote:
>>
>>> Hi Gabor,
>>>
>>> I thought so. I was hoping for a way to read the savepoint in pages,
>>> instead of as a single blob up front which I think is what the hashmap
>>> does... we just want to be called for each entry and extract the bit we
>>> want in that scenario.
>>>
>>> Never mind
>>>
>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>
>>> JM
>>>
>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi 
>>> wrote:
>>>
 Hi Jean-Marc,

 We've already realized that the RocksDB approach is not reaching the
 performance level it should. There is an open issue for it [1].
 The hashmap-based approach has always required more memory. So
 if the memory footprint is a hard requirement, then RocksDB is the only way
 now.

 Sad to say, but I can't suggest any nifty trick to make it better. All I
 can promise is that I'm now measuring the performance of the RocksDB approach
 and intend to eliminate the slowness. Since we don't know what
 exactly causes the slowness, the new FRocksDB 8.10.0 may also be an
 improvement.

 All in all it will take some time to sort this out.

 [1] https://issues.apache.org/jira/browse/FLINK-37109

 BR,
 G


 On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin 
 wrote:

> What would be the best approach to read a savepoint and minimise the
> memory consumption. We just need to transform it into something else for
> investigation.
>
> Our flink 1.20 streaming job is using HashMap backend, and is spread
> over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A
> savepoint can be 4-5Gb or more.
>
> The reader is more basic, using a Local Execution Environment. This is
> essentially what we are doing:
>
> StreamExecutionEnvironment env =
> LocalStreamEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> SavepointReader savepoint =
> SavepointReader.read(env, savepointLocation, new
> HashMapStateBackend());
> // SavepointReader.read(env, savepointLocation, new
> EmbeddedRocksDBStateBackend()); // Too slow
>
> DataStream
> mainOperatorState =
> savepoint.readKeyedState(
> MAIN_OPERATOR,
> new
> StateManagerJsonNodeReaderFunction<>(StateManager.class));
>
>
> CloseableIterator
> stateReader = mainOperatorState.executeAndCollect();
> stateReader.forEachRemaining(record -> { ...
>     /// extract what we need here
> });
>
>
> We tried two approaches:
> - One is to read the savepoint with a RocksDB backend. That works and
> is low on memory usage, but is very very slow. We noticed the iterator is
> available very early on, but it is slow...
> - The other is to read the savepoint with a HashMap backend. That is
> very fast, as expected. However the iterator apparently only returns once
> the whole savepoint has been loaded in the HashMap, so heavy memory
> consumption.
>
> Is there a better way to do that? or a way to tune it so that it does
> not consume all the memory ? or maybe reading it in parts...
>
> Thanks
>
> JM
>



Unsubscribe

2025-02-04 Thread Lin Hou



How to read a savepoint fast without exploding the memory

2025-02-04 Thread Jean-Marc Paulin
What would be the best approach to read a savepoint and minimise the memory
consumption. We just need to transform it into something else for
investigation.

Our flink 1.20 streaming job is using HashMap backend, and is spread over 6
task slots in 6 pods (under k8s). Savepoints are saved on S3. A savepoint
can be 4-5Gb or more.

The reader is more basic, using a Local Execution Environment. This is
essentially what we are doing:

StreamExecutionEnvironment env =
LocalStreamEnvironment.getExecutionEnvironment();
env.setParallelism(1);

SavepointReader savepoint =
SavepointReader.read(env, savepointLocation, new
HashMapStateBackend());
// SavepointReader.read(env, savepointLocation, new
EmbeddedRocksDBStateBackend()); // Too slow

DataStream
mainOperatorState =
savepoint.readKeyedState(
MAIN_OPERATOR,
new StateManagerJsonNodeReaderFunction<>(StateManager.class));


CloseableIterator
stateReader = mainOperatorState.executeAndCollect();
stateReader.forEachRemaining(record -> { ...
    /// extract what we need here
});
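(A reader function like the StateManagerJsonNodeReaderFunction referenced above
follows the State Processor API's KeyedStateReaderFunction shape; here is a
hedged sketch with a hypothetical state name, key type and value type — they
must match the operator that wrote the state.)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

// Emits one small string per key instead of the full state object, so whatever
// executeAndCollect() buffers downstream stays small.
public class ExampleStateReaderFunction extends KeyedStateReaderFunction<String, String> {

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        // Must match the descriptor used by the operator that wrote the state.
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state-manager", String.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
        // Called once per key in the savepoint; extract only what you need here.
        out.collect(key + "=" + state.value());
    }
}

Note this only shrinks what gets emitted; the state backend passed to
SavepointReader.read(...) still decides how the snapshot itself is held in
memory, which is the trade-off described next.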


We tried two approaches:
- One is to read the savepoint with a RocksDB backend. That works and is low
on memory usage, but is very very slow. We noticed the iterator is
available very early on, but it is slow...
- The other is to read the savepoint with a HashMap backend. That is very
fast, as expected. However the iterator apparently only returns once the
whole savepoint has been loaded in the HashMap, so heavy memory consumption.

Is there a better way to do that? or a way to tune it so that it does not
consume all the memory ? or maybe reading it in parts...

Thanks

JM


Flink High Availability Data Cleanup

2025-02-04 Thread Chen Yang via user
Hi Flink Community,

I'm running the Flink jobs (standalone mode) with high availability in
Kubernetes (Flink version 1.17.2). The job is deployed with two job
managers. I noticed that the leader job manager would delete the ConfigMap
when the job failed and restarted. Thus the standby job manager couldn't
recover the jobId and checkpoint from the ConfigMap. And the job started
with a fresh state. While from the Flink docs
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up,
it mentions that HA-related ConfigMaps would be retained and the job would
recover from the checkpoints stored in the ConfigMaps. Looks like
Flink doesn't work as described. Are there some configs to persist the
ConfigMap when the job fails or restarts?

During my search via Google and ChatGPT, it recommends the following 2
configs to keep the configmap during job cleanup. But I can't find any
Flink docs mentioning these configurations nor in the Flink code. Please
advise!

high-availability.cleanup-on-shutdown

or

kubernetes.jobmanager.cleanup-ha-metadata

Thanks,
Chen



-- 

Chen Yang
Software Engineer, Data Infrastructure

DoorDash.com 


Re: How to read a savepoint fast without exploding the memory

2025-02-04 Thread Jean-Marc Paulin
Hi Gabor,

I thought so. I was hoping for a way to read the savepoint in pages,
instead of as a single blob up front which I think is what the hashmap
does... we just want to be called for each entry and extract the bit we
want in that scenario.

Never mind

Thank you for the insight. Saves me a lot of hunting for nothing.

JM

On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi 
wrote:

> Hi Jean-Marc,
>
> We've already realized that the RocksDB approach is not reaching the
> performance level it should. There is an open issue for it [1].
> The hashmap-based approach has always required more memory. So if
> the memory footprint is a hard requirement, then RocksDB is the only way now.
>
> Sad to say, but I can't suggest any nifty trick to make it better. All I
> can promise is that I'm now measuring the performance of the RocksDB approach
> and intend to eliminate the slowness. Since we don't know what exactly
> causes the slowness, the new FRocksDB 8.10.0 may also be an improvement.
>
> All in all it will take some time to sort this out.
>
> [1] https://issues.apache.org/jira/browse/FLINK-37109
>
> BR,
> G
>
>
> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin 
> wrote:
>
>> What would be the best approach to read a savepoint and minimise the
>> memory consumption. We just need to transform it into something else for
>> investigation.
>>
>> Our flink 1.20 streaming job is using HashMap backend, and is spread over
>> 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A savepoint
>> can be 4-5Gb or more.
>>
>> The reader is more basic, using a Local Execution Environment. This is
>> essentially what we are doing:
>>
>> StreamExecutionEnvironment env =
>> LocalStreamEnvironment.getExecutionEnvironment();
>> env.setParallelism(1);
>>
>> SavepointReader savepoint =
>> SavepointReader.read(env, savepointLocation, new
>> HashMapStateBackend());
>> // SavepointReader.read(env, savepointLocation, new
>> EmbeddedRocksDBStateBackend()); // Too slow
>>
>> DataStream
>> mainOperatorState =
>> savepoint.readKeyedState(
>> MAIN_OPERATOR,
>> new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>
>>
>> CloseableIterator
>> stateReader = mainOperatorState.executeAndCollect();
>> stateReader.forEachRemaining(record -> { ...
>>     /// extract what we need here
>> });
>>
>>
>> We tried two approaches:
>> - One is to read the savepoint with a RocksDB backend. That works and is
>> low on memory usage, but is very very slow. We noticed the iterator is
>> available very early on, but it is slow...
>> - The other is to read the savepoint with a HashMap backend. That is
>> very fast, as expected. However the iterator apparently only returns once
>> the whole savepoint has been loaded in the HashMap, so heavy memory
>> consumption.
>>
>> Is there a better way to do that? or a way to tune it so that it does not
>> consume all the memory ? or maybe reading it in parts...
>>
>> Thanks
>>
>> JM
>>
>