Hi, If you look at the example from my own project you'll see that this is not a problem (if you test it like this).
In some rare testing cases you may run into this problem and for those: have a look at what I did a few weeks ago for testing the PubSub connector: https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java#L146 Here I'm using the fact that the DeserializationSchema interface has a method isEndOfStream that can be used to terminate a stream. Niels On Mon, Aug 3, 2020 at 5:24 PM Vijayendra Yadav <contact....@gmail.com> wrote: > Thank you Arvid, David and Niels for your valuable inputs. One last > Question: How do I terminate the flink streaming execution environment > after the integration test is completed? > > Regards > Vijay > > On Sun, Aug 2, 2020 at 12:27 PM David Anderson <da...@alpinegizmo.com> > wrote: > >> Vijay, >> >> There's a section of the docs that describes some strategies for writing >> tests of various types, and it includes some Scala examples [1]. >> >> There are also some nice examples from Konstantin Knauf in [2], though >> they are mostly in Java. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html >> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/testing.html> >> [2] https://github.com/knaufk/flink-testing-pyramid >> >> Best, >> David >> >> On Sun, Aug 2, 2020 at 12:14 PM Arvid Heise <ar...@ververica.com> wrote: >> >>> Hi Vijay, >>> >>> Any unit test of Flink operators is actually an IT case as it involves a >>> large portion of the stack. A real unit test, would be over a factored out >>> logic class. >>> >>> Similar to Niels, I'd recommend to use simple sources (env.fromElements) >>> and sinks to inject the data and retrieve the data and put the logic under >>> test in the middle. That may be a part of your pipeline or even the whole >>> pipeline. >>> >>> If you want to have some scala inspiration, have a look at: >>> >>> https://github.com/apache/flink/blob/5f0183fe79d10ac36101f60f2589062a39630f96/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala#L56-L82 >>> . It's on table API but should be quite easy to translate to datastream API >>> if needed. >>> >>> On Sat, Aug 1, 2020 at 4:03 PM Niels Basjes <ni...@basjes.nl> wrote: >>> >>>> No, I only have Java. >>>> >>>> On Fri, 31 Jul 2020, 21:57 Vijayendra Yadav, <contact....@gmail.com> >>>> wrote: >>>> >>>>> Thank You Niels. Would you have something for the scala object class. >>>>> Say for example if I want to implement a unit test ( not integration test) >>>>> for below code or similar : >>>>> >>>>> >>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala >>>>> >>>>> Regards, >>>>> Vijay >>>>> >>>>> On Fri, Jul 31, 2020 at 12:22 PM Niels Basjes <ni...@basjes.nl> wrote: >>>>> >>>>>> Does this test in one of my own projects do what you are looking for? >>>>>> >>>>>> >>>>>> https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107 >>>>>> >>>>>> >>>>>> On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav, <contact....@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Team, >>>>>>> >>>>>>> Looking for some help and reference code / material to implement >>>>>>> unit tests of possible scenarios in Flink *streaming *Code that >>>>>>> should assert specific cases. >>>>>>> >>>>>>> Regards, >>>>>>> Vijay >>>>>>> >>>>>> >>> >>> -- >>> >>> Arvid Heise | Senior Java Developer >>> >>> <https://www.ververica.com/> >>> >>> Follow us @VervericaData >>> >>> -- >>> >>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>> Conference >>> >>> Stream Processing | Event Driven | Real Time >>> >>> -- >>> >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>> >>> -- >>> Ververica GmbH >>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >>> (Toni) Cheng >>> >> -- Best regards / Met vriendelijke groeten, Niels Basjes