Hi James,

Coming back to your original question on how to restart jobs from 
savepoints/checkpoints on LocalStreamEnvironment (the one used in a debugger):

Out of the box LocalStreamEnvironment does not allow setting a snapshot path to 
resume the job from.
The trick for me to do it anyway was to remodel the execute method and add a 
call to

jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(fromSavepoint, true))

(fromSavepoint being the savepointPath)

This is somewhat ugly but works (I only ever use it in debugger sessions, not 
in production code).

The remodeled execute method looks like this (for Flink 1.13.0; it should be 
similar for other releases): [1]


Feel free to get back with additional questions 😊

Thias

[1] remodeled execute() (Scala):

  def execute(jobName: String): JobExecutionResult = {

    if (fromSavepoint != null &&
        env.streamEnv.getJavaEnv.isInstanceOf[LocalStreamEnvironment]) {
      // transform the streaming program into a JobGraph
      val locEnv = env.streamEnv.getJavaEnv.asInstanceOf[LocalStreamEnvironment]
      val streamGraph = locEnv.getStreamGraph
      streamGraph.setJobName(jobName)

      val jobGraph = streamGraph.getJobGraph()
      jobGraph.setAllowQueuedScheduling(true)

      // resume from the given savepoint/checkpoint path
      jobGraph.setSavepointRestoreSettings(
        SavepointRestoreSettings.forPath(fromSavepoint, true))

      val configuration = new org.apache.flink.configuration.Configuration
      configuration.addAll(jobGraph.getJobConfiguration)
      configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0")

      // add (and override) the settings with what the user defined
      val cls = classOf[LocalStreamEnvironment]
      val cfgField = cls.getDeclaredField("configuration")
      cfgField.setAccessible(true)
      val cofg = cfgField.get(locEnv)
        .asInstanceOf[org.apache.flink.configuration.Configuration]
      configuration.addAll(cofg)

      if (!configuration.contains(RestOptions.BIND_PORT))
        configuration.setString(RestOptions.BIND_PORT, "0")

      val numSlotsPerTaskManager = configuration.getInteger(
        TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism)

      val cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build

      val miniCluster = new MiniCluster(cfg)

      try {
        miniCluster.start()
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress.get.getPort)
        return miniCluster.executeJobBlocking(jobGraph)
      } finally {
        //        transformations.clear
        miniCluster.close()
      }

    } else {
      throw new InvalidParameterException(
        "flink.stream-environment.from-savepoint may only be used for local debug execution")
    }
  }






From: Piotr Nowojski <pnowoj...@apache.org>
Sent: Thursday, 17 February 2022 09:23
To: Cristian Constantinescu <zei...@gmail.com>
Cc: Sandys-Lumsdaine, James <james.sandys-lumsda...@systematica.com>; James 
Sandys-Lumsdaine <jas...@hotmail.com>; user@flink.apache.org
Subject: Re: Basic questions about resuming stateful Flink jobs

Hi James,

> Do I copy the checkpoint into a savepoint directory and treat it like a 
> savepoint?

You don't need to copy the checkpoint. In fact you cannot, as checkpoints are 
not relocatable. But you can point to the checkpoint directory and resume from 
it like you would from a savepoint.

Regarding the testing, I would suggest taking a look at the docs [1] and 
MiniClusterWithClientResource in particular. If you are using it, you can 
access the cluster client (MiniClusterWithClientResource#getClusterClient) and 
this client should be an equivalent of the CLI/Rest API. You can also use it to 
recover from savepoints - check for `setSavepointRestoreSettings` usage in [2].

But the real question is why you want to do it. You might not necessarily need 
to test recovery at this level. From a user code perspective, it doesn't matter 
whether you use a checkpoint or a savepoint, or where it is stored. IMO what 
you want is to have:

1. Proper unit tests using TestHarness(es)

Again, take a look at [1]. You can set up unit tests, process some records, 
carefully control timers, then call 
`AbstractStreamOperatorTestHarness#snapshot` to take a snapshot and 
`AbstractStreamOperatorTestHarness#initializeState` to test the recovery code 
path. For examples, look at the usages of those methods in the Flink code base, 
e.g. [3].

2. Later, I would recommend complementing such unit tests with some end-to-end 
tests, that would make sure everything is integrated properly, that your 
cluster is configured correctly etc. Then you don't need to use MiniCluster, as 
you can simply use Rest API/CLI. But crucially you don't need to be so thorough 
with covering all of the cases on this level, especially the failure handling, 
as you can rely more on the unit tests. Having said that, you might want to 
have a test that kills/restarts one TM on an end-to-end level.

Best,
Piotrek

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/
[2] 
https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
[3] 
https://github.com/apache/flink/blob/fdf40d2e0efe2eed77ca9633121691c8d1e744cb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java

On Wed, 16 Feb 2022 at 21:57, Cristian Constantinescu <zei...@gmail.com> wrote:
Hi James,

I literally just went through what you're doing at my job. While I'm using 
Apache Beam and not the Flink API directly, the concepts still apply. TL;DR: it 
works as expected.

What I did is I set up a kafka topic listener that always throws an exception 
if the last received message's timestamp is less than 5 minutes from when the 
processing happens (basically simulating a code fix after 5 minutes). Then I 
let the pipeline execute the normal processing and I'd send a message on the 
exception topic.

I have set up Flink to retry twice. Beam offers a flag 
(numberOfExecutionRetries) [1], but it boils down to one of the Flink flags 
here [2]. Once Flink encounters an exception, for example the one thrown by my 
exception topic listener, it restores itself from the last checkpoint, which 
includes the Kafka offsets and whatever other state the transforms hold. 
Effectively this replays the messages after the checkpoint and, of course, my 
exception is thrown again when it tries to reprocess that message. After the 
second try, Flink gives up and the job stops (just as if you had cancelled it). 
If run in an IDE, the process stops; if run on a Flink cluster, the job stops.

When a Flink job stops, it usually cleans up its checkpoints unless you 
externalize them. For Beam, that is the externalizedCheckpointsEnabled flag set 
to true. Check the docs to see what that maps to.

Then, when you restart the Flink job, just add the -s Flink flag followed by 
the latest checkpoint path. If you're running from an IDE, say IntelliJ, you 
can still pass the -s flag to the main method launcher.

We use a bash script to restart our Flink jobs on our UAT/PROD boxes for now. 
You can use this command to find the latest checkpoint in that path:

find "$PATH_WHERE_YOU_SAVE_STATE" -name "_metadata" -print0 | xargs -r -0 ls -1 -t | head -1

You know where PATH_WHERE_YOU_SAVE_STATE is, because you have to specify it 
when you initially start the Flink job. For Beam, that's the 
stateBackendStoragePath flag. This picks up the latest checkpoint taken before 
the pipeline stopped, and the job continues from it with your updated jar that 
handles the exception properly.
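
As a rough sketch, the find pipeline above could be wrapped in a small shell 
function that prints the checkpoint directory (the parent of the newest 
_metadata file), since that directory is what gets passed after -s. The 
function name latest_checkpoint is made up for illustration, and the sketch 
assumes GNU find/xargs, as the original command does.

```shell
# Hypothetical helper: print the directory of the newest checkpoint below $1.
# Uses the same find/xargs/ls pipeline as the command quoted above.
latest_checkpoint() {
  local state_dir="$1"
  local metadata
  # Newest _metadata file first (ls -t sorts by modification time).
  metadata="$(find "$state_dir" -name "_metadata" -print0 | xargs -r -0 ls -1 -t | head -1)"
  # No _metadata file means there is no completed checkpoint to restore from.
  [ -n "$metadata" ] || return 1
  # The checkpoint path is the directory that contains _metadata.
  dirname "$metadata"
}
```

A restart could then look something like 
flink run -s "$(latest_checkpoint "$PATH_WHERE_YOU_SAVE_STATE")" your-job.jar 
where your-job.jar is a placeholder for the updated jar.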

Also note that I think you can set all these flags with Java code. In Beam it's 
just adding to the Main method args parameter or adding them to the 
PipelineOptions once you build that object from args. I've never used the Flink 
libs, just the runner, but from [1] and [3] it looks like you can configure 
things in code if you prefer that.

Hope it helps,
Cristian

[1] https://beam.apache.org/documentation/runners/flink/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/
[3] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/#configuration


On Wed, Feb 16, 2022 at 12:28 PM Sandys-Lumsdaine, James 
<james.sandys-lumsda...@systematica.com> wrote:
Thanks for your reply, Piotr.

Some follow on questions:
> Nevertheless you might consider enabling them as this allows you to 
> manually cancel the job if it enters an endless recovery/failure loop, fix the 
> underlying issue, and restart the job from the externalised checkpoint.

How is this done? Are you saying the retained checkpoint (i.e. the last 
checkpoint that isn’t deleted) can somehow be used when restarting the Flink 
application? If I am running in my IDE and just using the local streaming 
environment, how can I test my recovery code with a retained checkpoint? 
All my attempts so far just say “No checkpoint found during restore.” Do I copy 
the checkpoint into a savepoint directory and treat it like a savepoint?

On the topic of savepoints, that web page [1] says I need to use “bin/flink 
savepoint” or “bin/flink stop --savepointPath” – but again, if I’m currently 
not running in a real cluster how else can I create and recover from the save 
points?

From what I’ve read there is state, checkpoints and save points – all of them 
hold state - and currently I can’t get any of these to restore when developing 
in an IDE and the program builds up all state from scratch. So what else do I 
need to do in my Java code to tell Flink to load a savepoint?

Thanks,

James.


From: Piotr Nowojski <pnowoj...@apache.org>
Sent: 16 February 2022 16:36
To: James Sandys-Lumsdaine <jas...@hotmail.com>
Cc: user@flink.apache.org
Subject: Re: Basic questions about resuming stateful Flink jobs

Hi James,

Sure! The basic idea of checkpoints is that they are fully owned by the running 
job and used for failure recovery. Thus by default, if you stop the job, its 
checkpoints are removed. If you want to stop a job and later resume from the 
point where it previously stopped, you most likely want to use savepoints [1]. 
You can stop the job with a savepoint and later start another job from that 
savepoint.
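
A minimal shell sketch of that stop-and-resume cycle, assuming the bin/flink 
CLI of a Flink distribution is available. The function names, the FLINK_BIN 
variable, and all arguments are hypothetical placeholders; stop --savepointPath 
and run -s are the CLI options mentioned elsewhere in this thread.

```shell
# Path to the Flink CLI; override FLINK_BIN if it lives elsewhere (assumption).
FLINK_BIN="${FLINK_BIN:-bin/flink}"

# Stop a running job and write a savepoint below the target directory.
stop_with_savepoint() {
  local job_id="$1" target_dir="$2"
  "$FLINK_BIN" stop --savepointPath "$target_dir" "$job_id"
}

# Start a job from a savepoint path (the path reported when the job stopped).
run_from_savepoint() {
  local savepoint_path="$1" jar="$2"
  "$FLINK_BIN" run -s "$savepoint_path" "$jar"
}
```

Both functions only forward their arguments to the CLI, so they need a running 
cluster; they do not help with the IDE-only case.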

Regarding externalised checkpoints: technically you could use them in a 
similar way, but there is no command like "take a checkpoint and stop the job". 
Nevertheless you might consider enabling them as this allows you to manually 
cancel the job if it enters an endless recovery/failure loop, fix the 
underlying issue, and restart the job from the externalised checkpoint.

Best,
Piotrek

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/

On Wed, 16 Feb 2022 at 16:44, James Sandys-Lumsdaine <jas...@hotmail.com> wrote:
Hi all,

I have a Flink 1.14 streaming workflow with many stateful functions that has an 
FsStateBackend and checkpointing enabled, although I haven't set a location for 
the checkpointed state.

I've really struggled to understand how I can stop my Flink job and restart it 
and ensure it carries on exactly where it left off by using the state or 
checkpoints or savepoints. This is not clearly explained in the book or the web 
documentation.

Since I have no control over my Flink job id I assume I cannot force Flink to 
pick up the state recorded under the jobId directory for the FsStateBackend. 
Therefore I think Flink should read back in the last checkpointed data, but I 
don't understand how to force my program to read this in. Do I use retained 
checkpoints or not? How can I force my program to either use the last 
checkpointed state (e.g. when running from my IDE, starting and stopping the 
program) or maybe force it not to read in the state and start completely fresh?

The web documentation talks about bin/flink but I am running from my IDE so I 
want my Java code to control this process using the Flink API in Java.

Can anyone give me some basic pointers as I'm obviously missing something 
fundamental on how to allow my program to be stopped and started without losing 
all the state.

Many thanks,

James.

