You can study LocalStreamingFileSinkTest [1] for an example of how to
approach this, using the test harnesses [2]. Keep in mind that:

- initializeState is called during instance creation
- the provided context indicates if state is being restored from a snapshot
- snapshot is called when taking a checkpoint
- notifyOfCompletedCheckpoint is called when a checkpoint is complete

The outline of such a test might follow this pattern:

testHarness1.setup();
testHarness1.initializeState(initState);
testHarness1.open();

// setup state to checkpoint ...

// capture snapshot
snapshot = testHarness1.snapshot(checkpointId, timestamp);

// process more data, the effects of which will be lost ...

// create a new test harness initialized with the state from the snapshot
testHarness2.setup();
testHarness2.initializeState(snapshot);
testHarness2.open();

// verify the state ...
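Fleshed out, the outline above might look like the following sketch. It
assumes a hypothetical MySinkFunction (stand-in for your own sink) and uses
the harness classes from flink-streaming-java's test utilities
(OneInputStreamOperatorTestHarness, StreamSink, OperatorSubtaskState);
adjust type parameters and the restored-state assertions to your sink:

```java
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

// First harness: build up some state and take a snapshot.
OneInputStreamOperatorTestHarness<String, Object> testHarness1 =
    new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MySinkFunction()));
testHarness1.setup();
testHarness1.initializeEmptyState();
testHarness1.open();

// State to be captured by the checkpoint.
testHarness1.processElement("a", 1L);

// Capture a snapshot (checkpointId = 0, timestamp = 1).
OperatorSubtaskState snapshot = testHarness1.snapshot(0L, 1L);

// Data processed after the snapshot is lost on restore.
testHarness1.processElement("b", 2L);
testHarness1.close();

// Second harness: restore from the snapshot, as if recovering from a checkpoint.
OneInputStreamOperatorTestHarness<String, Object> testHarness2 =
    new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MySinkFunction()));
testHarness2.setup();
testHarness2.initializeState(snapshot);
testHarness2.open();
testHarness2.notifyOfCompletedCheckpoint(0L);

// Verify that the sink's state reflects only "a", not "b" ...
```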

David

[1]
https://github.com/apache/flink/blob/release-1.10/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators

On Thu, Jun 11, 2020 at 12:12 PM Abhishek Rai <abhis...@netspring.io> wrote:

> Hello,
>
> I'm writing a test for my custom sink function.  The function is stateful
> and relies on checkpoint restores for maintaining consistency with the
> external system that it's writing to.  For integration testing of the sink
> function, I have a MiniCluster based environment inside a single JVM
> through which I create my job and validate its operation.
>
> In order to test the checkpoint restore behavior with precision, I've
> disabled checkpointing and am instead using savepoints.  So, my test
> proceeds as follows:
>
> 1. Start a job.
> 2. Push some data through it to the sink and to an external system.
> 3. Trigger a savepoint.
> 4. Push more data.
> 5. Cancel the job.
> 6. Restore from the savepoint captured in step 3 above.
>
> I can't seem to find a Java API for restoring a job from a savepoint.  The
> approach in the documentation and other resources is to use the CLI, which
> is not an option for me.  Currently, I create a RemoteStreamEnvironment
> with savepointRestoreSettings set, but when I run execute(), I get the
> following error:
>
> java.lang.IllegalStateException: No operators defined in streaming
> topology. Cannot execute.
>
> var savepointDir =
>     restClusterClient_.triggerSavepoint(jobId, tmpdir).get();
> assertTrue(!savepointDir.isBlank());
> // Cancel the job and launch a new one from the save point.
> restClusterClient_.cancel(jobId).get();
> var restoreSettings = SavepointRestoreSettings.forPath(savepointDir);
> var env = new RemoteStreamEnvironment(
>     flinkMiniCluster_.host(),
>     flinkMiniCluster_.port(),
>     null,
>     new String[] {},
>     null,
>     restoreSettings);
> var restoredJob = env.executeAsync();
>
>
> Separately, is there a flink testing utility I could use for integration
> testing of state checkpointing and recovery?
>
> Thanks,
> Abhishek
>
