Hi James,

Coming back to your original question on how to restart jobs from savepoints/checkpoints on LocalStreamEnvironment (the one used in a debugger):
Out of the box, LocalStreamEnvironment does not allow setting a snapshot path to resume the job from. The trick for me to do it anyway was to remodel the execute method and add a call to jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(fromSavepoint, true)), with fromSavepoint being the savepoint path. This is somewhat ugly but works (only ever used in debugger sessions, not in prod code). The remodeled execute method looks like this (for Flink 1.13.0, and should be similar for other releases): [1]

Feel free to get back with additional questions.

Thias

[1] remodeled execute(…) (Scala):

def execute(jobName: String): JobExecutionResult = {
  if (fromSavepoint != null && env.streamEnv.getJavaEnv.isInstanceOf[LocalStreamEnvironment]) {
    // transform the streaming program into a JobGraph
    val locEnv = env.streamEnv.getJavaEnv.asInstanceOf[LocalStreamEnvironment]
    val streamGraph = locEnv.getStreamGraph
    streamGraph.setJobName(jobName)
    val jobGraph = streamGraph.getJobGraph()
    jobGraph.setAllowQueuedScheduling(true)
    // the key line: point the job graph at the snapshot to resume from
    jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(fromSavepoint, true))

    val configuration = new org.apache.flink.configuration.Configuration
    configuration.addAll(jobGraph.getJobConfiguration)
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0")

    // add (and override) the settings with what the user defined
    // (read LocalStreamEnvironment's private configuration field via reflection)
    val cls = classOf[LocalStreamEnvironment]
    val cfgField = cls.getDeclaredField("configuration")
    cfgField.setAccessible(true)
    val userConfiguration = cfgField.get(locEnv).asInstanceOf[org.apache.flink.configuration.Configuration]
    configuration.addAll(userConfiguration)

    if (!configuration.contains(RestOptions.BIND_PORT)) {
      configuration.setString(RestOptions.BIND_PORT, "0")
    }

    val numSlotsPerTaskManager =
      configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism)
    val cfg = new MiniClusterConfiguration.Builder()
      .setConfiguration(configuration)
      .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
      .build()

    // run the job on an embedded MiniCluster and block until it finishes
    val miniCluster = new MiniCluster(cfg)
    try {
      miniCluster.start()
      configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress.get.getPort)
      return miniCluster.executeJobBlocking(jobGraph)
    } finally {
      // transformations.clear
      miniCluster.close()
    }
  } else {
    throw new InvalidParameterException(
      "flink.stream-environment.from-savepoint may only be used for local debug execution")
  }
}

From: Piotr Nowojski <pnowoj...@apache.org>
Sent: Thursday, 17 February 2022 09:23
To: Cristian Constantinescu <zei...@gmail.com>
Cc: Sandys-Lumsdaine, James <james.sandys-lumsda...@systematica.com>; James Sandys-Lumsdaine <jas...@hotmail.com>; user@flink.apache.org
Subject: Re: Basic questions about resuming stateful Flink jobs

Hi James,

> Do I copy the checkpoint into a savepoint directory and treat it like a savepoint?

You don't need to copy the checkpoint. Actually, you cannot do that, as checkpoints are not relocatable. But you can point to the checkpoint directory and resume from it like you would from a savepoint.

Regarding the testing, I would suggest taking a look at the docs [1] and MiniClusterWithClientResource in particular. If you are using it, you can access the cluster client (MiniClusterWithClientResource#getClusterClient), and this client should be an equivalent of the CLI/REST API. You can also use it to recover from savepoints - check for `setSavepointRestoreSettings` usage in [2]. But the real question would be why you want to do it at all; you might not necessarily need to test for recovery at this level.
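To make that concrete, here is a minimal, untested sketch of that approach (Flink 1.14-style APIs; the object name, pipeline and savepoint path below are placeholders rather than anything from this thread):

import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.test.util.MiniClusterWithClientResource

object ResumeFromSnapshotSketch {
  def main(args: Array[String]): Unit = {
    // spin up an embedded cluster plus a client, as described in the testing docs [1]
    val flinkCluster = new MiniClusterWithClientResource(
      new MiniClusterResourceConfiguration.Builder()
        .setNumberTaskManagers(1)
        .setNumberSlotsPerTaskManager(2)
        .build())
    flinkCluster.before()
    try {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.fromElements(1, 2, 3).print() // stand-in for the real stateful pipeline

      // build the JobGraph and point it at an existing savepoint/retained checkpoint directory
      val jobGraph = env.getJavaEnv.getStreamGraph.getJobGraph
      jobGraph.setSavepointRestoreSettings(
        SavepointRestoreSettings.forPath("file:///tmp/snapshots/savepoint-123abc", true))

      // submitting through the cluster client is the programmatic equivalent of `flink run -s <path>`
      flinkCluster.getClusterClient.submitJob(jobGraph).get()
    } finally {
      flinkCluster.after()
    }
  }
}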
From a user code perspective, it doesn't matter whether you use a checkpoint or a savepoint, or where it's stored. IMO what you want to have is:

1. Proper unit tests using TestHarness(es). Again, take a look at [1]. You can set up unit tests, process some records, carefully control timers, then call `AbstractStreamOperatorTestHarness#snapshot` to take a snapshot and `AbstractStreamOperatorTestHarness#initializeState` to test the recovery code path. For examples you can look at usages of those methods in the Flink code base, for example [3] (see also the short sketch below).

2. Later, I would recommend complementing such unit tests with some end-to-end tests that make sure everything is integrated properly, that your cluster is configured correctly, etc. There you don't need to use MiniCluster, as you can simply use the REST API/CLI. But crucially, you don't need to be so thorough in covering all of the cases at this level, especially failure handling, as you can rely more on the unit tests. Having said that, you might want to have a test that kills/restarts one TM at the end-to-end level.

Best,
Piotrek

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/
[2] https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
[3] https://github.com/apache/flink/blob/fdf40d2e0efe2eed77ca9633121691c8d1e744cb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
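A rough, untested sketch of that harness pattern (CountingFunction and every other name below are illustrative, not from this thread; it needs the Flink test utilities described in [1]):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.operators.StreamFlatMap
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

// trivial stateful function standing in for the real operator under test
class CountingFunction extends RichFlatMapFunction[String, (String, Long)] {
  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit =
    count = getRuntimeContext.getState(new ValueStateDescriptor("count", classOf[java.lang.Long]))

  override def flatMap(value: String, out: Collector[(String, Long)]): Unit = {
    val prev = count.value()
    val next = if (prev == null) 1L else prev + 1L
    count.update(next)
    out.collect((value, next))
  }
}

object SnapshotRestoreSketch {
  private def newHarness() =
    new KeyedOneInputStreamOperatorTestHarness[String, String, (String, Long)](
      new StreamFlatMap(new CountingFunction),
      new KeySelector[String, String] { override def getKey(value: String): String = value },
      Types.STRING)

  def main(args: Array[String]): Unit = {
    // run a record through a first harness and take a snapshot of its state
    val first = newHarness()
    first.open()
    first.processElement("a", 1L)
    val snapshot = first.snapshot(1L, 100L) // AbstractStreamOperatorTestHarness#snapshot
    first.close()

    // feed the snapshot to a fresh harness to exercise the recovery code path
    val restored = newHarness()
    restored.setup()
    restored.initializeState(snapshot)      // AbstractStreamOperatorTestHarness#initializeState
    restored.open()
    restored.processElement("a", 2L)        // should emit ("a", 2) if the count was recovered
    restored.close()
  }
}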
On Wed, 16 Feb 2022 at 21:57, Cristian Constantinescu <zei...@gmail.com> wrote:

Hi James,

I literally just went through what you're doing at my job. While I'm using Apache Beam and not the Flink API directly, the concepts still apply. TL;DR: it works as expected.

What I did is set up a Kafka topic listener that always throws an exception if the last received message's timestamp is less than 5 minutes from when the processing happens (basically simulating a code fix after 5 minutes). Then I let the pipeline execute the normal processing and I'd send a message on the exception topic. I have set up Flink to retry twice; Beam offers a flag (numberOfExecutionRetries) [1], but it boils down to one of the Flink flags here [2]. What that does is that once Flink encounters an exception, for example from my exception-throwing topic, it restores itself from the last checkpoint, which includes Kafka offsets and other things that transforms might have in there. Effectively this replays the messages after the checkpoint, and of course my exception is thrown again when it tries to reprocess that message. After the second try, Flink gives up and the Flink job stops (just like if you cancel it). If run in an IDE, the process will stop; if run on a Flink cluster, the job will stop.

When a Flink job stops, it usually clears up its checkpoints, unless you externalize them; for Beam it's the externalizedCheckpointsEnabled flag set to true. Check the docs to see what that maps to.

Then, when you restart the Flink job, just add the -s Flink flag followed by the latest checkpoint path. If you're running from an IDE, say IntelliJ, you can still pass the -s flag in the Main method launcher. We use a bash script to restart our Flink jobs in our UAT/PROD boxes for now; you can use this command to find the latest checkpoint in that path:

find "$PATH_WHERE_YOU_SAVE_STATE" -name "_metadata" -print0 | xargs -r -0 ls -1 -t | head -1

And you know where PATH_WHERE_YOU_SAVE_STATE is, because you have to specify it when you initially start the Flink job; for Beam, that's the stateBackendStoragePath flag. This is going to pick up the latest checkpoint before the pipeline stopped and continue from it with your updated jar that handles the exception properly.

Also note that I think you can set all these flags with Java code. In Beam it's just adding them to the Main method args parameter or to the PipelineOptions once you build that object from args. I've never used the Flink libs, just the runner, but from [1] and [3] it looks like you can configure things in code if you prefer that.

Hope it helps,
Cristian

[1] https://beam.apache.org/documentation/runners/flink/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/
[3] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/#configuration

On Wed, Feb 16, 2022 at 12:28 PM Sandys-Lumsdaine, James <james.sandys-lumsda...@systematica.com> wrote:

Thanks for your reply, Piotr. Some follow-on questions:

> "Nevertheless you might consider enabling them as this allows you to manually cancel the job if it enters an endless recovery/failure loop, fix the underlying issue, and restart the job from the externalised checkpoint."

How is this done? Are you saying the retained checkpoint (i.e. the last checkpoint that isn't deleted) can somehow be used when restarting the Flink application? If I am running in my IDE and just using the local streaming environment, how can I test my recovery code with a retained checkpoint? All my attempts so far just say "No checkpoint found during restore." Do I copy the checkpoint into a savepoint directory and treat it like a savepoint?

On the topic of savepoints, that web page [1] says I need to use "bin/flink savepoint" or "bin/flink stop --savepointPath", but again, if I'm currently not running in a real cluster, how else can I create and recover from the savepoints? From what I've read there are state, checkpoints and savepoints, all of which hold state, and currently I can't get any of these to restore when developing in an IDE; the program builds up all state from scratch. So what else do I need to do in my Java code to tell Flink to load a savepoint?

Thanks,
James.

From: Piotr Nowojski <pnowoj...@apache.org>
Sent: 16 February 2022 16:36
To: James Sandys-Lumsdaine <jas...@hotmail.com>
Cc: user@flink.apache.org
Subject: Re: Basic questions about resuming stateful Flink jobs

Hi James,

Sure! The basic idea of checkpoints is that they are fully owned by the running job and used for failure recovery. Thus, by default, if you stop the job, its checkpoints are removed. If you want to stop a job and later resume from the same point where it previously stopped, you most likely want to use savepoints [1]. You can stop the job with a savepoint and later restart another job from that savepoint.

Regarding the externalised checkpoints: technically you could use them in a similar way, but there is no command like "take a checkpoint and stop the job".
Nevertheless, you might consider enabling them, as this allows you to manually cancel the job if it enters an endless recovery/failure loop, fix the underlying issue, and restart the job from the externalised checkpoint.

Best,
Piotrek

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/

On Wed, 16 Feb 2022 at 16:44, James Sandys-Lumsdaine <jas...@hotmail.com> wrote:

Hi all,

I have a 1.14 Flink streaming workflow with many stateful functions that has an FsStateBackend and checkpointing enabled, although I haven't set a location for the checkpointed state. I've really struggled to understand how I can stop my Flink job and restart it so that it carries on exactly where it left off, using the state, checkpoints or savepoints. This is not clearly explained in the book or the web documentation.

Since I have no control over my Flink job id, I assume I cannot force Flink to pick up the state recorded under the jobId directory for the FsStateBackend. Therefore I think Flink should read back in the last checkpointed data, but I don't understand how to force my program to read this in. Do I use retained checkpoints or not? How can I force my program either to use the last checkpointed state (e.g. when running from my IDE, starting and stopping the program) or to ignore the state and start completely fresh?

The web documentation talks about bin/flink, but I am running from my IDE, so I want my Java code to control this process using the Flink API in Java. Can anyone give me some basic pointers? I'm obviously missing something fundamental on how to allow my program to be stopped and started without losing all the state.

Many thanks,

James.
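For the setup side of this question, a small, untested sketch (paths, interval and names are placeholders; FsStateBackend matches what is mentioned above, although newer releases prefer HashMapStateBackend plus a checkpoint storage setting): it writes checkpoints to a fixed local directory and retains them when the job stops, so a later run has something to point at. Restoring inside the IDE then still needs one of the approaches further up the thread, e.g. the remodeled execute method or submitting through a MiniCluster client.

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala._

object RetainedCheckpointSetupSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // checkpoint every 10 seconds into a fixed local directory
    // (this becomes the "PATH_WHERE_YOU_SAVE_STATE" from Cristian's script)
    env.enableCheckpointing(10 * 1000)
    env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"))

    // keep the chk-* directories when the job stops or is cancelled, so a later
    // run can be pointed back at them with -s / SavepointRestoreSettings
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // stand-in for the real stateful pipeline
    env.fromElements("a", "b", "c").print()
    env.execute("retained-checkpoint-setup-sketch")
  }
}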