Hello,

I'm writing a test for my custom sink function.  The function is stateful
and relies on checkpoint restores to stay consistent with the external
system it writes to.  For integration testing of the sink function, I have
a MiniCluster-based environment inside a single JVM, through which I create
my job and validate its operation.

To test the checkpoint restore behavior precisely, I've disabled
checkpointing and use savepoints instead.  My test proceeds as follows:

1. Start a job.
2. Push some data through it to the sink and to an external system.
3. Trigger a savepoint.
4. Push more data.
5. Cancel the job.
6. Restore from the savepoint captured in step 3 above.
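
To make the expected outcome of these steps concrete, here is a stdlib-only
Java model of them.  It is not Flink code: ExternalStore, SinkState and
ModelSink are hypothetical stand-ins, and I'm assuming a sink that keeps the
last sequence number it wrote in its state and rolls the external system back
to that point on restore.  The check is that, after restoring from the step 3
savepoint and replaying step 4's data, the external system holds no
duplicates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Stdlib-only model of the six-step savepoint test. Nothing here is Flink API:
// ExternalStore, SinkState and ModelSink are hypothetical stand-ins.
public class SavepointTestModel {

    // Hypothetical external system: rows keyed by sequence number.
    static final class ExternalStore {
        final TreeMap<Long, String> rows = new TreeMap<>();

        void put(long seq, String value) {
            rows.put(seq, value);
        }

        // Drop every row written after the given sequence number.
        void truncateAfter(long seq) {
            rows.tailMap(seq, false).clear();
        }
    }

    // Immutable copy of the sink's state, standing in for a savepoint.
    static final class SinkState {
        final long lastSeq;

        SinkState(long lastSeq) {
            this.lastSeq = lastSeq;
        }
    }

    // Hypothetical stateful sink that remembers the last sequence number it wrote.
    static final class ModelSink {
        private final ExternalStore store;
        private long lastSeq;

        ModelSink(ExternalStore store, SinkState restored) {
            this.store = store;
            this.lastSeq = (restored == null) ? 0L : restored.lastSeq;
            // Assumed reconciliation on restore: roll the external system back
            // to the snapshot point so that replayed records are not duplicated.
            store.truncateAfter(lastSeq);
        }

        void invoke(String value) {
            lastSeq++;
            store.put(lastSeq, value);
        }

        SinkState snapshot() {
            return new SinkState(lastSeq);
        }
    }

    static List<String> run() {
        ExternalStore store = new ExternalStore();
        ModelSink job = new ModelSink(store, null);           // 1. start a job
        job.invoke("a");                                      // 2. push some data
        job.invoke("b");
        SinkState savepoint = job.snapshot();                 // 3. trigger a savepoint
        job.invoke("c");                                      // 4. push more data
        // 5. cancel: the first sink instance is simply no longer used.
        ModelSink restored = new ModelSink(store, savepoint); // 6. restore
        restored.invoke("c"); // the source replays what came after the savepoint
        return new ArrayList<>(store.rows.values());
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b, c]
    }
}
```

Without the truncateAfter call in the constructor, the replayed "c" would
land under a new sequence number and the store would end up with
[a, b, c, c], which is the kind of inconsistency the real test should catch.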

I can't find a Java API for restoring a job from a savepoint.  The
documentation and other resources describe doing it through the CLI, which
isn't an option for me.  Currently, I create a RemoteStreamEnvironment with
savepointRestoreSettings set, but when I call executeAsync(), I get the
following error:

java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

// Trigger a savepoint of the running job into tmpdir (step 3).
var savepointDir =
    restClusterClient_.triggerSavepoint(jobId, tmpdir).get();
assertFalse(savepointDir.isBlank());
// Cancel the job and launch a new one from the savepoint.
restClusterClient_.cancel(jobId).get();
var restoreSettings = SavepointRestoreSettings.forPath(savepointDir);
var env = new RemoteStreamEnvironment(
    flinkMiniCluster_.host(),
    flinkMiniCluster_.port(),
    null,             // clientConfiguration
    new String[] {},  // jarFiles
    null,             // globalClasspaths
    restoreSettings);
// No sources or sinks are added to env before this call.
var restoredJob = env.executeAsync();
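
My working guess, which may well be wrong, is that the restore settings only
say where the state lives and do not carry the job's topology, so the new
environment would need the same sources and sinks (with the same operator
uids) declared on it before executeAsync().  Here is a stdlib-only Java model
of that guess.  ModelEnvironment, addOperator and the "my-sink" uid are
hypothetical and are not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model; ModelEnvironment is hypothetical and is not Flink API.
public class RestoreModel {

    static final class ModelEnvironment {
        private final List<String> operatorUids = new ArrayList<>();
        // operator uid -> saved state; null on a fresh start
        private final Map<String, Long> savepoint;

        ModelEnvironment(Map<String, Long> savepoint) {
            this.savepoint = savepoint;
        }

        // Declaring an operator is what gives the environment something to run.
        void addOperator(String uid) {
            operatorUids.add(uid);
        }

        // Returns the initial state of each declared operator.
        Map<String, Long> execute() {
            if (operatorUids.isEmpty()) {
                // Mirrors the error message quoted above.
                throw new IllegalStateException(
                        "No operators defined in streaming topology. Cannot execute.");
            }
            Map<String, Long> initial = new HashMap<>();
            for (String uid : operatorUids) {
                long state = (savepoint == null) ? 0L : savepoint.getOrDefault(uid, 0L);
                initial.put(uid, state);
            }
            return initial;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> savepoint = Map.of("my-sink", 2L);

        // Restore settings alone: nothing was declared, so execution fails.
        try {
            new ModelEnvironment(savepoint).execute();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // Re-declaring the pipeline with the same uid lets the state be reattached.
        ModelEnvironment rebuilt = new ModelEnvironment(savepoint);
        rebuilt.addOperator("my-sink");
        System.out.println(rebuilt.execute()); // prints {my-sink=2}
    }
}
```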


Separately, is there a Flink testing utility I could use for integration
testing of state checkpointing and recovery?

Thanks,
Abhishek
