Hi Manas,

I would recommend using TestHarnesses for testing. You could also use them
prior to 1.10. Here is an example of setting the dependencies:
https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/build.gradle#L113

You can see some examples of tests for a demo application here:
https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/test/java/com/ververica/field/dynamicrules/RulesEvaluatorTest.java
Hope this helps.

Best regards,

--

Alexander Fedulov | Solutions Architect

+49 1514 6265796

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng



On Mon, May 18, 2020 at 1:18 PM Manas Kale <manaskal...@gmail.com> wrote:

> I see, I had not considered the serialization; that was the issue.
> Thank you.
>
> On Mon, May 18, 2020 at 12:29 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> We don't publish sources for test classes.
>>
>> Have you considered that the sink will be serialized on job submission,
>> meaning that your myTestSink instance is not the one actually used by
>> the job? This typically means that have to store stuff in a static field
>> instead.
>> Alternatively, depending on the number of elements
>> org.apache.flink.streaming.api.datastream.DataStreamUtils#collect might
>> be worth a try.
>>
>> On 15/05/2020 12:49, Manas Kale wrote:
>> > Hi,
>> > How do I test process functions? I tried by implementing a sink
>> > function that stores myProcessFunction's output in a list. After
>> > env.execute(), I use assertions.
>> > If I set a breakpoint in the myTestSink's invoke() method, I see that
>> > that method is being called correctly. However, after env.execute()
>> > returns, all data in sink functions is wiped clean.
>> >
>> > TestSink myTestSink = new myTestSink();
>> > testStream.process(new myProcessFunction()).addSink(myTestSink);
>> > env.execute("test");
>> > assertEquals(expectedOutput, myTestSink.actual);
>> >
>> > What am I doing wrong?
>> >  Also, I see that a ProcessFunctionTestHarnesses has been added in
>> > 1.10. I wasn't able to download its sources to understand how I could
>> > use that. Have the sources not been added to maven or is it a problem
>> > at my end?
>> >
>> > Regards,
>> > Manas
>>
>>
>>

Reply via email to