Hello,

I'm writing a test for my custom sink function. The function is stateful and relies on checkpoint restores to stay consistent with the external system that it writes to. For integration testing of the sink function, I have a MiniCluster-based environment inside a single JVM, through which I create my job and validate its operation.
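The consistency property described above can be illustrated with a toy model that uses no Flink APIs at all. Everything in it is hypothetical: the class names, the sequence-number state, and the idempotent keyed write are assumptions made for illustration, not the actual sink. It only shows what a savepoint-restore test would want to assert: after restoring and replaying the data sent since the savepoint, the external system holds each record exactly once.

```java
import java.util.Map;
import java.util.TreeMap;

class SavepointRestoreModel {

    /** Stand-in for the external system. Writes are keyed, so a rewrite is idempotent. */
    static final class ExternalStore {
        final Map<Long, String> rows = new TreeMap<>();

        void upsert(long seq, String value) {
            rows.put(seq, value);
        }
    }

    /** Hypothetical sink whose only state is the next sequence number to write. */
    static final class ToySink {
        private final ExternalStore store;
        private long nextSeq;

        ToySink(ExternalStore store, long restoredSeq) {
            this.store = store;
            this.nextSeq = restoredSeq;
        }

        void write(String value) {
            store.upsert(nextSeq++, value);
        }

        /** Models a savepoint: captures the sink state at this instant. */
        long snapshot() {
            return nextSeq;
        }
    }

    static Map<Long, String> run() {
        ExternalStore store = new ExternalStore();

        // Start the "job" and push some data through the sink.
        ToySink sink = new ToySink(store, 0L);
        sink.write("a");
        sink.write("b");

        // Take a savepoint, then push more data before cancelling.
        long savepoint = sink.snapshot();
        sink.write("c");

        // Cancel and restore: a new sink instance starts from the saved state.
        ToySink restored = new ToySink(store, savepoint);

        // The source replays everything sent after the savepoint, then continues.
        restored.write("c");
        restored.write("d");

        return store.rows;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {0=a, 1=b, 2=c, 3=d}
    }
}
```

In this model the replayed record "c" overwrites its earlier copy instead of duplicating it, which is the kind of outcome the real integration test needs to verify against the external system.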
In order to test the checkpoint restore behavior with precision, I've disabled checkpointing and am instead using savepoints. So, my test proceeds as follows:

1. Start a job.
2. Push some data through it to the sink and to an external system.
3. Trigger a savepoint.
4. Push more data.
5. Cancel the job.
6. Restore from the savepoint captured in step 3 above.

I can't seem to find a Java API for restoring a job from a savepoint. The approach in the documentation and other resources is to use the CLI, which is not an option for me. Currently, I create a RemoteStreamEnvironment with savepointRestoreSettings set, but when I run execute(), I get the following error:

java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

    var savepointDir = restClusterClient_.triggerSavepoint(jobId, tmpdir).get();
    assertTrue(!savepointDir.isBlank());
    // Cancel the job and launch a new one from the savepoint.
    restClusterClient_.cancel(jobId).get();
    var restoreSettings = SavepointRestoreSettings.forPath(savepointDir);
    var env = new RemoteStreamEnvironment(
        flinkMiniCluster_.host(),
        flinkMiniCluster_.port(),
        null,
        new String[] {},
        null,
        restoreSettings);
    var restoredJob = env.executeAsync();

Separately, is there a Flink testing utility I could use for integration testing of state checkpointing and recovery?

Thanks,
Abhishek