I have the following test. The problem is that it never finishes, meaning it
never reaches the assertion. What am I doing wrong?

"kinesis consumer" should "consume message from kinesis stream" in {
    import ExecutionContext.Implicits.global
    val sampleData = Seq("a", "b", "c")
    val env: StreamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new FlinkKinesisConsumer[String](
            "SampleStream", new SimpleStringSchema, consumerConfig))
        .addSink(new TestSink[String])

    Future(createSampleDataStream(sampleData)) // publish to kinesis stream
    env.execute()
    TestSink.values should contain theSameElementsAs (sampleData) // not executed
}
