Hi Min,
In order to run "clean" integration tests with Kafka, I setup a JUnit rule for
buidling up kafka (as mentioned by konstantin), but I also use my own
KafkaDeserializer (By extending from my custom deserializer for the project)
like so
public class TestDeserializer extends MyKafkaDeserializer<MyBean> {
public final static String END_APP_MARKER = "END_APP_MARKER" ;
public TestProxylogDeserializer() {
super ( new MyParser(), new PlainMsgContentExtractor());
}
@Override
public boolean isEndOfStream(ParseResult<MyBean> nextElement) {
// Succeeded message
if (nextElement.getParseError() == null )
return false ;
// Not parseable message raw data:
if ( END_APP_MARKER .equals(nextElement.getParseError().getRawData()))
return true ;
return false ;
}
}
In my tests, I make sure to send a "END_APP_MARKER" message to Kafka. This way,
the pipeline will shut down by itself nicely once the end marker is sent.
Best regards
Theo
Von: "Konstantin Knauf" <[email protected]>
An: "min tan" <[email protected]>
CC: "user" <[email protected]>
Gesendet: Donnerstag, 13. Juni 2019 12:55:05
Betreff: Re: Flink end to end intergration test
Hi Min,
I recently published a small repository [1] containing examples of how to test
Flink applications on different levels of the testing pyramid. It also contains
one integration test, which spins up an embedded Flink cluster [2]. In contrast
to your requirements this test uses dedicated testing sources/sinks. To include
your Kafka sources/sinks in the test, I suggest you combine this with a JUnit
Rule for Kafka (e.g. [3]). In this case your sources are not finite, so you
will need to submit your job from a separate thread and terminate it manually.
Cheers,
Konstantin
[1] [ https://github.com/knaufk/flink-testing-pyramid |
https://github.com/knaufk/flink-testing-pyramid ]
[2] [
https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/StreamingJobIntegrationTest.java
|
https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/StreamingJobIntegrationTest.java
]
[3] [ https://github.com/charithe/kafka-junit |
https://github.com/charithe/kafka-junit ]
On Thu, Jun 13, 2019 at 10:24 AM < [ mailto:[email protected] | [email protected] ]
> wrote:
Hi,
I am new to Flink, at least to the testing part.
We need an end to end integration test for a flink job.
Where can I find documentation for this?
I am envisaging a test similar to that:
1) Start a local job instance in an IDE or maven test
2) Fire event jsons to the data source (i.e. a Kafka topic)
3) Retrieve result jsons from the data sink (i.e. a Kafka topic or an elastic
search index)
4) Compared result jsons with the expected ones
Since our Flink job is a streaming one, how can we tear the Flink job instance
running in an IDE?
Regards,
Min
--
Konstantin Knauf | Solutions Architect
+49 160 91394525
Planned Absences: 20. - 21.06.2019, 10.08.2019 - 31.08.2019, 05.09. -
06.09.2010
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
-- Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
--
SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
Theo Diefenthal
T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
[email protected] - www.scoop-software.de
Sitz der Gesellschaft: Köln, Handelsregister: Köln,
Handelsregisternummer: HRB 36625
Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen,
Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel