Hi Min, 

To run "clean" integration tests with Kafka, I set up a JUnit rule for spinning up Kafka (as mentioned by Konstantin), and I also use my own Kafka deserializer (extending the project's custom deserializer), like so:

public class TestDeserializer extends MyKafkaDeserializer<MyBean> {

    public static final String END_APP_MARKER = "END_APP_MARKER";

    public TestDeserializer() {
        super(new MyParser(), new PlainMsgContentExtractor());
    }

    @Override
    public boolean isEndOfStream(ParseResult<MyBean> nextElement) {
        // Successfully parsed message: keep consuming
        if (nextElement.getParseError() == null)
            return false;

        // Unparseable message: end the stream if the raw data is the marker
        if (END_APP_MARKER.equals(nextElement.getParseError().getRawData()))
            return true;

        return false;
    }
}
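
For context: this assumes MyKafkaDeserializer implements Flink's KafkaDeserializationSchema, so the test can plug it straight into the Kafka source. A minimal sketch (the topic name and properties are placeholders for your own setup):

// requires flink-connector-kafka on the test classpath
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092"); // placeholder
consumerProps.put("group.id", "integration-test");        // placeholder

// Because isEndOfStream() eventually returns true, this otherwise
// unbounded source becomes finite and env.execute() can return.
FlinkKafkaConsumer<ParseResult<MyBean>> source =
        new FlinkKafkaConsumer<>("input-topic", new TestDeserializer(), consumerProps);
DataStream<ParseResult<MyBean>> stream = env.addSource(source);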

In my tests, I make sure to send an "END_APP_MARKER" message to Kafka as the last message. This way, the pipeline shuts down nicely on its own once the end marker is consumed.
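
For illustration, sending the marker from the test looks roughly like this (plain Kafka producer API; the broker address and topic are placeholders for whatever your Kafka JUnit rule exposes):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // regular test events first ...
    producer.send(new ProducerRecord<>("input-topic", "{\"some\":\"event\"}"));
    // ... then the marker that makes isEndOfStream() return true,
    // so the job shuts down on its own once it is consumed
    producer.send(new ProducerRecord<>("input-topic", TestDeserializer.END_APP_MARKER));
}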

Best regards 
Theo 


From: "Konstantin Knauf" <konstan...@ververica.com>
To: "min tan" <min....@ubs.com>
CC: "user" <user@flink.apache.org>
Sent: Thursday, June 13, 2019, 12:55:05
Subject: Re: Flink end to end integration test

Hi Min,

I recently published a small repository [1] containing examples of how to test Flink applications on different levels of the testing pyramid. It also contains one integration test, which spins up an embedded Flink cluster [2]. In contrast to your requirements, this test uses dedicated testing sources/sinks. To include your Kafka sources/sinks in the test, I suggest you combine this with a JUnit rule for Kafka (e.g. [3]). In that case your sources are not finite, so you will need to submit your job from a separate thread and terminate it manually.
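
To make that concrete, here is a minimal sketch using Flink's MiniClusterWithClientResource from flink-test-utils (as in [2]); the job body, the cancellation step, and names like runsPipelineAgainstKafka are placeholders, not code from the repository:

@ClassRule
public static final MiniClusterWithClientResource FLINK =
        new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(2)
                        .build());

@Test
public void runsPipelineAgainstKafka() throws Exception {
    // getExecutionEnvironment() picks up the mini cluster started by the rule
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build the pipeline with the Kafka source/sink under test ...

    // With an unbounded Kafka source, execute() never returns, so submit
    // the job from a separate thread and verify/cancel from the test thread.
    Thread job = new Thread(() -> {
        try {
            env.execute("kafka-integration-test");
        } catch (Exception e) {
            // expected when the test cancels the job
        }
    });
    job.start();
    // ... produce input, poll the sink topic, assert on the results,
    // then cancel the job via the cluster client ...
}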

Cheers, 

Konstantin 

[1] https://github.com/knaufk/flink-testing-pyramid
[2] https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/StreamingJobIntegrationTest.java
[3] https://github.com/charithe/kafka-junit



On Thu, Jun 13, 2019 at 10:24 AM <min....@ubs.com> wrote:

Hi,

I am new to Flink, at least to the testing part.

We need an end-to-end integration test for a Flink job.

Where can I find documentation for this?

I am envisaging a test similar to this:

1) Start a local job instance in an IDE or a Maven test

2) Fire event JSONs at the data source (e.g. a Kafka topic)

3) Retrieve result JSONs from the data sink (e.g. a Kafka topic or an Elasticsearch index)

4) Compare the result JSONs with the expected ones (roughly as sketched below)
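
For steps 3) and 4), I imagine something like this plain-Kafka sketch (broker address, topic, and the expected list are placeholders):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("group.id", "test-assertions");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("output-topic"));
    List<String> actual = new ArrayList<>();
    long deadline = System.currentTimeMillis() + 30_000;
    // poll until all expected records arrived or the timeout hits
    while (actual.size() < expected.size() && System.currentTimeMillis() < deadline) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
            actual.add(record.value());
        }
    }
    assertEquals(expected, actual); // or a JSON-aware comparison
}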



Since our Flink job is a streaming one, how can we tear down the Flink job instance running in an IDE?



Regards,

Min







--

Konstantin Knauf | Solutions Architect

+49 160 91394525

Planned Absences: 20. - 21.06.2019, 10.08.2019 - 31.08.2019, 05.09. - 06.09.2019

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


-- 
SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln 
Theo Diefenthal 

T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575 
theo.diefent...@scoop-software.de - www.scoop-software.de 
Sitz der Gesellschaft: Köln, Handelsregister: Köln, 
Handelsregisternummer: HRB 36625 
Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen, 
Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel 
