The reason why the gauge value is not updating is because you are not
actually updating the gauge,
but register a new gauge under the same name. The subsequent
registration are ignored, and should've
logged a warning.
I suggest to make your gauge stateful by adding a field for the
opTimeInSec with a setter which you call
in addMetricData(...).
On 21.06.2017 11:48, sohimankotia wrote:
Here it is :
import com.codahale.metrics.SlidingWindowReservoir;
import in.dailyhunt.cis.enrichments.datatype.BasicInfoTuple;
import in.dailyhunt.cis.enrichments.datatype.SinkTuple;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.util.Collector;
public class DBFlatMap extends RichFlatMapFunction<BasicInfoTuple,
SinkTuple> {
private transient Meter meter;
private transient org.apache.flink.metrics.Histogram histogram;
@Override
public void open(Configuration parameters) throws Exception {
/*
some app specific code
*/
com.codahale.metrics.Meter meter1 = new
com.codahale.metrics.Meter();
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new
DropwizardMeterWrapper(meter1));
com.codahale.metrics.Histogram histogram1 =
new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(500));
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new
DropwizardHistogramWrapper(histogram1));
}
@Override
public void flatMap(BasicInfoTuple in, Collector<SinkTuple> out) throws
Exception {
long start = System.currentTimeMillis();
incrementCounter("input-in-group-and-KeyElect-Flow",
this.getRuntimeContext());
this.dbOperations();
addMetricData(start, System.currentTimeMillis());
this.meter.markEvent();
}
private void dbOperations() {
// Db Operation and some app logic
}
public void incrementCounter(String counterName, RuntimeContext
runtimeContext) {
if (runtimeContext == null) {
return;
}
LongCounter lc = runtimeContext.getLongCounter(counterName);
if (lc == null) {
lc = new LongCounter();
runtimeContext.addAccumulator(counterName, lc);
}
lc.add(1L);
}
private void addMetricData(long startTime, long endTime) {
final long opTimeInSec = (endTime - startTime) / 1000;
this.histogram.update(opTimeInSec);
getRuntimeContext().getMetricGroup()
.gauge("DbOpGauge", (Gauge<String>) () ->
String.valueOf(opTimeInSec));
}
}
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Gauge-Metric-is-not-getting-updated-on-Job-Dashboard-tp13842p13888.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at
Nabble.com.