Hello,
I would like to write a test code for my Flink job.
Looking at flink-examples, I thought the way will be:
- Create test class which extends StreamingMultipleProgramsTestBase
- In each method, just write streaming job as usual, but use
collection data source and iterator sink
- Use TestBaseUtils.compareResultXX method to check the result.
Here is the actual code I wrote.
---
class SampleTestCase extends StreamingMultipleProgramsTestBase {
@Test
def testCase1(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements("aaa","bbb","aaa")
.map{ x => (x,1)}
.keyBy(0)
.timeWindow(Time.seconds(1))
.sum(1)
env.execute()
val result = DataStreamUtils.collect(stream.javaStream)
TestBaseUtils.compareResultAsText(Lists.newArrayList(result),"(aaa,2)\n(bbb,1)")
}
}
---
But when I ran the test. I got this error:
java.lang.AssertionError: Wrong number of elements result expected:<2>
but was:<0>
It looks like test finishes before the end of the timeWindow, but I do
not know how to fix it.
Any advise would be appreciated.
Thanks,
Hironori Ogibayashi