Hi Tovi,

testing the behavior of a data flow with respect to the order of records from different sources is tricky. Source functions work independently of each other, and it is not easily possible to control the order in which records are shipped (and received) across source functions.
You could implement source functions that only emit records (and possibly watermarks) when triggered by your test code. With two such sources in a program, you could choose the order in which the sources emit records and watermarks. However, you would need to ensure that a record has been completely processed (i.e., the program has come to a halt) before you emit the next record (from the same or the other source), to avoid race conditions. You could do this with timeouts, but this is very fragile and I would not recommend it.

By the way, watermarks have no effect on the order in which records are processed. They only determine the event time of an operator. For a co-operator (an operator with two inputs), this is the smaller of the two input watermarks. So the input records of a co-operator are not aligned or held back based on their timestamps or watermarks. Instead, an operator can put "early" records into state and process them once a later watermark arrives, since that watermark indicates that all relevant records from both inputs have been received.

I hope this helps,
Fabian

2017-12-10 10:04 GMT+01:00 Sofer, Tovi <tovi.so...@citi.com>:

> Hi Kostas,
>
> Thank you for the suggestion.
> But in our case we want to do either a component test that involves
> several steps, where the CoFlatMap is one step in the middle, or an
> integration test that tests the whole flow, which also involves the
> CoFlatMap.
> We are trying to understand how to test such a scenario so that results are
> predictable, and so that elements from the main stream arrive after elements
> from the control stream, or the other way around.
>
> Thanks again,
>
> Tovi
>
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> Sent: Thursday, 07 December 2017 19:11
> To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
> Cc: user@flink.apache.org
> Subject: Re: Testing CoFlatMap correctness
>
> Hi Tovi,
>
> What you need is the TwoInputStreamOperatorTestHarness.
> This will allow you to do something like:
>
>     import org.apache.flink.streaming.api.watermark.Watermark;
>     import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>     import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
>
>     // myoperator is the TwoInputStreamOperator<Integer, String, String> under test
>     TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
>         new TwoInputStreamOperatorTestHarness<>(myoperator);
>
>     testHarness.setup();
>     testHarness.open();
>
>     // advance both inputs to event time 17, then send an element to input 1
>     testHarness.processWatermark1(new Watermark(17));
>     testHarness.processWatermark2(new Watermark(17));
>     testHarness.processElement1(new StreamRecord<>(5, 12L));
>
>     // advance both inputs to event time 42, then send an element to input 2
>     testHarness.processWatermark1(new Watermark(42));
>     testHarness.processWatermark2(new Watermark(42));
>     testHarness.processElement2(new StreamRecord<>("6", 13L));
>
> and then use testHarness.getOutput() to get the output and compare it
> against the expected one.
>
> If you have access to the Flink source code, I would recommend having
> a look at the CoProcessOperatorTest for an example.
>
> Or you can find it here: https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
>
> Hope this helps,
>
> Kostas
>
>
> On Dec 7, 2017, at 5:54 PM, Sofer, Tovi <tovi.so...@citi.com> wrote:
>
> Hi group,
>
> What is the best practice for testing CoFlatMap operator correctness?
> We have two source functions, each emitting data to a stream, and a connect
> between them, and I want to make sure that when a streamA element arrives
> before an element from the other stream, a certain behavior happens.
> How can I test this case?
>
> Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> and emitting a timestamp and watermark per element didn't help; elements
> still arrive in an unexpected order.
>
> Thanks in advance,
>
> Tovi
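Fabian's point that a co-operator's event time is the minimum of its two input watermarks, and that "early" records can be kept in state until that watermark passes them, can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Flink's implementation: the class BufferingCoOperatorModel and its methods are hypothetical names that only mirror the harness calls (processElement1/2, processWatermark1/2). It has no Flink dependency, keeps its "state" in an in-memory priority queue (in a real job this would live in Flink state, for example in a CoProcessFunction with event-time timers), and breaks timestamp ties by emitting input 1 (the control stream) first, which is an arbitrary choice made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model (no Flink dependency) of a two-input operator that buffers
// "early" records and releases them in timestamp order once the combined
// watermark, min(watermark1, watermark2), has passed them.
// All names here are hypothetical and only mirror the harness method names.
class BufferingCoOperatorModel {

    // One buffered record: which input it came from, its timestamp and value.
    private static final class Buffered {
        final int input;
        final long timestamp;
        final String value;
        final long arrival; // arrival counter, used only as the last tie-breaker

        Buffered(int input, long timestamp, String value, long arrival) {
            this.input = input;
            this.timestamp = timestamp;
            this.value = value;
            this.arrival = arrival;
        }
    }

    // Stand-in for operator state: ordered by timestamp, then input 1 before
    // input 2 (an assumed tie-break rule), then arrival order.
    private final PriorityQueue<Buffered> buffer = new PriorityQueue<>(
            Comparator.comparingLong((Buffered b) -> b.timestamp)
                    .thenComparingInt(b -> b.input)
                    .thenComparingLong(b -> b.arrival));

    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;
    private long arrivals = 0;
    private final List<String> output = new ArrayList<>();

    void processElement1(String value, long timestamp) {
        buffer.add(new Buffered(1, timestamp, value, arrivals++));
    }

    void processElement2(String value, long timestamp) {
        buffer.add(new Buffered(2, timestamp, value, arrivals++));
    }

    void processWatermark1(long watermark) {
        watermark1 = Math.max(watermark1, watermark); // never move backwards
        flush();
    }

    void processWatermark2(long watermark) {
        watermark2 = Math.max(watermark2, watermark);
        flush();
    }

    // The operator's event time: the smaller of the two input watermarks.
    long currentWatermark() {
        return Math.min(watermark1, watermark2);
    }

    // Emit every buffered record whose timestamp is covered by the combined watermark.
    private void flush() {
        long watermark = currentWatermark();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            Buffered r = buffer.poll();
            output.add((r.input == 1 ? "control:" : "main:") + r.value + "@" + r.timestamp);
        }
    }

    List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        BufferingCoOperatorModel op = new BufferingCoOperatorModel();
        op.processElement2("6", 13L); // main record arrives first...
        op.processElement1("5", 12L); // ...control record arrives second
        op.processWatermark1(17L);    // combined watermark is still Long.MIN_VALUE
        op.processWatermark2(17L);    // combined watermark is now 17: both released
        System.out.println(op.getOutput()); // prints [control:5@12, main:6@13]
    }
}
```

Feeding the same records in the opposite arrival order (control record first, or the second watermark before the first) produces the same list, which is the kind of order-independence you would then assert against the real operator with TwoInputStreamOperatorTestHarness. The model does not treat late records (timestamp at or below the current combined watermark) specially; they simply wait in the buffer until the next watermark arrives on either input.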