Looking at the program on Pastebin, there are some things that look not right. I would be surprised if this program executes at all.
In particular, you are referring to outside distributed data sets inside the filter function. You are calling collect() in every filter function, which actually triggers the program execution (every time the filter function is invoked!) To make this work, you need to pull the collect call out of the filter function. Also, consider using a join, if you want to do an intersection of data sets (or contains-check). Broadcast variables are also available for filter functions. Stephan On Tue, Jun 16, 2015 at 4:28 PM, Fabian Hueske <fhue...@gmail.com> wrote: > Hi, > > which version of Flink are you working with? > The master (0.9-SNAPSHOT) has a RichFilterFunction [1]. > > Best, Fabian > > [1] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java > > 2015-06-16 23:52 GMT+02:00 Vinh June <hoangthevinh....@gmail.com>: > >> Hello, >> >> How do you pass a parameter to a filter function? With Map, Join, I can >> use >> withBroadcastSet to pass to RichMapFunction or RichJoinFunction, but with >> filter, how can I pass it ? >> >> I would like to pass the variable to be able to use as in line 60 here >> http://pastebin.com/cFZVCLGZ >> >> Thanks >> >> >> >> -- >> View this message in context: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/passing-variable-to-filter-function-tp1666.html >> Sent from the Apache Flink User Mailing List archive. mailing list >> archive at Nabble.com. >> > >