Hi Tovi,

What you need is the TwoInputStreamOperatorTestHarness. This will allow you to do something like:

    // myoperator is the two-input operator under test.
    TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
        new TwoInputStreamOperatorTestHarness<>(myoperator);

    testHarness.setup();
    testHarness.open();

    // Advance the watermark on both inputs, then feed an element to input 1.
    testHarness.processWatermark1(new Watermark(17));
    testHarness.processWatermark2(new Watermark(17));
    testHarness.processElement1(new StreamRecord<>(5, 12L));

    // Advance both watermarks again, then feed an element to input 2.
    testHarness.processWatermark1(new Watermark(42));
    testHarness.processWatermark2(new Watermark(42));
    testHarness.processElement2(new StreamRecord<>("6", 13L));

and then use testHarness.getOutput() to get the output and compare it against the expected one. Because the test calls processElement1 and processElement2 directly, the order in which elements reach the operator is fully under your control.

If you have access to the Flink source code, I recommend having a look at the CoProcessOperatorTest for an example. You can also find it here:

https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java

Hope this helps,
Kostas

> On Dec 7, 2017, at 5:54 PM, Sofer, Tovi <tovi.so...@citi.com> wrote:
>
> Hi group,
>
> What is the best practice for testing CoFlatMap operator correctness?
>
> We have two source functions, each emits data to stream, and a connect between them, and I want to make sure that when streamA element arrive before stream element, a certain behavior happens.
>
> How can I test this case?
>
> Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); and emitting timestamp and watermark per element didn’t help, and still each element arrive in unexpected order.
>
> Thanks in advance,
> Tovi