I have the following test. The problem is that it never finishes: execution never reaches the assertion. What am I doing wrong?
"kinesis consumer" should "consume message from kinesis stream" in { import ExecutionContext.Implicits.global val sampleData = Seq("a", "b", "c") val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.addSource(new FlinkKinesisConsumer[String]( "SampleStream", new SimpleStringSchema, consumerConfig)) .addSink(new TestSink[String]) Future(createSampleDataStream(sampleData)) //publish to kinesis stream env.execute() TestSink.values should contain theSameElementsAs (sampleData) //not executed }