Looking at the program on Pastebin, there are some things that look not
right. I would be surprised if this program executes at all.

In particular, you are referring to outside distributed data sets inside
the filter function. You are calling collect() in every filter function,
which actually triggers the program execution (every time the filter
function is invoked!)

To make this work, you need to pull the collect call out of the filter
function.

Also, consider using a join, if you want to do an intersection of data sets
(or contains-check). Broadcast variables are also available for filter
functions.

Stephan


On Tue, Jun 16, 2015 at 4:28 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> which version of Flink are you working with?
> The master (0.9-SNAPSHOT) has a RichFilterFunction [1].
>
> Best, Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
>
> 2015-06-16 23:52 GMT+02:00 Vinh June <hoangthevinh....@gmail.com>:
>
>> Hello,
>>
>> How do you pass a parameter to a filter function? With Map, Join, I can
>> use
>> withBroadcastSet to pass to RichMapFunction or RichJoinFunction, but with
>> filter, how can I pass it ?
>>
>> I would like to pass the variable to be able to use as in line 60 here
>> http://pastebin.com/cFZVCLGZ
>>
>> Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/passing-variable-to-filter-function-tp1666.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>

Reply via email to