Hi Abhishek, 

I did the same as you and tested my job with a Parquet StreamingFileSink via 
a snapshot. (Afterwards I run a small Spark job on the Parquet output to assert 
that my Flink output is correct.) 

The good news is that it is easy to stop the job with a savepoint. You are 
already on the right track. 

After you build your MiniCluster like so: 

    private MiniClusterWithClientResource buildTestMiniCluster(
            Configuration flinkClusterConfig) throws Exception {
        MiniClusterWithClientResource flinkTestCluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(flinkClusterConfig)
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(1)
                                .build());
        flinkTestCluster.before();
        return flinkTestCluster;
    }

you can obtain a detached cluster client from it: 

    private ClusterClient<?> getDetachedClusterClient(
            MiniClusterWithClientResource flinkTestCluster) {
        ClusterClient<?> flinkClient = flinkTestCluster.getClusterClient();
        flinkClient.setDetached(true);
        return flinkClient;
    }

Now, in your test, you build the JobGraph and submit the job: 

    JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    JobSubmissionResult submissionResult =
            flinkClient.submitJob(jobGraph, getClass().getClassLoader());

Afterwards, you can easily stop the job with a savepoint via that client: 

    flinkClient.stopWithSavepoint(
            submissionResult.getJobID(), false, savepointDir.toURI().toString());

In my case, I store the savepoint in a JUnit TemporaryFolder and ignore it, 
because I only care about the written Parquet file, not the savepoint itself. 

That's all nice. The not-so-nice part is that you don't easily know when the 
job has actually processed all elements of your test input, so that you can 
trigger stopping the pipeline with the savepoint. For this purpose, each of my 
integration tests with a savepoint uses a public static Semaphore. As all my 
jobs read from a Kafka source (using 
com.salesforce.kafka.test.junit4.SharedKafkaTestResource), I have a custom 
KafkaDeserializationSchema that extends the default one from my job and 
implements isEndOfStream: 

    @Override
    public boolean isEndOfStream(V nextElement) {
        if ("EOF".equals(nextElement.getId())) {
            LOCK.release();
        }
        return false;
    }

Finally, I finish writing the test data in my test setup with a marker message 
"EOF". When that message is received, the semaphore is released and the unit 
test thread executes the flinkClient.stopWithSavepoint line. This works because 
the MiniCluster runs in the same JVM, and because the savepoint/checkpoint 
marker flows through the pipeline and can't overtake my prior messages, so all 
data will be written. I can run my assertions right after the 
stopWithSavepoint line, as that call runs synchronously. 
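
To make the synchronization idea concrete without any Flink or Kafka 
dependency, here is a minimal plain-Java sketch. Everything in it is a stand-in 
I made up for illustration: the two threads play the role of the source and the 
sink, the queue plays the role of the pipeline, and the STOP element plays the 
role of the savepoint marker. It is not how Flink implements this, only the 
same ordering argument in miniature. 

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class EofSemaphoreSketch {
    // Zero permits: the test thread blocks until the marker has been seen.
    static final Semaphore LOCK = new Semaphore(0);
    // Hypothetical stand-in for the savepoint marker that stops the pipeline.
    static final String STOP = "__STOP__";

    // Mirrors the isEndOfStream override: signal the test, never end the stream.
    static boolean isEndOfStream(String id) {
        if ("EOF".equals(id)) {
            LOCK.release();
        }
        return false;
    }

    static List<String> runAndStop(List<String> input) throws InterruptedException {
        LOCK.drainPermits();
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        List<String> written = new ArrayList<>();

        // "Source": inspects each element, then forwards it in order.
        Thread source = new Thread(() -> {
            for (String id : input) {
                isEndOfStream(id);
                channel.add(id);
            }
        });
        // "Sink": writes everything it receives until the stop marker arrives.
        Thread sink = new Thread(() -> {
            try {
                for (String id = channel.take(); !STOP.equals(id); id = channel.take()) {
                    if (!"EOF".equals(id)) {
                        written.add(id);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        source.setDaemon(true);
        sink.setDaemon(true);
        source.start();
        sink.start();

        // Test thread: wait for the EOF signal, then stop the pipeline.
        if (!LOCK.tryAcquire(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("EOF marker was never seen");
        }
        // The stop marker is queued behind all earlier data, so it cannot overtake it.
        channel.add(STOP);
        sink.join();
        source.join();
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndStop(List.of("a", "b", "c", "EOF")));
    }
}
```

The timeout on tryAcquire is there so that a test fails instead of hanging 
forever if the marker message never arrives. 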

Hope that helps. 

Best regards 
Theo 




From: "David Anderson" <da...@alpinegizmo.com> 
To: "Abhishek Rai" <abhis...@netspring.io> 
CC: "user" <user@flink.apache.org> 
Sent: Friday, June 12, 2020 20:21:07 
Subject: Re: Restore from savepoint through Java API 

You can study LocalStreamingFileSinkTest [1] for an example of how to approach 
this. You can use the test harnesses [2], keeping in mind that 
- initializeState is called during instance creation 
- the provided context indicates if state is being restored from a snapshot 
- snapshot is called when taking a checkpoint 
- notifyOfCompletedCheckpoint is called when a checkpoint is complete 

The outline of such a test might follow this pattern: 

testHarness1.setup(); 
testHarness1.initializeState(initState); 
testHarness1.open(); 

// setup state to checkpoint ... 

// capture snapshot 
snapshot = testHarness1.snapshot(checkpointId, timestamp); 

// process more data, the effects of which will be lost ... 

// create a new test harness initialized with the state from the snapshot 
testHarness2.setup(); 
testHarness2.initializeState(snapshot); 
testHarness2.open(); 

// verify the state ... 
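
The shape of that test can also be shown without Flink at all. The following is 
only a framework-free analogy: BufferingSink is a hypothetical class invented 
for this sketch, and none of its methods are the Flink test harness API. It 
just shows the snapshot, keep processing, restore into a fresh instance, verify 
sequence from the outline above. 

```java
import java.util.ArrayList;
import java.util.List;

class SnapshotRestoreSketch {
    /** Hypothetical stateful sink that buffers elements and can snapshot them. */
    static class BufferingSink {
        private final List<String> buffer = new ArrayList<>();

        // Pass null for a fresh start, or a snapshot to restore from.
        void initializeState(List<String> restored) {
            buffer.clear();
            if (restored != null) {
                buffer.addAll(restored);
            }
        }

        void processElement(String value) {
            buffer.add(value);
        }

        // Returns a copy so later elements do not leak into the snapshot.
        List<String> snapshot() {
            return new ArrayList<>(buffer);
        }

        List<String> state() {
            return new ArrayList<>(buffer);
        }
    }

    static List<String> restoredStateAfterSnapshot() {
        BufferingSink first = new BufferingSink();
        first.initializeState(null);

        // Set up state to checkpoint.
        first.processElement("a");
        first.processElement("b");

        // Capture the snapshot.
        List<String> snapshot = first.snapshot();

        // Process more data, the effects of which will be lost.
        first.processElement("c");

        // Create a new instance initialized with the state from the snapshot.
        BufferingSink second = new BufferingSink();
        second.initializeState(snapshot);
        return second.state();
    }

    public static void main(String[] args) {
        System.out.println(restoredStateAfterSnapshot());
    }
}
```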

David 

[1] https://github.com/apache/flink/blob/release-1.10/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators

On Thu, Jun 11, 2020 at 12:12 PM Abhishek Rai <abhis...@netspring.io> wrote: 



Hello, 
I'm writing a test for my custom sink function. The function is stateful and 
relies on checkpoint restores for maintaining consistency with the external 
system that it's writing to. For integration testing of the sink function, I 
have a MiniCluster based environment inside a single JVM through which I create 
my job and validate its operation. 

In order to test the checkpoint restore behavior with precision, I've disabled 
checkpointing and am instead using savepoints. So, my test proceeds as follows: 

1. Start a job. 
2. Push some data through it to the sink and to an external system. 
3. Trigger a savepoint. 
4. Push more data. 
5. Cancel the job. 
6. Restore from the savepoint captured in step 3 above. 

I can't seem to find a Java API for restoring a job from a savepoint. The 
approach in the documentation and other resources is to use the CLI, which is 
not an option for me. Currently, I create a RemoteStreamEnvironment with 
savepointRestoreSettings set, but when I run execute(), I get the following 
error: 

java.lang.IllegalStateException: No operators defined in streaming topology. 
Cannot execute. 

    var savepointDir =
        restClusterClient_.triggerSavepoint(jobId, tmpdir).get();
    assertTrue(!savepointDir.isBlank());
    // Cancel the job and launch a new one from the savepoint.
    restClusterClient_.cancel(jobId).get();
    var restoreSettings = SavepointRestoreSettings.forPath(savepointDir);
    var env = new RemoteStreamEnvironment(
        flinkMiniCluster_.host(),
        flinkMiniCluster_.port(),
        null,
        new String[] {},
        null,
        restoreSettings);
    var restoredJob = env.executeAsync();

Separately, is there a flink testing utility I could use for integration 
testing of state checkpointing and recovery? 

Thanks, 
Abhishek 



