You are right about everything you say!
The solution you propose is actually what I'm trying to avoid. I'd prefer not 
to consume messages I don't plan on actually handling.
But from what you say, it sounds like I have no other choice. Am I right? I MUST 
consume the messages, count those I want to filter out, and then simply not 
handle them?
Which means I must filter them in the task itself, and I have no way of 
filtering them directly at the data source?



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp


________________________________
From: vino yang <yanghua1...@gmail.com>
Sent: Monday, December 16, 2019 7:56 AM
To: Sidney Feiner <sidney.fei...@startapp.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Fw: Metrics based on data filtered from DataStreamSource

Hi Sidney,

Firstly, the `open` method of a (rich) UDF instance is always invoked when the 
task thread starts to run.

From the second code snippet image that you provided, I guess you are trying to 
obtain a handler dynamically via reflection, is that correct? I also guess 
that you want that handler instance to be created dynamically at runtime; 
correct me if I am wrong.

IMO, you may be confusing the program you write with the runtime of a task: 
the purpose of your main program is to build the job graph, while the code 
inside a UDF describes the business logic that the tasks execute.

For your scenario, if many types of events exist in one topic, could you 
consume them all, group them by type, and then count them?

Best,
Vino

Sidney Feiner <sidney.fei...@startapp.com> wrote on Monday, December 16, 2019 
at 12:54 AM:
Hey,
I have a question about using metrics based on filtered data.
Basically, I have handlers for many types of events I get from my data source 
(in my case, Kafka), and every handler has its own filter function.
Each handler also has a Counter that is incremented every time the handler 
filters out an event (as part of the FilterFunction).

The problem arises when I use that FilterFunction on the DataStreamSource: the 
handler's open() function hasn't been called, and thus the metrics have never 
been initialized.
Do I have a way of making this work? Or is there any other way of counting 
events that have been filtered out from the DataStreamSource?

Handler:

public abstract class Handler extends RichMapFunction<Event, String> {
    private transient Counter filteredCounter;
    private boolean isInit = false;

    @Override
    public void open(Configuration parameters) throws Exception {
        if (!isInit) {
            MetricGroup metricGroup =
                    getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
            filteredCounter =
                    metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
            isInit = true;
        }
    }

    public final FilterFunction<Event> getFilter() {
        return (FilterFunction<Event>) event -> {
            boolean res = filter(event);
            if (!res) {
                filteredCounter.inc();
            }
            return res;
        };
    }

    abstract protected boolean filter(Event event);
}

And when I init the DataStreamSource:

Handler handler = (Handler) Class.forName(handlerName).getConstructor().newInstance();
dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);
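
One plausible reading of why the counter is still null inside the filter can be 
sketched in plain Java, without any Flink dependency. The sketch assumes that 
each operator receives its own deserialized copy of its function, and that an 
open-style hook runs only on functions that expose one. All names below 
(OpenLifecycleDemo, Filter, Openable, deploy, CountingFilter) are hypothetical 
stand-ins for illustration, not Flink APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

class OpenLifecycleDemo {
    /** Hypothetical stand-in for a plain filter function: a serializable predicate. */
    interface Filter<T> extends Serializable { boolean test(T value); }

    /** Hypothetical stand-in for a "rich" function with a lifecycle hook. */
    interface Openable { void open(); }

    /** Mirrors the Handler above: the counter only exists after open(). */
    static class Handler implements Openable, Serializable {
        private transient AtomicLong filteredCounter;

        @Override public void open() { filteredCounter = new AtomicLong(); }

        Filter<String> getFilter() {
            // The lambda captures `this`, so it is shipped with its own Handler copy.
            return event -> {
                boolean keep = !event.isEmpty();
                if (!keep) filteredCounter.incrementAndGet(); // null if open() never ran on this copy
                return keep;
            };
        }
    }

    /** Alternative: the filter is itself "rich" and owns its counter. */
    static class CountingFilter implements Filter<String>, Openable {
        private transient AtomicLong filteredCounter;

        @Override public void open() { filteredCounter = new AtomicLong(); }

        @Override public boolean test(String event) {
            boolean keep = !event.isEmpty();
            if (!keep) filteredCounter.incrementAndGet();
            return keep;
        }

        long filtered() { return filteredCounter.get(); }
    }

    /** Assumed runtime behavior: copy via serialization, then open() if the function has the hook. */
    @SuppressWarnings("unchecked")
    static <F extends Serializable> F deploy(F function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            F copy;
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy = (F) in.readObject();
            }
            if (copy instanceof Openable) {
                ((Openable) copy).open();
            }
            return copy;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if the lambda filter hits the uninitialized counter. */
    static boolean lambdaFilterFails() {
        Handler handler = new Handler();
        Filter<String> filterCopy = deploy(handler.getFilter()); // lambda: no open() hook
        deploy(handler); // the "map" copy is opened, but it is a different object
        try {
            filterCopy.test("");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Returns how many events the rich filter counted as filtered out. */
    static long richFilterCount() {
        CountingFilter filter = deploy(new CountingFilter());
        filter.test("a");
        filter.test("");
        filter.test("");
        return filter.filtered();
    }

    public static void main(String[] args) {
        System.out.println("lambda filter hits null counter: " + lambdaFilterFails());
        System.out.println("rich filter counted: " + richFilterCount());
    }
}
```

Under these assumptions, the first check reports true and the second reports 2. 
In Flink terms, the second variant would roughly correspond to making the 
filter a RichFilterFunction that registers its counter in its own open(), 
rather than borrowing the counter from the map function's instance.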

Any help would be much appreciated!

Thanks 🙂


