My guess would be that the consumer does not stop running once it
exhausted the kinesis stream. Which makes sense since this isn't a batch
job.
(Wouldn't want the source to shut down just because it happened to catch
up with your input ;) )
On 12/14/2020 8:09 AM, Avi Levi wrote:
I have the following test. the problem is it doesn't end ... meaning
it doesn't reach the assertion point. What am I doing wrong?
|"kinesis consumer" should "consume message from kinesis stream" in {
import ExecutionContext.Implicits.global val sampleData = Seq("a",
"b", "c") val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment env.addSource(new
FlinkKinesisConsumer[String]( "SampleStream", new SimpleStringSchema,
consumerConfig)) .addSink(new TestSink[String])
Future(createSampleDataStream(sampleData)) //publish to kinesis stream
env.execute() TestSink.values should contain theSameElementsAs
(sampleData) //not executed }|