[ https://issues.apache.org/jira/browse/FLINK-25801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
王俊博 updated FLINK-25801:
------------------------
Description:

Flink's process metric group already exposes CPU load and CPU time. When users also know the CPU environment (how many processors the process can use), they can determine whether their job is IO-bound or CPU-bound. But Flink does not expose a metric for the number of CPU processors the container can access, so when TaskManagers run in different CPU environments (different core counts), it is hard to calculate how much CPU Flink actually uses. In particular, turning the ProcessCpuTime gauge into a utilization ratio requires knowing the available core count.

{code:java}
metrics.<Double, Gauge<Double>>gauge("Load", mxBean::getProcessCpuLoad);
metrics.<Long, Gauge<Long>>gauge("Time", mxBean::getProcessCpuTime);
{code}

Spark provides totalCores in ExecutorSummary to show the number of cores available in each executor, and exposes it through its Prometheus endpoint:

[https://spark.apache.org/docs/3.1.1/monitoring.html#:~:text=totalCores,in%20this%20executor.]

{code:scala}
val sb = new StringBuilder
sb.append(s"""spark_info{version="$SPARK_VERSION_SHORT", revision="$SPARK_REVISION"} 1.0\n""")
val store = uiRoot.asInstanceOf[SparkUI].store
store.executorList(true).foreach { executor =>
  val prefix = "metrics_executor_"
  val labels = Seq(
    "application_id" -> store.applicationInfo.id,
    "application_name" -> store.applicationInfo.name,
    "executor_id" -> executor.id
  ).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")
  sb.append(s"${prefix}rddBlocks$labels ${executor.rddBlocks}\n")
  sb.append(s"${prefix}memoryUsed_bytes$labels ${executor.memoryUsed}\n")
  sb.append(s"${prefix}diskUsed_bytes$labels ${executor.diskUsed}\n")
  sb.append(s"${prefix}totalCores$labels ${executor.totalCores}\n")
}
{code}

Spark registers jvmCpuTime like this:

{code:scala}
metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
  val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
  val name = new ObjectName("java.lang", "type", "OperatingSystem")

  override def getValue: Long = {
    try {
      // return JVM process CPU time if the ProcessCpuTime method is available
      mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
    } catch {
      case NonFatal(_) => -1L
    }
  }
})
{code}
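What the proposed TaskManager metric could look like, as a minimal sketch alongside the existing Load/Time registration above. The metric name "Processors" and the surrounding class are illustrative assumptions, not existing Flink code:

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public final class CpuProcessorsMetricSketch {

    // Sketch: register a processor-count gauge next to the CPU Load/Time gauges.
    static void registerProcessorsMetric(MetricGroup metrics) {
        OperatingSystemMXBean mxBean = ManagementFactory.getOperatingSystemMXBean();
        // Processors visible to the JVM; on container-aware JVMs this reflects
        // the container's CPU limit rather than the host's physical core count.
        metrics.<Integer, Gauge<Integer>>gauge("Processors", mxBean::getAvailableProcessors);
    }
}
{code}

Runtime.getRuntime().availableProcessors() reports the same value; the MXBean variant keeps the registration symmetric with the existing gauges.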
> add cpu processor metric of taskmanager
> ---------------------------------------
>
>                 Key: FLINK-25801
>                 URL: https://issues.apache.org/jira/browse/FLINK-25801
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics
>            Reporter: 王俊博
>            Priority: Minor
>
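To make the motivation concrete: once a processor-count metric exists, the Time gauge above (process CPU time in nanoseconds) can be turned into a utilization ratio over a sampling window as deltaCpuTime / (deltaWallTime * processors). A self-contained sketch with hypothetical names; this is not existing Flink or Spark code:

{code:java}
// Sketch: deriving CPU utilization from two ProcessCpuTime samples plus the
// processor count. All names here are hypothetical.
public final class CpuUtilizationSketch {

    /**
     * Utilization in [0, 1] across all available cores for one sampling window.
     *
     * @param cpuTimeStartNs process CPU time at the first sample, in nanoseconds
     * @param cpuTimeEndNs   process CPU time at the second sample, in nanoseconds
     * @param wallIntervalNs wall-clock time between the two samples, in nanoseconds
     * @param processors     CPU processors available to the TaskManager
     */
    static double utilization(
            long cpuTimeStartNs, long cpuTimeEndNs, long wallIntervalNs, int processors) {
        double busyNs = cpuTimeEndNs - cpuTimeStartNs;
        return busyNs / (wallIntervalNs * (double) processors);
    }

    public static void main(String[] args) {
        // 6s of CPU time in a 10s window on 4 cores -> 0.15 (15% of the machine).
        System.out.println(utilization(0L, 6_000_000_000L, 10_000_000_000L, 4));
    }
}
{code}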