Hey Sidney,

For the .filter() function, you are passing a function without an open() method: the FilterFunction that getFilter() returns has no open() method, so its metrics are never initialized. What could work is creating a Handler that extends AbstractRichFunction and implements MapFunction and FilterFunction, and passing those instances into the filter() and map() operators.

On Tue, Dec 17, 2019 at 3:18 AM vino yang <yanghua1...@gmail.com> wrote:

> Hi Sidney,
>
> >> I'd prefer not to consume messages I don't plan on actually handling.
>
> It depends on your design. If you produce different types into different
> partitions, then it is easy to filter different types from the Kafka
> consumer (only consume some of the partitions).
>
> If you do not distinguish between types in the partitions of the Kafka
> topic, you can filter messages based on type in the Flink job.
>
> >> I MUST consume the messages, count those I want to filter out and then
> simply not handle them?
>
> I did not say "you MUST", I said "you can".
>
> Actually, there are several solutions, e.g.:
>
> 1) the one I described in my last mail;
> 2) filter in the Flink source;
> 3) filter via a Flink filter transform function;
> 4) side output / split and select.
>
> Choose the solution that suits your scenario.
>
> The key point of my last mail was to describe the problem with your
> reflection-based approach.
>
> Best,
> Vino
>
> Sidney Feiner <sidney.fei...@startapp.com> 于2019年12月16日周一 下午9:31写道:
>
>> You are right with everything you say!
>> The solution you propose is actually what I'm trying to avoid. I'd prefer
>> not to consume messages I don't plan on actually handling.
>> But from what you say it sounds like I have no other choice. Am I right? I
>> MUST consume the messages, count those I want to filter out and then simply
>> not handle them?
>> Which means I must filter them in the task itself and have no way of
>> filtering them directly from the data source?
>>
>> *Sidney Feiner* / Data Platform Developer
>> M: +972.528197720 / Skype: sidney.feiner.startapp
>>
>> ------------------------------
>> *From:* vino yang <yanghua1...@gmail.com>
>> *Sent:* Monday, December 16, 2019 7:56 AM
>> *To:* Sidney Feiner <sidney.fei...@startapp.com>
>> *Cc:* user@flink.apache.org <user@flink.apache.org>
>> *Subject:* Re: Fw: Metrics based on data filtered from DataStreamSource
>>
>> Hi Sidney,
>>
>> Firstly, the `open` method of a UDF instance is always invoked when the
>> task thread starts to run.
>>
>> From the second code snippet you provided, I guess you are trying to get
>> a dynamic handler via reflection, is that correct? I also guess that you
>> want to get a dynamic instance of a handler at runtime; correct me if I
>> am wrong.
>>
>> IMO, you may be confusing the program you write with the runtime of the
>> Task: the purpose of your program is to build the job graph, while the
>> business logic in the UDF describes what runs at runtime.
>>
>> For your scenario, if many types of events exist in one topic, can you
>> consume them, group by the type, and then count them?
>>
>> Best,
>> Vino
>>
>> Sidney Feiner <sidney.fei...@startapp.com> 于2019年12月16日周一 上午12:54写道:
>>
>> Hey,
>> I have a question about using metrics based on filtered data.
>> Basically, I have handlers for many types of events I get from my data
>> source (in my case, Kafka), and every handler has its own filter function.
>> That handler also has a Counter, incremented every time it filters out an
>> event (as part of the FilterFunction).
>>
>> The problem arises when I use that FilterFunction on the DataStreamSource:
>> the handler's open() function hasn't been called and thus the metrics have
>> never been initialized.
>> Is there a way to make this work? Or any other way of counting events
>> that have been filtered out from the DataStreamSource?
>>
>> Handler:
>>
>> public abstract class Handler extends RichMapFunction<Event, String> {
>>     private transient Counter filteredCounter;
>>     private boolean isInit = false;
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         if (!isInit) {
>>             MetricGroup metricGroup = getRuntimeContext().getMetricGroup()
>>                     .addGroup(getClass().getSimpleName());
>>             filteredCounter = metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
>>             isInit = true;
>>         }
>>     }
>>
>>     public final FilterFunction getFilter() {
>>         return (FilterFunction<Event>) event -> {
>>             boolean res = filter(event);
>>             if (!res) {
>>                 filteredCounter.inc();
>>             }
>>             return res;
>>         };
>>     }
>>
>>     protected abstract boolean filter(Event event);
>> }
>>
>> And when I init the DataStreamSource:
>>
>> Handler handler = (Handler) Class.forName(handlerName).getConstructor().newInstance();
>> dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);
>>
>> Any help would be much appreciated!
>>
>> Thanks 🙂
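
For reference, a minimal sketch of the combined-handler idea suggested at the top of the thread (a rich function that is both the FilterFunction and the source of the metric). This assumes the DataStream API of that era; `Event`, `doFilter`, and the metric name "filtered" are placeholders taken from this thread, not a tested implementation:

```java
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// One rich function that is both the filter and the map. Because it is a
// RichFunction, Flink calls open() on each parallel instance before any
// filter()/map() call, so the counter is initialized in whichever operator
// the instance ends up in.
public abstract class Handler extends AbstractRichFunction
        implements FilterFunction<Event>, MapFunction<Event, String> {

    private transient Counter filteredCounter;

    @Override
    public void open(Configuration parameters) {
        filteredCounter = getRuntimeContext()
                .getMetricGroup()
                .addGroup(getClass().getSimpleName())
                .counter("filtered");  // placeholder metric name
    }

    @Override
    public final boolean filter(Event event) {
        boolean keep = doFilter(event);
        if (!keep) {
            filteredCounter.inc();  // safe: open() has already run here
        }
        return keep;
    }

    // Subclasses implement the actual predicate and map(Event) -> String.
    protected abstract boolean doFilter(Event event);
}
```

Usage would be dataStreamSource.filter(handler).map(handler). One caveat: each operator serializes and deserializes its own copy of the handler, so the filter and map operators each run open() on their own instances and the counters are per-operator, not shared between them.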