Hey Sidney,

for the .filter() function, you are passing a function without an open()
method. The function that getFilter() returns, has no open() method.
What could work is creating a

Handler extends AbstractRichFunction implements MapFunction, FilterFunction

and passing those instances into the filter() and map() operator.


On Tue, Dec 17, 2019 at 3:18 AM vino yang <yanghua1...@gmail.com> wrote:

> Hi Sideny,
>
> >> I'd prefer not to consume messages I don't plan on actually handling.
>
> It depends on your design. If you produce different types into different
> partitions, then it's easy to filter different types from the Kafka
> consumer(only consume partial partition).
>
> If you do not distinguish different types in the partitions of the Kafka
> topic. You can filter messages based on type in Flink job.
>
> >> I MUST consume the messages, count those I want to filter out and then
> simply not handle them?
>
> I did not say "you MUST", I said "you can".
>
> Actually, there are serval solutions.
>
> e.g.
> 1) I described in the last mail;
> 2) filter in flink source;
> 3) filter via flink filter transform function
> 4) side output/split, selet
>
> Choosing one solution that suite your scene.
>
> The key thing in my last mail is to describe the problem of your
> reflection problem.
>
> Best,
> Vino
>
> Sidney Feiner <sidney.fei...@startapp.com> 于2019年12月16日周一 下午9:31写道:
>
>> You are right with everything you say!
>> The solution you propose is actually what I'm trying to avoid. I'd prefer
>> not to consume messages I don't plan on actually handling.
>> But from what you say it sounds I have no other choice. Am I right? I
>> MUST consume the messages, count those I want to filter out and then simply
>> not handle them?
>> Which means I must filter them in the task itself and I have no way of
>> filtering them directly from the data source?
>>
>>
>> *Sidney Feiner* */* Data Platform Developer
>> M: +972.528197720 */* Skype: sidney.feiner.startapp
>>
>> [image: emailsignature]
>>
>> ------------------------------
>> *From:* vino yang <yanghua1...@gmail.com>
>> *Sent:* Monday, December 16, 2019 7:56 AM
>> *To:* Sidney Feiner <sidney.fei...@startapp.com>
>> *Cc:* user@flink.apache.org <user@flink.apache.org>
>> *Subject:* Re: Fw: Metrics based on data filtered from DataStreamSource
>>
>> Hi Sidney,
>>
>> Firstly, the `open` method of UDF's instance is always invoked when the
>> task thread starts to run.
>>
>> From the second code snippet image that you provided, I guess you are
>> trying to get a dynamic handler with reflection technology, is
>> that correct? I also guess that you want to get a dynamic instance of a
>> handler in the runtime, correct me if I am wrong.
>>
>> IMO, you may misunderstand the program you write and the runtime of Task,
>> the purpose of your program is used to build the job graph. The business
>> logic in UDF is used to describe the user's business logic.
>>
>> For your scene, if many types of events exist in one topic, you can
>> consume them and group by the type then count them?
>>
>> Best,
>> Vino
>>
>> Sidney Feiner <sidney.fei...@startapp.com> 于2019年12月16日周一 上午12:54写道:
>>
>> Hey,
>> I have a question about using metrics based on filtered data.
>> Basically, I have handlers for many types of events I get from my data
>> source (in my case, Kafka), and every handler has it's own filter function.
>> That given handler also has a Counter, incrementing every time it filters
>> out an event (as part of the FilterFunction).
>>
>> Problem arrises when I use that FilterFunction on the DataSourceStream -
>> the handler's open() function hasn't been called and thus the metrics have
>> never been initiated.
>> Do I have a way of making this work? Or any other way of counting events
>> that have been filtered out from the DataStreamSource?
>>
>> Handler:
>>
>> public abstract class Handler extends RichMapFunction<Event, String> {
>>     private transient Counter filteredCounter;
>>     private boolean isInit = false;
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         if (!isInit) {
>>             MetricGroup metricGroup = 
>> getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
>>             filteredCounter = 
>> metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
>>             isInit = true;
>>         }
>>     }
>>
>>     public final FilterFunction getFilter() {
>>         return (FilterFunction<Event>) event -> {
>>             boolean res = filter(event);
>>             if (!res) {
>>                 filteredCounter.inc();
>>             }
>>             return res;
>>         };
>>     }
>>
>>     abstract protected boolean filter(Event event);
>> }
>>
>>
>> And when I init the DataStreamSource:
>>
>> Handler handler = (Handler) 
>> Class.forName(handlerName).getConstructor().newInstance();
>> dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);
>>
>>
>> Any help would be much appreciated!
>>
>> Thanks 🙂
>>
>>
>>
>>

Reply via email to