[ https://issues.apache.org/jira/browse/FLINK-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15608808#comment-15608808 ]
ASF GitHub Bot commented on FLINK-4544:
---------------------------------------

GitHub user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2445

The approach looks good, but I think it would be good to adjust a few things:

- We are migrating most of the Scala code in `flink-runtime` to Java (for example in FLIP-6), so it would be good if this new code were also Java.
- Some of the Gauges use some form of JMX querying (`getAttribute(ObjectName)`) that looks more expensive than directly accessing a bean (calling a method).
- Some metrics may not be available on all JVMs (like mapped direct memory). Can this make sure we don't log an exception on each access to the Gauge when the JMX query fails?

> TaskManager metrics are vulnerable to custom JMX bean installation
> ------------------------------------------------------------------
>
>                 Key: FLINK-4544
>                 URL: https://issues.apache.org/jira/browse/FLINK-4544
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>    Affects Versions: 1.1.2
>            Reporter: Stephan Ewen
>            Assignee: Chesnay Schepler
>             Fix For: 1.2.0, 1.1.4
>
>
> The TaskManager's CPU load magic may fail when JMX providers are overridden.
> The TaskManager logic checks whether the class
> {{com.sun.management.OperatingSystemMXBean}} is available. If so, it assumes
> that the bean returned by {{ManagementFactory.getOperatingSystemMXBean()}} is of that type.
> That is not necessarily the case.
> This is visible in the Cassandra tests: Cassandra overrides the JMX
> provider, so every heartbeat causes an exception that is logged (see below),
> flooding the log and killing the heartbeat message.
> I would also suggest moving the entire metrics code out of the
> {{TaskManager}} class into a dedicated class {{TaskManagerJvmMetrics}}. That
> class can install the metrics into the TaskManager's metric group with a static method.
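The issue's point is that the class being loadable says nothing about the runtime type of the bean the factory returns. The trace shows the getter being called through `Method.invoke`, which throws `IllegalArgumentException` ("object is not an instance of declaring class") when the receiver does not implement the method's declaring interface. Below is a minimal sketch of one possible guard, assuming the reflective lookup is kept so the code still loads on JVMs without `com.sun.management`: check the bean once with `Class.isInstance` and cache the outcome. The class `CpuLoadProbe` and its methods are hypothetical, not Flink's API.

```java
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.Method;

// Hypothetical helper: decides once whether the extended CPU-load getter is usable.
final class CpuLoadProbe {
    private final OperatingSystemMXBean bean;
    // Null when the extended getter cannot be used on this particular bean.
    private final Method getProcessCpuLoad;

    CpuLoadProbe(OperatingSystemMXBean bean) {
        this.bean = bean;
        this.getProcessCpuLoad = resolve(bean);
    }

    private static Method resolve(OperatingSystemMXBean bean) {
        try {
            Class<?> extended = Class.forName("com.sun.management.OperatingSystemMXBean");
            // The class being loadable is not sufficient: the bean we were handed
            // must implement it, otherwise Method.invoke throws
            // IllegalArgumentException on every call.
            if (!extended.isInstance(bean)) {
                return null;
            }
            return extended.getMethod("getProcessCpuLoad");
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return null;
        }
    }

    boolean isAvailable() {
        return getProcessCpuLoad != null;
    }

    // Recent process CPU load in [0.0, 1.0]; negative when unavailable.
    double processCpuLoad() {
        if (getProcessCpuLoad == null) {
            return -1.0;
        }
        try {
            return (Double) getProcessCpuLoad.invoke(bean);
        } catch (ReflectiveOperationException e) {
            return -1.0;
        }
    }
}
```

Constructed with `ManagementFactory.getOperatingSystemMXBean()`, a bean from a custom JMX provider (as in the Cassandra tests) yields `isAvailable() == false` and `-1.0` without ever reaching `Method.invoke`, so nothing is logged per heartbeat. The negative value follows `getProcessCpuLoad()`'s own convention of returning a negative number when the load is not available.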
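A sketch of the suggested `TaskManagerJvmMetrics` class with a single static installer. To stay self-contained, it declares minimal stand-ins for Flink's `Gauge` and `MetricGroup` (the real types live in `org.apache.flink.metrics` and have richer signatures). The method name `install`, the `SimpleMetricGroup` test double, and the metric names are illustrative assumptions, not Flink's actual layout. The gauges call platform-bean getters directly rather than going through JMX queries.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.ThreadMXBean;
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal stand-ins for Flink's org.apache.flink.metrics types (assumption:
// reduced to what this sketch needs; the real interfaces have more methods).
@FunctionalInterface
interface Gauge<T> {
    T getValue();
}

interface MetricGroup {
    <T> void gauge(String name, Gauge<T> gauge);

    MetricGroup addGroup(String name);
}

// Hypothetical in-memory group that records gauges under dotted scope names.
final class SimpleMetricGroup implements MetricGroup {
    private final String scope;
    private final Map<String, Gauge<?>> registry;

    SimpleMetricGroup(String scope) {
        this(scope, new LinkedHashMap<>());
    }

    private SimpleMetricGroup(String scope, Map<String, Gauge<?>> registry) {
        this.scope = scope;
        this.registry = registry;
    }

    @Override
    public <T> void gauge(String name, Gauge<T> gauge) {
        registry.put(scope + "." + name, gauge);
    }

    @Override
    public MetricGroup addGroup(String name) {
        // Child groups share the registry and extend the scope.
        return new SimpleMetricGroup(scope + "." + name, registry);
    }

    Map<String, Gauge<?>> gauges() {
        return registry;
    }
}

// The dedicated class suggested in the issue; install() is a hypothetical name.
final class TaskManagerJvmMetrics {
    private TaskManagerJvmMetrics() {}

    static void install(MetricGroup taskManagerGroup) {
        MetricGroup jvm = taskManagerGroup.addGroup("JVM");

        // Beans are looked up once; each gauge is a plain getter call.
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MetricGroup heap = jvm.addGroup("Memory").addGroup("Heap");
        heap.gauge("Used", () -> memory.getHeapMemoryUsage().getUsed());
        heap.gauge("Committed", () -> memory.getHeapMemoryUsage().getCommitted());

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        jvm.addGroup("Threads").gauge("Count", threads::getThreadCount);
    }
}
```

In this sketch the TaskManager would call `TaskManagerJvmMetrics.install(group)` once at startup, which keeps the metric code out of the actor class and makes it testable against any `MetricGroup` implementation.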
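The review comment contrasts two ways of reading the same value. The sketch below reads used heap memory both ways using only standard `java.lang.management` and `javax.management` calls; the class and method names are hypothetical. The JMX path resolves the bean by `ObjectName`, fetches the attribute by string name, and unpacks the open-type `CompositeData` result. The direct path is a plain getter call on a bean looked up once. No timings are claimed here; the extra lookup and conversion steps are presumably what makes the first path look more expensive.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

final class HeapUsedReaders {
    // Direct path: look the platform bean up once and keep the reference.
    private static final MemoryMXBean MEMORY = ManagementFactory.getMemoryMXBean();

    private HeapUsedReaders() {}

    // JMX query path: name lookup through the MBean server, attribute fetch by
    // string, and conversion of the MemoryUsage value to CompositeData.
    static long viaJmxQuery() {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName(ManagementFactory.MEMORY_MXBEAN_NAME);
            CompositeData usage = (CompositeData) server.getAttribute(name, "HeapMemoryUsage");
            return (Long) usage.get("used");
        } catch (JMException e) {
            throw new IllegalStateException("JMX query for heap usage failed", e);
        }
    }

    // Direct path: a single method call on the cached bean.
    static long viaBean() {
        return MEMORY.getHeapMemoryUsage().getUsed();
    }
}
```

Both methods report the same quantity; the direct version also avoids the checked JMX exceptions, which is one less failure path for a Gauge to handle.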
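For metrics that may be missing on some JVMs, one option is to resolve the bean once when gauges are installed and skip the gauge entirely if it is absent; for reads that can still fail later, a wrapper can log only the first failure. A minimal sketch, assuming `java.util.logging` in place of Flink's SLF4J logger and `Supplier<Long>` in place of a Flink `Gauge`; the names `findPool` and `logFirstFailureOnly` and the `-1` sentinel are hypothetical choices. Buffer pools are read through `BufferPoolMXBean`, whose HotSpot instances are named `direct` and `mapped`.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.Logger;

final class BufferPoolGauges {
    private static final Logger LOG = Logger.getLogger(BufferPoolGauges.class.getName());

    private BufferPoolGauges() {}

    // Resolve the pool once, at registration time. An empty result means the
    // JVM does not expose this pool, so the caller simply skips the gauge.
    static Optional<BufferPoolMXBean> findPool(String poolName) {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if (pool.getName().equals(poolName)) {
                return Optional.of(pool);
            }
        }
        return Optional.empty();
    }

    // Wrap a value supplier so that a failing read is logged once and is then
    // reported as -1 without further logging, instead of logging on every access.
    static Supplier<Long> logFirstFailureOnly(String metricName, Supplier<Long> delegate) {
        AtomicBoolean alreadyLogged = new AtomicBoolean(false);
        return () -> {
            try {
                return delegate.get();
            } catch (RuntimeException e) {
                if (alreadyLogged.compareAndSet(false, true)) {
                    LOG.warning("Could not read metric " + metricName
                            + "; further failures will not be logged: " + e);
                }
                return -1L;
            }
        };
    }
}
```

For example, `findPool("mapped").ifPresent(pool -> ...)` would register a gauge over `logFirstFailureOnly("mapped", pool::getMemoryUsed)` only when the JVM actually exposes that pool, so an unsupported metric never reaches the logging path at all.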
> Sample stack trace when default platform beans are overridden:
> {code}
> 23914 [flink-akka.actor.default-dispatcher-3] WARN org.apache.flink.runtime.taskmanager.TaskManager - Error retrieving CPU Load through OperatingSystemMXBean
> java.lang.IllegalArgumentException: object is not an instance of declaring class
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3$$anonfun$getValue$2.apply(TaskManager.scala:2351)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3$$anonfun$getValue$2.apply(TaskManager.scala:2351)
>     at scala.Option.map(Option.scala:145)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3.getValue(TaskManager.scala:2351)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3.getValue(TaskManager.scala:2348)
>     at com.codahale.metrics.json.MetricsModule$GaugeSerializer.serialize(MetricsModule.java:32)
>     at com.codahale.metrics.json.MetricsModule$GaugeSerializer.serialize(MetricsModule.java:20)
>     at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:616)
>     at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:519)
>     at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:31)
>     at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
>     at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2444)
>     at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:355)
>     at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1442)
>     at com.codahale.metrics.json.MetricsModule$MetricRegistrySerializer.serialize(MetricsModule.java:186)
>     at com.codahale.metrics.json.MetricsModule$MetricRegistrySerializer.serialize(MetricsModule.java:171)
>     at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
>     at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
>     at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3022)
>     at org.apache.flink.runtime.taskmanager.TaskManager.sendHeartbeatToJobManager(TaskManager.scala:1278)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:309)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.testingUtils.TestingTaskManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingTaskManagerLike.scala:65)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)