Hi,

If you look at the example from my own project you'll see that this is not
a problem (if you test it like this).

In some rare testing cases you may run into this problem and for those:
have a look at what I did a few weeks ago for testing the PubSub connector:
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java#L146

Here I'm using the fact that the DeserializationSchema interface has a
method isEndOfStream that can be used to terminate a stream.

Niels

On Mon, Aug 3, 2020 at 5:24 PM Vijayendra Yadav <contact....@gmail.com>
wrote:

> Thank you Arvid, David and Niels for your valuable inputs. One last
> Question: How do I terminate the flink streaming execution environment
> after the integration test is completed?
>
> Regards
> Vijay
>
> On Sun, Aug 2, 2020 at 12:27 PM David Anderson <da...@alpinegizmo.com>
> wrote:
>
>> Vijay,
>>
>> There's a section of the docs that describes some strategies for writing
>> tests of various types, and it includes some Scala examples [1].
>>
>> There are also some nice examples from Konstantin Knauf in [2], though
>> they are mostly in Java.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/testing.html>
>> [2] https://github.com/knaufk/flink-testing-pyramid
>>
>> Best,
>> David
>>
>> On Sun, Aug 2, 2020 at 12:14 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Vijay,
>>>
>>> Any unit test of Flink operators is actually an IT case as it involves a
>>> large portion of the stack. A real unit test, would be over a factored out
>>> logic class.
>>>
>>> Similar to Niels, I'd recommend to use simple sources (env.fromElements)
>>> and sinks to inject the data and retrieve the data and put the logic under
>>> test in the middle. That may be a part of your pipeline or even the whole
>>> pipeline.
>>>
>>> If you want to have some scala inspiration, have a look at:
>>>
>>> https://github.com/apache/flink/blob/5f0183fe79d10ac36101f60f2589062a39630f96/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala#L56-L82
>>> . It's on table API but should be quite easy to translate to datastream API
>>> if needed.
>>>
>>> On Sat, Aug 1, 2020 at 4:03 PM Niels Basjes <ni...@basjes.nl> wrote:
>>>
>>>> No, I only have Java.
>>>>
>>>> On Fri, 31 Jul 2020, 21:57 Vijayendra Yadav, <contact....@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank You Niels. Would you have something for the scala object class.
>>>>> Say for example if I want to implement a unit test ( not integration test)
>>>>> for below code or similar  :
>>>>>
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
>>>>>
>>>>> Regards,
>>>>> Vijay
>>>>>
>>>>> On Fri, Jul 31, 2020 at 12:22 PM Niels Basjes <ni...@basjes.nl> wrote:
>>>>>
>>>>>> Does this test in one of my own projects do what you are looking for?
>>>>>>
>>>>>>
>>>>>> https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107
>>>>>>
>>>>>>
>>>>>> On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav, <contact....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> Looking for some help and reference code / material to implement
>>>>>>> unit tests of possible scenarios in Flink *streaming *Code that
>>>>>>> should assert specific cases.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Vijay
>>>>>>>
>>>>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Reply via email to