Hi, using DataStreamUtils.collect() in a test is difficult due to synchronization problems, as you discovered yourself.
What I propose is to write a custom sink that collects the data and verifies the results. Verification should happen both in the invoke() method and in close(). Set the parallelism of the sink to 1 to ensure that all data goes to a single sink instance.

Another option is to use https://github.com/ottogroup/flink-spector, which provides good ways of specifying expected outputs.

Maybe Alex has something else to say about it; I'm looping him in.

Cheers,
Aljoscha

On Fri, 27 May 2016 at 09:33 Hironori Ogibayashi <ogibaya...@gmail.com> wrote:

> Hello,
>
> I would like to write test code for my Flink job.
> Looking at flink-examples, I thought the way will be:
> - Create test class which extends StreamingMultipleProgramsTestBase
> - In each method, just write streaming job as usual, but use
>   collection data source and iterator sink
> - Use TestBaseUtils.compareResultXX method to check the result.
>
> Here is the actual code I wrote.
>
> ---
> class SampleTestCase extends StreamingMultipleProgramsTestBase {
>
>   @Test
>   def testCase1(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val stream = env.fromElements("aaa","bbb","aaa")
>       .map{ x => (x,1)}
>       .keyBy(0)
>       .timeWindow(Time.seconds(1))
>       .sum(1)
>
>     env.execute()
>
>     val result = DataStreamUtils.collect(stream.javaStream)
>
>     TestBaseUtils.compareResultAsText(Lists.newArrayList(result),"(aaa,2)\n(bbb,1)")
>   }
> }
> ---
>
> But when I ran the test, I got this error:
>
> java.lang.AssertionError: Wrong number of elements result expected:<2>
> but was:<0>
>
> It looks like the test finishes before the end of the timeWindow, but I do
> not know how to fix it.
> Any advice would be appreciated.
>
> Thanks,
> Hironori Ogibayashi
>
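
For reference, the custom sink suggested in the reply could look roughly like the sketch below. This is only an illustration under some assumptions: VerifyingSink is a hypothetical class name (not something Flink ships), it assumes the Flink 1.x Scala API where a sink overrides invoke(value), and it assumes the job emits (String, Int) results as in the test above. An error thrown in the sink should fail the job, so env.execute() throws and the test fails.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

import scala.collection.mutable

// Hypothetical helper, not part of Flink: collects (word, count) results
// and verifies them against an expected set.
class VerifyingSink(expected: Set[(String, Int)])
    extends RichSinkFunction[(String, Int)] {

  // Created in open() so the buffer lives on the running task
  // rather than being shipped with the serialized function.
  @transient private var seen: mutable.Set[(String, Int)] = _

  override def open(parameters: Configuration): Unit = {
    seen = mutable.Set.empty[(String, Int)]
  }

  // Called once per element: fail fast on a result that should never appear.
  override def invoke(value: (String, Int)): Unit = {
    if (!expected.contains(value)) {
      throw new AssertionError(s"Unexpected element: $value")
    }
    seen += value
  }

  // Called when the sink shuts down: check that nothing is missing.
  override def close(): Unit = {
    val missing = expected -- seen
    if (missing.nonEmpty) {
      throw new AssertionError(s"Missing elements: $missing")
    }
  }
}
```

In the test it would replace the DataStreamUtils.collect() call, for example stream.addSink(new VerifyingSink(Set(("aaa", 2), ("bbb", 1)))).setParallelism(1) followed by env.execute(). Note that this only covers the verification side; it does not by itself guarantee that the time window fires before the finite source ends.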