Hey, I have a question about using metrics based on filtered data. Basically, I have handlers for many types of events I get from my data source (in my case, Kafka), and every handler has its own filter function. Each handler also has a Counter that is incremented every time the handler filters out an event (as part of the FilterFunction).
The problem arises when I use that FilterFunction on the DataStreamSource: the handler's open() function hasn't been called, and thus the metrics have never been initialized. Is there a way of making this work? Or is there any other way of counting events that have been filtered out from the DataStreamSource?

Handler:

    public abstract class Handler extends RichMapFunction<Event, String> {
        private transient Counter filteredCounter;
        private boolean isInit = false;

        @Override
        public void open(Configuration parameters) throws Exception {
            if (!isInit) {
                MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
                filteredCounter = metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
                isInit = true;
            }
        }

        public final FilterFunction getFilter() {
            return (FilterFunction<Event>) event -> {
                boolean res = filter(event);
                if (!res) {
                    filteredCounter.inc();
                }
                return res;
            };
        }

        abstract protected boolean filter(Event event);
    }

And when I init the DataStreamSource:

    Handler handler = (Handler) Class.forName(handlerName).getConstructor().newInstance();
    dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);

Any help would be much appreciated!

Thanks 🙂
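
P.S. To make the failure mode concrete, here is a stripped-down, Flink-free sketch. SimpleCounter, Openable, SketchHandler, CountingFilter and FilterCounterSketch are hypothetical stand-ins made up for illustration; none of them are Flink classes, and events are plain Strings. The first part mirrors my Handler, where the filter lambda depends on the handler's open(). The second part is one direction I'm considering: letting the filter own its counter and its own open(). In Flink I assume this would map to extending RichFilterFunction so that the runtime opens the filter itself, but I haven't verified that for my setup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a metrics counter (not Flink's Counter).
final class SimpleCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Hypothetical stand-in for a "rich" function: whoever runs it must call open() first.
interface Openable {
    void open();
}

// Mirrors my Handler: the counter only exists once open() has run on this instance.
abstract class SketchHandler implements Openable {
    private SimpleCounter filteredCounter;

    @Override
    public void open() {
        filteredCounter = new SimpleCounter();
    }

    // Same shape as getFilter() above: the lambda depends on the handler's open().
    Predicate<String> getFilter() {
        return event -> {
            boolean keep = accept(event);
            if (!keep) {
                filteredCounter.inc(); // throws NullPointerException if open() never ran
            }
            return keep;
        };
    }

    protected abstract boolean accept(String event);
}

// Possible direction: the filter owns its counter and has its own open().
abstract class CountingFilter implements Openable, Predicate<String> {
    private SimpleCounter filteredCounter;

    @Override
    public void open() {
        filteredCounter = new SimpleCounter();
    }

    @Override
    public boolean test(String event) {
        boolean keep = accept(event);
        if (!keep) {
            filteredCounter.inc();
        }
        return keep;
    }

    long filteredCount() {
        return filteredCounter.getCount();
    }

    protected abstract boolean accept(String event);
}

class FilterCounterSketch {
    // Plays the role of the runtime: opens the filter, then feeds it the events.
    static List<String> runFilter(CountingFilter filter, List<String> events) {
        filter.open();
        List<String> kept = new ArrayList<>();
        for (String event : events) {
            if (filter.test(event)) {
                kept.add(event);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Example filter: drop empty events.
        CountingFilter nonEmpty = new CountingFilter() {
            @Override
            protected boolean accept(String event) {
                return !event.isEmpty();
            }
        };
        List<String> kept = runFilter(nonEmpty, Arrays.asList("a", "", "b", ""));
        System.out.println(kept + " kept, " + nonEmpty.filteredCount() + " filtered out");
    }
}
```

In this sketch the counting filter works because the same instance that is opened is also the one that filters, whereas the lambda from getFilter() fails whenever it runs against a handler instance that was never opened. Whether that is exactly what happens to my handler inside the filter operator is my guess, not something I have confirmed.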