Hi Sidney,

First, the `open` method of a UDF instance is always invoked when the
task thread starts to run.

From the second code snippet image that you provided, I guess you are
trying to obtain a handler dynamically via reflection, is that correct?
I also guess that you want to get a dynamic instance of a handler at
runtime; correct me if I am wrong.

IMO, you may be confusing the program you write with the runtime of the
Task. The purpose of your program is to build the job graph, while the
logic inside a UDF describes the user's business logic and runs later in
the Task.
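
To illustrate the difference, here is a minimal sketch in plain Java (not
Flink code). It assumes only that user functions are shipped to the tasks
via Java serialization; `SketchHandler`, `ship` and `isOpened` are
hypothetical names made up for this example. A transient field set in the
main program does not survive shipping, so it has to be set again by an
`open`-style call on the shipped copy. I suspect this is what happens to the
handler captured by the lambda returned from `getFilter()`: that lambda is a
plain FilterFunction, so no `open()` is called for it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a rich function; this is NOT a Flink class.
class SketchHandler implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is null on the shipped copy
    private transient AtomicLong filteredCounter;

    // Plays the role of open(): must run on the copy that actually processes data.
    void open() {
        filteredCounter = new AtomicLong();
    }

    boolean isOpened() {
        return filteredCounter != null;
    }

    // Round-trips the object through Java serialization, mimicking shipping to a task.
    static SketchHandler ship(SketchHandler original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchHandler) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SketchHandler local = new SketchHandler();
        local.open();                            // initialised while "building the job graph"
        SketchHandler shipped = ship(local);     // the copy a task would receive
        System.out.println(shipped.isOpened());  // false: the transient counter was dropped
        shipped.open();                          // what the runtime does for rich functions
        System.out.println(shipped.isOpened());  // true
    }
}
```

If that is indeed the cause, one option would be to make the filter itself a
rich function, so that its own `open()` registers the counter.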

For your scenario, if many types of events exist in one topic, could you
consume them, group them by type, and then count them?
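
As a rough sketch of the group-then-count idea, here it is in plain Java
collections rather than the Flink API. `TypeCountSketch` and `countByType`
are hypothetical names, and each string stands in for the type field of an
event. In a Flink job the closest equivalent would presumably be a `keyBy`
on the event type followed by a count.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper; plain Java, not Flink.
class TypeCountSketch {
    // Groups the event types and counts how many events fall under each one.
    // A TreeMap keeps the keys sorted so the printed result is deterministic.
    static Map<String, Long> countByType(List<String> eventTypes) {
        return eventTypes.stream()
                .collect(Collectors.groupingBy(t -> t, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> types = Arrays.asList("click", "view", "click", "install");
        System.out.println(countByType(types)); // {click=2, install=1, view=1}
    }
}
```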

Best,
Vino

Sidney Feiner <sidney.fei...@startapp.com> wrote on Mon, Dec 16, 2019 at 12:54 AM:

> Hey,
> I have a question about using metrics based on filtered data.
> Basically, I have handlers for many types of events I get from my data
> source (in my case, Kafka), and every handler has its own filter function.
> That given handler also has a Counter, incrementing every time it filters
> out an event (as part of the FilterFunction).
>
> The problem arises when I use that FilterFunction on the DataStreamSource -
> the handler's open() function hasn't been called and thus the metrics have
> never been initialized.
> Do I have a way of making this work? Or any other way of counting events
> that have been filtered out from the DataStreamSource?
>
> Handler:
>
> public abstract class Handler extends RichMapFunction<Event, String> {
>     private transient Counter filteredCounter;
>     private boolean isInit = false;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         if (!isInit) {
>             MetricGroup metricGroup =
>                 getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
>             filteredCounter =
>                 metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
>             isInit = true;
>         }
>     }
>
>     public final FilterFunction getFilter() {
>         return (FilterFunction<Event>) event -> {
>             boolean res = filter(event);
>             if (!res) {
>                 filteredCounter.inc();
>             }
>             return res;
>         };
>     }
>
>     abstract protected boolean filter(Event event);
> }
>
>
> And when I init the DataStreamSource:
>
> Handler handler = (Handler) Class.forName(handlerName).getConstructor().newInstance();
> dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);
>
>
> Any help would be much appreciated!
>
> Thanks 🙂
>
>
>
>
