Hello, I would like to write a test code for my Flink job. Looking at flink-examples, I thought the way will be: - Create test class which extends StreamingMultipleProgramsTestBase - In each method, just write streaming job as usual, but use collection data source and iterator sink - Use TestBaseUtils.compareResultXX method to check the result.
Here is the actual code I wrote. --- class SampleTestCase extends StreamingMultipleProgramsTestBase { @Test def testCase1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.fromElements("aaa","bbb","aaa") .map{ x => (x,1)} .keyBy(0) .timeWindow(Time.seconds(1)) .sum(1) env.execute() val result = DataStreamUtils.collect(stream.javaStream) TestBaseUtils.compareResultAsText(Lists.newArrayList(result),"(aaa,2)\n(bbb,1)") } } --- But when I ran the test. I got this error: java.lang.AssertionError: Wrong number of elements result expected:<2> but was:<0> It looks like test finishes before the end of the timeWindow, but I do not know how to fix it. Any advise would be appreciated. Thanks, Hironori Ogibayashi