Hi,
using DataStreamUtils.collect() in a test is difficult due to
synchronization problems, as you discovered yourself.

What I propose is to write a custom sink that collects the data and verifies
the results. Verification should happen both in the invoke() method and in
close(): invoke() can check each element as it arrives, and close() can check
that nothing is missing. Set the parallelism of the sink to 1 to ensure that
all data goes to a single sink instance.
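
Roughly, such a sink could look like the sketch below. The class name
VerifyingSink and its "expected" parameter are made up for illustration, and
I am assuming a Flink version where RichSinkFunction still exposes the
single-argument invoke(value). Treat it as a starting point, not a tested
implementation.

```scala
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (not part of Flink): collects elements and checks them
// against an expected set of (key, count) pairs.
class VerifyingSink(expected: Set[(String, Int)])
    extends RichSinkFunction[(String, Int)] {

  // Elements seen so far by this sink instance.
  private val seen = ArrayBuffer.empty[(String, Int)]

  // Called once per element: reject anything that is not expected.
  override def invoke(value: (String, Int)): Unit = {
    if (!expected.contains(value))
      throw new IllegalStateException(s"Unexpected element: $value")
    seen += value
  }

  // Called when the sink shuts down: check that nothing is missing or duplicated.
  override def close(): Unit = {
    if (seen.size != expected.size || seen.toSet != expected)
      throw new IllegalStateException(s"Expected $expected but got $seen")
  }
}
```

You would attach it with something like
stream.addSink(new VerifyingSink(Set(("aaa", 2), ("bbb", 1)))).setParallelism(1)
before calling env.execute(). An exception thrown in the sink should then
fail the job, and with it the test.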

Another option is to use https://github.com/ottogroup/flink-spector, which
provides good ways of specifying expected outputs. Maybe Alex has something
else to say about it; I'm looping him in.

Cheers,
Aljoscha

On Fri, 27 May 2016 at 09:33 Hironori Ogibayashi <ogibaya...@gmail.com>
wrote:

> Hello,
>
> I would like to write test code for my Flink job.
> Looking at flink-examples, I thought the approach would be:
> - Create test class which extends StreamingMultipleProgramsTestBase
> - In each method, just write streaming job as usual, but use
> collection data source and iterator sink
> - Use TestBaseUtils.compareResultXX method to check the result.
>
> Here is the actual code I wrote.
>
> ---
> class SampleTestCase extends StreamingMultipleProgramsTestBase {
>
>   @Test
>   def testCase1(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val stream = env.fromElements("aaa","bbb","aaa")
>       .map{ x => (x,1)}
>       .keyBy(0)
>       .timeWindow(Time.seconds(1))
>       .sum(1)
>
>     env.execute()
>
>     val result = DataStreamUtils.collect(stream.javaStream)
>
>     TestBaseUtils.compareResultAsText(Lists.newArrayList(result),"(aaa,2)\n(bbb,1)")
>   }
> }
> ---
>
> But when I ran the test, I got this error:
>
> java.lang.AssertionError: Wrong number of elements result expected:<2>
> but was:<0>
>
> It looks like the test finishes before the end of the timeWindow, but I do
> not know how to fix it.
> Any advice would be appreciated.
>
> Thanks,
> Hironori Ogibayashi
>
