You can study LocalStreamingFileSinkTest [1] for an example of how to approach this. You can use the test harnesses [2], keeping in mind that
- initializeState is called during instance creation
- the provided context indicates whether state is being restored from a snapshot
- snapshot is called when taking a checkpoint
- notifyOfCompletedCheckpoint is called when a checkpoint is complete

The outline of such a test might follow this pattern:

    testHarness1.setup();
    testHarness1.initializeState(initState);
    testHarness1.open();

    // set up state to checkpoint
    ...

    // capture snapshot
    snapshot = testHarness1.snapshot(checkpointId, timestamp);

    // process more data, the effects of which will be lost
    ...

    // create a new test harness initialized with the state from the snapshot
    testHarness2.setup();
    testHarness2.initializeState(snapshot);
    testHarness2.open();

    // verify the state
    ...

David

[1] https://github.com/apache/flink/blob/release-1.10/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators

On Thu, Jun 11, 2020 at 12:12 PM Abhishek Rai <abhis...@netspring.io> wrote:
> Hello,
>
> I'm writing a test for my custom sink function. The function is stateful
> and relies on checkpoint restores for maintaining consistency with the
> external system that it's writing to. For integration testing of the sink
> function, I have a MiniCluster based environment inside a single JVM
> through which I create my job and validate its operation.
>
> In order to test the checkpoint restore behavior with precision, I've
> disabled checkpointing and am instead using savepoints. So, my test
> proceeds as follows:
>
> 1. Start a job.
> 2. Push some data through it to the sink and to an external system.
> 3. Trigger a savepoint.
> 4. Push more data.
> 5. Cancel the job.
> 6. Restore from the savepoint captured in step 3 above.
>
> I can't seem to find a Java API for restoring a job from a savepoint.
> The approach in the documentation and other resources is to use the CLI, which
> is not an option for me. Currently, I create a RemoteStreamEnvironment
> with savepointRestoreSettings set, but when I run execute(), I get the
> following error:
>
>     java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
>
>     var savepointDir = restClusterClient_.triggerSavepoint(jobId, tmpdir).get();
>     assertTrue(!savepointDir.isBlank());
>     // Cancel the job and launch a new one from the save point.
>     restClusterClient_.cancel(jobId).get();
>     var restoreSettings = SavepointRestoreSettings.forPath(savepointDir);
>     var env = new RemoteStreamEnvironment(
>         flinkMiniCluster_.host(),
>         flinkMiniCluster_.port(),
>         null,
>         new String[] {},
>         null,
>         restoreSettings);
>     var restoredJob = env.executeAsync();
>
> Separately, is there a flink testing utility I could use for integration
> testing of state checkpointing and recovery?
>
> Thanks,
> Abhishek
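To make the harness outline in the reply concrete, here is a minimal sketch written against the Flink 1.10 test harness API. It assumes the flink-streaming-java and flink-runtime test-jars described in [2] are on the test classpath. `HarnessRestoreSketch` and `BufferingSink` are hypothetical names; the sink is modeled loosely on the buffering-sink example from the Flink operator state documentation and simply stands in for a real stateful sink. Element "c" is processed after the snapshot is taken, so it should not be visible in the restored sink.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class HarnessRestoreSketch {

    /** Hypothetical stateful sink: buffers elements and mirrors them into operator ListState. */
    static class BufferingSink implements SinkFunction<String>, CheckpointedFunction {
        private final List<String> buffer = new ArrayList<>();
        private transient ListState<String> checkpointedState;

        @Override
        public void invoke(String value, Context context) {
            buffer.add(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Invoked via testHarness.snapshot(...): copy the buffer into managed state.
            checkpointedState.clear();
            for (String element : buffer) {
                checkpointedState.add(element);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered-elements", String.class));
            // isRestored() tells us whether the harness was initialized from a snapshot.
            if (context.isRestored()) {
                for (String element : checkpointedState.get()) {
                    buffer.add(element);
                }
            }
        }

        List<String> contents() {
            return new ArrayList<>(buffer);
        }
    }

    /** Runs the snapshot/restore outline and returns what the restored sink holds. */
    static List<String> restoreAfterSnapshot() throws Exception {
        OperatorSubtaskState snapshot;

        BufferingSink sink1 = new BufferingSink();
        OneInputStreamOperatorTestHarness<String, Object> testHarness1 =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<String>(sink1));
        try {
            testHarness1.setup();
            testHarness1.open(); // no initializeState(...) call, so this starts from empty state
            testHarness1.processElement(new StreamRecord<>("a", 1L));
            testHarness1.processElement(new StreamRecord<>("b", 2L));
            snapshot = testHarness1.snapshot(1L, 3L); // checkpoint id 1
            testHarness1.notifyOfCompletedCheckpoint(1L);
            testHarness1.processElement(new StreamRecord<>("c", 4L)); // not in the snapshot
        } finally {
            testHarness1.close();
        }

        // A fresh sink instance, restored only from the captured snapshot.
        BufferingSink sink2 = new BufferingSink();
        OneInputStreamOperatorTestHarness<String, Object> testHarness2 =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<String>(sink2));
        try {
            testHarness2.setup();
            testHarness2.initializeState(snapshot);
            testHarness2.open();
            return sink2.contents();
        } finally {
            testHarness2.close();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(restoreAfterSnapshot());
    }
}
```

Using two separate sink instances is deliberate: the second harness only sees what went through snapshotState, which is the same boundary a savepoint restore exercises, without needing a MiniCluster.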