Hi Manas, I would recommend using TestHarnesses for testing. You could also use them prior to 1.10. Here is an example of setting the dependencies: https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/build.gradle#L113
You can see some examples of tests for a demo application here: https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/test/java/com/ververica/field/dynamicrules/RulesEvaluatorTest.java Hope this helps. Best regards, -- Alexander Fedulov | Solutions Architect +49 1514 6265796 <https://www.ververica.com/> Follow us @VervericaData -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng On Mon, May 18, 2020 at 1:18 PM Manas Kale <manaskal...@gmail.com> wrote: > I see, I had not considered the serialization; that was the issue. > Thank you. > > On Mon, May 18, 2020 at 12:29 PM Chesnay Schepler <ches...@apache.org> > wrote: > >> We don't publish sources for test classes. >> >> Have you considered that the sink will be serialized on job submission, >> meaning that your myTestSink instance is not the one actually used by >> the job? This typically means that have to store stuff in a static field >> instead. >> Alternatively, depending on the number of elements >> org.apache.flink.streaming.api.datastream.DataStreamUtils#collect might >> be worth a try. >> >> On 15/05/2020 12:49, Manas Kale wrote: >> > Hi, >> > How do I test process functions? I tried by implementing a sink >> > function that stores myProcessFunction's output in a list. After >> > env.execute(), I use assertions. >> > If I set a breakpoint in the myTestSink's invoke() method, I see that >> > that method is being called correctly. However, after env.execute() >> > returns, all data in sink functions is wiped clean. >> > >> > TestSink myTestSink = new myTestSink(); >> > testStream.process(new myProcessFunction()).addSink(myTestSink); >> > env.execute("test"); >> > assertEquals(expectedOutput, myTestSink.actual); >> > >> > What am I doing wrong? >> > Also, I see that a ProcessFunctionTestHarnesses has been added in >> > 1.10. I wasn't able to download its sources to understand how I could >> > use that. Have the sources not been added to maven or is it a problem >> > at my end? >> > >> > Regards, >> > Manas >> >> >>