[ 
https://issues.apache.org/jira/browse/FLINK-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570083#comment-16570083
 ] 

ASF GitHub Bot commented on FLINK-7812:
---------------------------------------

asfgit closed pull request #4801: [FLINK-7812] Log system resources metrics
URL: https://github.com/apache/flink/pull/4801
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index aef8fbb4f60..98054e94224 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -67,5 +67,15 @@
             <td style="word-wrap: break-word;">"&lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;"</td>
             <td>Defines the scope format string that is applied to all metrics scoped to a job on a TaskManager.</td>
         </tr>
+        <tr>
+            <td><h5>metrics.system-resource</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>metrics.system-resource-probing-interval</h5></td>
+            <td style="word-wrap: break-word;">5000</td>
+            <td></td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 55f626ed016..554e1c5b1f2 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1396,6 +1396,150 @@ Thus, in order to infer the metric identifier:
   </tbody>
 </table>
 
+### System resources
+
+System resources reporting is disabled by default. When `metrics.system-resource`
+is enabled, the additional metrics listed below become available on the JobManager and TaskManager.
+System resource metrics are updated periodically and present average values over the
+configured interval (`metrics.system-resource-probing-interval`).
+
+System resources reporting requires an optional dependency to be present on the
+classpath (for example placed in Flink's `lib` directory):
+
+  - `com.github.oshi:oshi-core:3.4.0` (licensed under EPL 1.0 license)
+
+Including its transitive dependencies:
+
+  - `net.java.dev.jna:jna-platform:jar:4.2.2`
+  - `net.java.dev.jna:jna:jar:4.2.2`
+
+If these dependencies cannot be loaded, the failure is reported as a warning message (for example a `NoClassDefFoundError`)
+logged by `SystemResourcesMetricsInitializer` during startup.
+
+#### System CPU
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Scope</th>
+      <th class="text-left" style="width: 25%">Infix</th>
+      <th class="text-left" style="width: 23%">Metrics</th>
+      <th class="text-left" style="width: 32%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="12"><strong>Job-/TaskManager</strong></th>
+      <td rowspan="12">System.CPU</td>
+      <td>Usage</td>
+      <td>Overall % of CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Idle</td>
+      <td>% of CPU Idle usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Sys</td>
+      <td>% of System CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>User</td>
+      <td>% of User CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>IOWait</td>
+      <td>% of IOWait CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Irq</td>
+      <td>% of Irq CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>SoftIrq</td>
+      <td>% of SoftIrq CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Nice</td>
+      <td>% of Nice CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Load1min</td>
+      <td>Average CPU load over 1 minute</td>
+    </tr>
+    <tr>
+      <td>Load5min</td>
+      <td>Average CPU load over 5 minutes</td>
+    </tr>
+    <tr>
+      <td>Load15min</td>
+      <td>Average CPU load over 15 minutes</td>
+    </tr>
+    <tr>
+      <td>UsageCPU*</td>
+      <td>% of CPU usage per processor</td>
+    </tr>
+  </tbody>
+</table>
+
+#### System memory
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Scope</th>
+      <th class="text-left" style="width: 25%">Infix</th>
+      <th class="text-left" style="width: 23%">Metrics</th>
+      <th class="text-left" style="width: 32%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="4"><strong>Job-/TaskManager</strong></th>
+      <td rowspan="2">System.Memory</td>
+      <td>Available</td>
+      <td>Available memory in bytes</td>
+    </tr>
+    <tr>
+      <td>Total</td>
+      <td>Total memory in bytes</td>
+    </tr>
+    <tr>
+      <td rowspan="2">System.Swap</td>
+      <td>Used</td>
+      <td>Used swap in bytes</td>
+    </tr>
+    <tr>
+      <td>Total</td>
+      <td>Total swap in bytes</td>
+    </tr>
+  </tbody>
+</table>
+
+#### System network
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Scope</th>
+      <th class="text-left" style="width: 25%">Infix</th>
+      <th class="text-left" style="width: 23%">Metrics</th>
+      <th class="text-left" style="width: 32%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="2"><strong>Job-/TaskManager</strong></th>
+      <td rowspan="2">System.Network.INTERFACE_NAME</td>
+      <td>ReceiveRate</td>
+      <td>Average receive rate in bytes per second</td>
+    </tr>
+    <tr>
+      <td>SendRate</td>
+      <td>Average send rate in bytes per second</td>
+    </tr>
+  </tbody>
+</table>
+
 ## Latency tracking
 
 Flink allows to track the latency of records traveling through the system. To 
enable the latency tracking
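
The `ReceiveRate` and `SendRate` rows documented in the diff above are described as average rates in bytes per second. As a rough illustration only, such a rate can be derived from two samples of a cumulative byte counter taken one probing interval apart. The class and method names below are hypothetical and not part of Flink or OSHI.

```java
final class ByteRateSketch {

    /**
     * Average rate in bytes per second between two samples of a cumulative
     * byte counter that were taken intervalMs milliseconds apart.
     */
    static long bytesPerSecond(long previousBytes, long currentBytes, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        // Scale the per-interval difference to a per-second value.
        return (currentBytes - previousBytes) * 1000 / intervalMs;
    }

    public static void main(String[] args) {
        // Hypothetical counter values sampled 5000 ms apart.
        System.out.println(bytesPerSecond(1_000, 11_000, 5_000));
    }
}
```

The result is an average over the whole interval, so short bursts within one interval are smoothed out.
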
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 1b308217770..7b717bdd05b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -18,12 +18,18 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.api.common.time.Time;
+
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
+import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
+
 /**
  * Utility class for {@link Configuration} related helper functions.
  */
@@ -69,6 +75,19 @@ public static MemorySize getTaskManagerHeapMemory(Configuration configuration) {
                }
        }
 
+       /**
+        * @return extracted {@link MetricOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} or {@code Optional.empty()} if
+        * {@link MetricOptions#SYSTEM_RESOURCE_METRICS} are disabled.
+        */
+       public static Optional<Time> getSystemResourceMetricsProbingInterval(Configuration configuration) {
+               if (!configuration.getBoolean(SYSTEM_RESOURCE_METRICS)) {
+                       return Optional.empty();
+               } else {
+                       return Optional.of(Time.milliseconds(
+                               configuration.getLong(SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL)));
+               }
+       }
+
        /**
         * Extracts the task manager directories for temporary files as defined by
         * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
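
The `getSystemResourceMetricsProbingInterval` helper added in the diff above folds two options into one `Optional`: empty when reporting is disabled, otherwise the probing interval. The following is a minimal stand-alone sketch of that pattern, assuming a plain `Map<String, String>` in place of Flink's `Configuration` and `java.time.Duration` in place of Flink's `Time`. The class name is hypothetical. The keys and defaults are the ones shown in the diff.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

final class ProbingIntervalSketch {

    static final String ENABLED_KEY = "metrics.system-resource";
    static final String INTERVAL_KEY = "metrics.system-resource-probing-interval";

    /** Returns the probing interval, or an empty Optional when reporting is disabled. */
    static Optional<Duration> probingInterval(Map<String, String> conf) {
        // Defaults mirror the diff: disabled, 5000 ms.
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
        if (!enabled) {
            return Optional.empty();
        }
        long millis = Long.parseLong(conf.getOrDefault(INTERVAL_KEY, "5000"));
        return Optional.of(Duration.ofMillis(millis));
    }

    public static void main(String[] args) {
        System.out.println(probingInterval(Collections.emptyMap()));
        System.out.println(probingInterval(Collections.singletonMap(ENABLED_KEY, "true")));
    }
}
```

Callers then only need a single presence check, which is how the metric group factories in this pull request use the value.
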
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 3b11645fae8..f9fd02423d8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -110,6 +110,20 @@
                        .defaultValue(128)
                        .withDescription("Defines the number of measured latencies to maintain at each operator.");
 
+       /**
+        * Whether Flink should report system resource metrics such as machine's CPU, memory or network usage.
+        */
+       public static final ConfigOption<Boolean> SYSTEM_RESOURCE_METRICS =
+               key("metrics.system-resource")
+                       .defaultValue(false);
+       /**
+        * Interval between probing of system resource metrics specified in milliseconds. Has an effect only when
+        * {@link #SYSTEM_RESOURCE_METRICS} is enabled.
+        */
+       public static final ConfigOption<Long> SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL =
+               key("metrics.system-resource-probing-interval")
+                       .defaultValue(5000L);
+
        private MetricOptions() {
        }
 }
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index e163881d90b..bc4a3cb8409 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -306,6 +306,13 @@ under the License.
                        <groupId>org.reflections</groupId>
                        <artifactId>reflections</artifactId>
                </dependency>
+
+               <!-- Used only for additional logging. Optional because of unclear EPL 1.0 license compatibility. -->
+               <dependency>
+                       <groupId>com.github.oshi</groupId>
+                       <artifactId>oshi-core</artifactId>
+                       <optional>true</optional>
+               </dependency>
        </dependencies>
 
        <!-- Dependency Management to converge transitive dependency versions -->
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index b429de5d7de..ddd3751cc2a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -346,7 +346,10 @@ protected void startClusterComponents(
                                clusterInformation,
                                webMonitorEndpoint.getRestBaseUrl());
 
-                       jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());
+                       jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
+                               metricRegistry,
+                               rpcService.getAddress(),
+                               ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 
                        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 3fd268a1aeb..b150631cddd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.metrics.util;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -41,9 +43,12 @@
 import java.lang.management.ClassLoadingMXBean;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
 import java.lang.management.ThreadMXBean;
 import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics;
 
 /**
  * Utility class to register pre-defined metric sets.
@@ -57,7 +62,8 @@ private MetricUtils() {
 
        public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
                        final MetricRegistry metricRegistry,
-                       final String hostname) {
+                       final String hostname,
+                       final Optional<Time> systemResourceProbeInterval) {
                final JobManagerMetricGroup jobManagerMetricGroup = new JobManagerMetricGroup(
                        metricRegistry,
                        hostname);
@@ -67,13 +73,17 @@ public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
                // initialize the JM metrics
                instantiateStatusMetrics(statusGroup);
 
+               if (systemResourceProbeInterval.isPresent()) {
+                       instantiateSystemMetrics(jobManagerMetricGroup, systemResourceProbeInterval.get());
+               }
                return jobManagerMetricGroup;
        }
 
        public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
                        MetricRegistry metricRegistry,
                        TaskManagerLocation taskManagerLocation,
-                       NetworkEnvironment network) {
+                       NetworkEnvironment network,
+                       Optional<Time> systemResourceProbeInterval) {
                final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
                        metricRegistry,
                        taskManagerLocation.getHostname(),
@@ -88,6 +98,9 @@ public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
                        .addGroup("Network");
                instantiateNetworkMetrics(networkGroup, network);
 
+               if (systemResourceProbeInterval.isPresent()) {
+                       instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get());
+               }
                return taskManagerMetricGroup;
        }
 
@@ -105,37 +118,16 @@ public static void instantiateStatusMetrics(
        private static void instantiateNetworkMetrics(
                MetricGroup metrics,
                final NetworkEnvironment network) {
-               metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
-                       }
-               });
 
-               metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
-                       }
-               });
+               final NetworkBufferPool networkBufferPool = network.getNetworkBufferPool();
+               metrics.<Integer, Gauge<Integer>>gauge("TotalMemorySegments", networkBufferPool::getTotalNumberOfMemorySegments);
+               metrics.<Integer, Gauge<Integer>>gauge("AvailableMemorySegments", networkBufferPool::getNumberOfAvailableMemorySegments);
        }
 
        private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
                final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
-
-               metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getTotalLoadedClassCount();
-                       }
-               });
-
-               metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getUnloadedClassCount();
-                       }
-               });
+               metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", mxBean::getTotalLoadedClassCount);
+               metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", mxBean::getUnloadedClassCount);
        }
 
        private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
@@ -144,66 +136,26 @@ private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
                for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) {
                        MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
 
-                       gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () {
-                               @Override
-                               public Long getValue() {
-                                       return garbageCollector.getCollectionCount();
-                               }
-                       });
-
-                       gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
-                               @Override
-                               public Long getValue() {
-                                       return garbageCollector.getCollectionTime();
-                               }
-                       });
+                       gcGroup.<Long, Gauge<Long>>gauge("Count", garbageCollector::getCollectionCount);
+                       gcGroup.<Long, Gauge<Long>>gauge("Time", garbageCollector::getCollectionTime);
                }
        }
 
        private static void instantiateMemoryMetrics(MetricGroup metrics) {
-               final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+               final MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+               final MemoryUsage nonHeapMemoryUsage = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
 
                MetricGroup heap = metrics.addGroup("Heap");
 
-               heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getHeapMemoryUsage().getUsed();
-                       }
-               });
-               heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getHeapMemoryUsage().getCommitted();
-                       }
-               });
-               heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getHeapMemoryUsage().getMax();
-                       }
-               });
+               heap.<Long, Gauge<Long>>gauge("Used", heapMemoryUsage::getUsed);
+               heap.<Long, Gauge<Long>>gauge("Committed", heapMemoryUsage::getCommitted);
+               heap.<Long, Gauge<Long>>gauge("Max", heapMemoryUsage::getMax);
 
                MetricGroup nonHeap = metrics.addGroup("NonHeap");
 
-               nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getNonHeapMemoryUsage().getUsed();
-                       }
-               });
-               nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getNonHeapMemoryUsage().getCommitted();
-                       }
-               });
-               nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getNonHeapMemoryUsage().getMax();
-                       }
-               });
+               nonHeap.<Long, Gauge<Long>>gauge("Used", nonHeapMemoryUsage::getUsed);
+               nonHeap.<Long, Gauge<Long>>gauge("Committed", nonHeapMemoryUsage::getCommitted);
+               nonHeap.<Long, Gauge<Long>>gauge("Max", nonHeapMemoryUsage::getMax);
 
                final MBeanServer con = ManagementFactory.getPlatformMBeanServer();
 
@@ -239,30 +191,15 @@ public Long getValue() {
        private static void instantiateThreadMetrics(MetricGroup metrics) {
                final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
 
-               metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () {
-                       @Override
-                       public Integer getValue() {
-                               return mxBean.getThreadCount();
-                       }
-               });
+               metrics.<Integer, Gauge<Integer>>gauge("Count", mxBean::getThreadCount);
        }
 
        private static void instantiateCPUMetrics(MetricGroup metrics) {
                try {
                        final com.sun.management.OperatingSystemMXBean mxBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
 
-                       metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () {
-                               @Override
-                               public Double getValue() {
-                                       return mxBean.getProcessCpuLoad();
-                               }
-                       });
-                       metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
-                               @Override
-                               public Long getValue() {
-                                       return mxBean.getProcessCpuTime();
-                               }
-                       });
+                       metrics.<Double, Gauge<Double>>gauge("Load", mxBean::getProcessCpuLoad);
+                       metrics.<Long, Gauge<Long>>gauge("Time", mxBean::getProcessCpuTime);
                } catch (Exception e) {
                        LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
                                " - CPU load metrics will not be available.", e);
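
In the `MetricUtils` diff above, the heap and non-heap gauges change from calling `mxBean.getHeapMemoryUsage()` inside `getValue()` to a method reference on a `MemoryUsage` object obtained once. A bound method reference evaluates its receiver when the reference is created, and the `MemoryUsage` Javadoc describes that class as a snapshot. It may therefore be worth checking that these gauges still report current values. The sketch below shows only the general Java behaviour, using hypothetical `Source` and `Snapshot` classes that are not part of Flink or the JDK.

```java
import java.util.function.Supplier;

final class BoundReferenceSketch {

    /** Hypothetical immutable point-in-time value, standing in for a snapshot object. */
    static final class Snapshot {
        private final long used;

        Snapshot(long used) {
            this.used = used;
        }

        long getUsed() {
            return used;
        }
    }

    /** Hypothetical live source that hands out a fresh snapshot on each request. */
    static final class Source {
        private long used;

        void allocate(long bytes) {
            used += bytes;
        }

        Snapshot snapshot() {
            return new Snapshot(used);
        }
    }

    public static void main(String[] args) {
        Source source = new Source();

        // The receiver expression is evaluated once, here; the supplier keeps that snapshot.
        Supplier<Long> bound = source.snapshot()::getUsed;
        // The lambda body runs on every call, so a new snapshot is taken each time.
        Supplier<Long> fresh = () -> source.snapshot().getUsed();

        source.allocate(10);

        System.out.println("bound = " + bound.get());
        System.out.println("fresh = " + fresh.get());
    }
}
```

Method references on a live object, such as `mxBean::getThreadCount` in the same diff, do not have this concern because the receiver itself returns current values on each call.
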
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java
new file mode 100644
index 00000000000..73656172429
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import oshi.SystemInfo;
+import oshi.hardware.CentralProcessor;
+import oshi.hardware.CentralProcessor.TickType;
+import oshi.hardware.HardwareAbstractionLayer;
+import oshi.hardware.NetworkIF;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Daemon thread probing system resources.
+ *
+ * <p>To accurately and consistently report CPU and network usage we have to periodically probe
+ * CPU ticks and network sent/received bytes and then convert those values to CPU usage and
+ * send/receive byte rates.
+ */
+@ThreadSafe
+public class SystemResourcesCounter extends Thread {
+
+       private static final Logger LOG = LoggerFactory.getLogger(SystemResourcesCounter.class);
+
+       private final long probeIntervalMs;
+       private final SystemInfo systemInfo = new SystemInfo();
+       private final HardwareAbstractionLayer hardwareAbstractionLayer = systemInfo.getHardware();
+
+       private volatile boolean running = true;
+
+       private long[] previousCpuTicks;
+       private long[] bytesReceivedPerInterface;
+       private long[] bytesSentPerInterface;
+
+       private volatile double cpuUser;
+       private volatile double cpuNice;
+       private volatile double cpuSys;
+       private volatile double cpuIdle;
+       private volatile double cpuIOWait;
+       private volatile double cpuIrq;
+       private volatile double cpuSoftIrq;
+       private volatile double cpuUsage;
+
+       private volatile double cpuLoad1;
+       private volatile double cpuLoad5;
+       private volatile double cpuLoad15;
+
+       private AtomicReferenceArray<Double> cpuUsagePerProcessor;
+
+       private final String[] networkInterfaceNames;
+
+       private AtomicLongArray receiveRatePerInterface;
+       private AtomicLongArray sendRatePerInterface;
+
+       public SystemResourcesCounter(Time probeInterval) {
+               probeIntervalMs = probeInterval.toMilliseconds();
+               checkState(this.probeIntervalMs > 0);
+
+               setName(SystemResourcesCounter.class.getSimpleName() + " probing thread");
+
+               cpuUsagePerProcessor = new AtomicReferenceArray<>(hardwareAbstractionLayer.getProcessor().getLogicalProcessorCount());
+
+               NetworkIF[] networkIFs = hardwareAbstractionLayer.getNetworkIFs();
+               bytesReceivedPerInterface = new long[networkIFs.length];
+               bytesSentPerInterface = new long[networkIFs.length];
+               receiveRatePerInterface = new AtomicLongArray(networkIFs.length);
+               sendRatePerInterface = new AtomicLongArray(networkIFs.length);
+               networkInterfaceNames = new String[networkIFs.length];
+
+               for (int i = 0; i < networkInterfaceNames.length; i++) {
+                       networkInterfaceNames[i] = networkIFs[i].getName();
+               }
+       }
+
+       @Override
+       public void run() {
+               try {
+                       while (running) {
+                               calculateCPUUsage(hardwareAbstractionLayer.getProcessor());
+                               calculateNetworkUsage(hardwareAbstractionLayer.getNetworkIFs());
+                               Thread.sleep(probeIntervalMs);
+                       }
+               } catch (InterruptedException e) {
+                       if (running) {
+                               LOG.warn("{} has failed", SystemResourcesCounter.class.getSimpleName(), e);
+                       }
+               }
+       }
+
+       public void shutdown() throws InterruptedException {
+               running = false;
+               interrupt();
+               join();
+       }
+
+       public double getCpuUser() {
+               return cpuUser;
+       }
+
+       public double getCpuNice() {
+               return cpuNice;
+       }
+
+       public double getCpuSys() {
+               return cpuSys;
+       }
+
+       public double getCpuIdle() {
+               return cpuIdle;
+       }
+
+       public double getIOWait() {
+               return cpuIOWait;
+       }
+
+       public double getCpuIrq() {
+               return cpuIrq;
+       }
+
+       public double getCpuSoftIrq() {
+               return cpuSoftIrq;
+       }
+
+       public double getCpuUsage() {
+               return cpuUsage;
+       }
+
+       public double getCpuLoad1() {
+               return cpuLoad1;
+       }
+
+       public double getCpuLoad5() {
+               return cpuLoad5;
+       }
+
+       public double getCpuLoad15() {
+               return cpuLoad15;
+       }
+
+       public int getProcessorsCount() {
+               return cpuUsagePerProcessor.length();
+       }
+
+       public double getCpuUsagePerProcessor(int processor) {
+               return cpuUsagePerProcessor.get(processor);
+       }
+
+       public String[] getNetworkInterfaceNames() {
+               return networkInterfaceNames;
+       }
+
+       public long getReceiveRatePerInterface(int interfaceNo) {
+               return receiveRatePerInterface.get(interfaceNo);
+       }
+
+       public long getSendRatePerInterface(int interfaceNo) {
+               return sendRatePerInterface.get(interfaceNo);
+       }
+
+       private void calculateCPUUsage(CentralProcessor processor) {
+               long[] ticks = processor.getSystemCpuLoadTicks();
+               if (this.previousCpuTicks == null) {
+                       this.previousCpuTicks = ticks;
+               }
+
+               long userTicks = ticks[TickType.USER.getIndex()] - previousCpuTicks[TickType.USER.getIndex()];
+               long niceTicks = ticks[TickType.NICE.getIndex()] - previousCpuTicks[TickType.NICE.getIndex()];
+               long sysTicks = ticks[TickType.SYSTEM.getIndex()] - previousCpuTicks[TickType.SYSTEM.getIndex()];
+               long idleTicks = ticks[TickType.IDLE.getIndex()] - previousCpuTicks[TickType.IDLE.getIndex()];
+               long iowaitTicks = ticks[TickType.IOWAIT.getIndex()] - previousCpuTicks[TickType.IOWAIT.getIndex()];
+               long irqTicks = ticks[TickType.IRQ.getIndex()] - previousCpuTicks[TickType.IRQ.getIndex()];
+               long softIrqTicks = ticks[TickType.SOFTIRQ.getIndex()] - previousCpuTicks[TickType.SOFTIRQ.getIndex()];
+               long totalCpuTicks = userTicks + niceTicks + sysTicks + idleTicks + iowaitTicks + irqTicks + softIrqTicks;
+               this.previousCpuTicks = ticks;
+
+               cpuUser = 100d * userTicks / totalCpuTicks;
+               cpuNice = 100d * niceTicks / totalCpuTicks;
+               cpuSys = 100d * sysTicks / totalCpuTicks;
+               cpuIdle = 100d * idleTicks / totalCpuTicks;
+               cpuIOWait = 100d * iowaitTicks / totalCpuTicks;
+               cpuIrq = 100d * irqTicks / totalCpuTicks;
+               cpuSoftIrq = 100d * softIrqTicks / totalCpuTicks;
+
+               cpuUsage = processor.getSystemCpuLoad() * 100;
+
+               double[] loadAverage = processor.getSystemLoadAverage(3);
+               cpuLoad1 = (loadAverage[0] < 0 ? Double.NaN : loadAverage[0]);
+               cpuLoad5 = (loadAverage[1] < 0 ? Double.NaN : loadAverage[1]);
+               cpuLoad15 = (loadAverage[2] < 0 ? Double.NaN : loadAverage[2]);
+
+               double[] load = processor.getProcessorCpuLoadBetweenTicks();
+               checkState(load.length == cpuUsagePerProcessor.length());
+               for (int i = 0; i < load.length; i++) {
+                       cpuUsagePerProcessor.set(i, load[i] * 100);
+               }
+       }
+
+       private void calculateNetworkUsage(NetworkIF[] networkIFs) {
+               checkState(networkIFs.length == receiveRatePerInterface.length());
+
+               for (int i = 0; i < networkIFs.length; i++) {
+                       NetworkIF networkIF = networkIFs[i];
+                       networkIF.updateNetworkStats();
+
+                       receiveRatePerInterface.set(i, (networkIF.getBytesRecv() - bytesReceivedPerInterface[i]) * 1000 / probeIntervalMs);
+                       sendRatePerInterface.set(i, (networkIF.getBytesSent() - bytesSentPerInterface[i]) * 1000 / probeIntervalMs);
+
+                       bytesReceivedPerInterface[i] = networkIF.getBytesRecv();
+                       bytesSentPerInterface[i] = networkIF.getBytesSent();
+               }
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java
new file mode 100644
index 00000000000..01a93474d5f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import oshi.SystemInfo;
+import oshi.hardware.GlobalMemory;
+import oshi.hardware.HardwareAbstractionLayer;
+
+/**
+ * Utility class to initialize system resource metrics.
+ */
+public class SystemResourcesMetricsInitializer {
+       private static final Logger LOG = LoggerFactory.getLogger(SystemResourcesMetricsInitializer.class);
+
+       public static void instantiateSystemMetrics(MetricGroup metricGroup, Time probeInterval) {
+               try {
+                       MetricGroup system = metricGroup.addGroup("System");
+
+                       SystemResourcesCounter systemResourcesCounter = new SystemResourcesCounter(probeInterval);
+                       systemResourcesCounter.start();
+
+                       SystemInfo systemInfo = new SystemInfo();
+                       HardwareAbstractionLayer hardwareAbstractionLayer = systemInfo.getHardware();
+
+                       instantiateMemoryMetrics(system.addGroup("Memory"), hardwareAbstractionLayer.getMemory());
+                       instantiateSwapMetrics(system.addGroup("Swap"), hardwareAbstractionLayer.getMemory());
+                       instantiateCPUMetrics(system.addGroup("CPU"), systemResourcesCounter);
+                       instantiateNetworkMetrics(system.addGroup("Network"), systemResourcesCounter);
+               }
+               catch (NoClassDefFoundError ex) {
+                       LOG.warn(
+                               "Failed to initialize system resource metrics because of missing class definitions." +
+                               " Did you forget to explicitly add the oshi-core optional dependency?",
+                               ex);
+               }
+       }
+
+       private static void instantiateMemoryMetrics(MetricGroup metrics, GlobalMemory memory) {
+               metrics.<Long, Gauge<Long>>gauge("Available", memory::getAvailable);
+               metrics.<Long, Gauge<Long>>gauge("Total", memory::getTotal);
+       }
+
+       private static void instantiateSwapMetrics(MetricGroup metrics, GlobalMemory memory) {
+               metrics.<Long, Gauge<Long>>gauge("Used", memory::getSwapUsed);
+               metrics.<Long, Gauge<Long>>gauge("Total", memory::getSwapTotal);
+       }
+
+       private static void instantiateCPUMetrics(MetricGroup metrics, SystemResourcesCounter usageCounter) {
+               metrics.<Double, Gauge<Double>>gauge("Usage", usageCounter::getCpuUsage);
+               metrics.<Double, Gauge<Double>>gauge("Idle", usageCounter::getCpuIdle);
+               metrics.<Double, Gauge<Double>>gauge("Sys", usageCounter::getCpuSys);
+               metrics.<Double, Gauge<Double>>gauge("User", usageCounter::getCpuUser);
+               metrics.<Double, Gauge<Double>>gauge("IOWait", usageCounter::getIOWait);
+               metrics.<Double, Gauge<Double>>gauge("Nice", usageCounter::getCpuNice);
+               metrics.<Double, Gauge<Double>>gauge("Irq", usageCounter::getCpuIrq);
+               metrics.<Double, Gauge<Double>>gauge("SoftIrq", usageCounter::getCpuSoftIrq);
+
+               metrics.<Double, Gauge<Double>>gauge("Load1min", usageCounter::getCpuLoad1);
+               metrics.<Double, Gauge<Double>>gauge("Load5min", usageCounter::getCpuLoad5);
+               metrics.<Double, Gauge<Double>>gauge("Load15min", usageCounter::getCpuLoad15);
+
+               for (int i = 0; i < usageCounter.getProcessorsCount(); i++) {
+                       final int processor = i;
+                       metrics.<Double, Gauge<Double>>gauge(
+                               String.format("UsageCPU%d", processor),
+                               () -> usageCounter.getCpuUsagePerProcessor(processor));
+               }
+       }
+
+       private static void instantiateNetworkMetrics(MetricGroup metrics, SystemResourcesCounter usageCounter) {
+               for (int i = 0; i < usageCounter.getNetworkInterfaceNames().length; i++) {
+                       MetricGroup interfaceGroup = metrics.addGroup(usageCounter.getNetworkInterfaceNames()[i]);
+
+                       final int interfaceNo = i;
+                       interfaceGroup.<Long, Gauge<Long>>gauge("ReceiveRate", () -> usageCounter.getReceiveRatePerInterface(interfaceNo));
+                       interfaceGroup.<Long, Gauge<Long>>gauge("SendRate", () -> usageCounter.getSendRatePerInterface(interfaceNo));
+               }
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 97ab5a55298..8054a383739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
@@ -354,7 +355,10 @@ public void start() throws Exception {
                                // bring up the dispatcher that launches 
JobManagers when jobs submitted
                                LOG.info("Starting job dispatcher(s) for 
JobManger");
 
-                               this.jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
+                               this.jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(
+                                       metricRegistry,
+                                       "localhost",
+                                       
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 
                                final HistoryServerArchivist 
historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
dispatcherRestEndpoint);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index c1f0cc84a39..9ab7f807ae5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -367,7 +367,8 @@ public static TaskExecutor startTaskManager(
                TaskManagerMetricGroup taskManagerMetricGroup = 
MetricUtils.instantiateTaskManagerMetricGroup(
                        metricRegistry,
                        taskManagerServices.getTaskManagerLocation(),
-                       taskManagerServices.getNetworkEnvironment());
+                       taskManagerServices.getNetworkEnvironment(),
+                       
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
                TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 86bc46db4ff..eec39ef2975 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -41,6 +42,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.Optional;
 
 import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
 import static org.apache.flink.util.MathUtils.checkedDownCast;
@@ -83,6 +85,10 @@
 
        private final boolean localRecoveryEnabled;
 
+       private boolean systemResourceMetricsEnabled;
+
+       private Optional<Time> systemResourceMetricsProbingInterval;
+
        public TaskManagerServicesConfiguration(
                        InetAddress taskManagerAddress,
                        String[] tmpDirPaths,
@@ -95,7 +101,8 @@ public TaskManagerServicesConfiguration(
                        MemoryType memoryType,
                        boolean preAllocateMemory,
                        float memoryFraction,
-                       long timerServiceShutdownTimeout) {
+                       long timerServiceShutdownTimeout,
+                       Optional<Time> systemResourceMetricsProbingInterval) {
 
                this.taskManagerAddress = checkNotNull(taskManagerAddress);
                this.tmpDirPaths = checkNotNull(tmpDirPaths);
@@ -113,6 +120,9 @@ public TaskManagerServicesConfiguration(
                checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
                        "service shutdown timeout must be greater or equal to 
0.");
                this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
+
+               this.systemResourceMetricsEnabled = systemResourceMetricsEnabled;
+               this.systemResourceMetricsProbingInterval = checkNotNull(systemResourceMetricsProbingInterval);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -179,6 +189,10 @@ public long getTimerServiceShutdownTimeout() {
                return timerServiceShutdownTimeout;
        }
 
+       public Optional<Time> getSystemResourceMetricsProbingInterval() {
+               return systemResourceMetricsProbingInterval;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Parsing of Flink configuration
        // 
--------------------------------------------------------------------------------------------
@@ -276,7 +290,8 @@ public static TaskManagerServicesConfiguration 
fromConfiguration(
                        memType,
                        preAllocateMemory,
                        memoryFraction,
-                       timerServiceShutdownTimeout);
+                       timerServiceShutdownTimeout,
+                       
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
        }
 
        // 
--------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2a8f49267d9..0855991eccf 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2519,7 +2519,8 @@ object JobManager {
 
     val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
       metricRegistry,
-      configuration.getString(JobManagerOptions.ADDRESS))
+      configuration.getString(JobManagerOptions.ADDRESS),
+      
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration))
 
     (instanceManager,
       scheduler,
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index fb578617816..6b9e4a8d1c4 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -47,7 +47,7 @@ import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import 
org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, 
StoppingFailure, StoppingResponse}
 import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, 
TaskManagerMetricGroup}
-import org.apache.flink.runtime.metrics.util.MetricUtils
+import org.apache.flink.runtime.metrics.util.{MetricUtils, 
SystemResourcesMetricsInitializer}
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, 
TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
@@ -246,7 +246,8 @@ class LocalFlinkMiniCluster(
     val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
       metricRegistryOpt.get,
       taskManagerServices.getTaskManagerLocation(),
-      taskManagerServices.getNetworkEnvironment())
+      taskManagerServices.getNetworkEnvironment(),
+      taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval)
 
     val props = getTaskManagerProps(
       taskManagerClass,
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 62fe86250d5..1de48489f71 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2019,7 +2019,8 @@ object TaskManager {
     val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
       metricRegistry,
       taskManagerServices.getTaskManagerLocation(),
-      taskManagerServices.getNetworkEnvironment())
+      taskManagerServices.getNetworkEnvironment(),
+      taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval)
 
     // create the actor properties (which define the actor constructor 
parameters)
     val tmProps = getTaskManagerProps(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index db040232f69..4d1806038b8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -106,7 +106,8 @@ public void testMetricRegistryLifeCycle() throws Exception {
                        TaskManagerMetricGroup taskManagerMetricGroup = 
MetricUtils.instantiateTaskManagerMetricGroup(
                                metricRegistry,
                                taskManagerServices.getTaskManagerLocation(),
-                               taskManagerServices.getNetworkEnvironment());
+                               taskManagerServices.getNetworkEnvironment(),
+                               
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
                        // create the task manager
                        final Props tmProps = TaskManager.getTaskManagerProps(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java
new file mode 100644
index 00000000000..f6c228d605a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.utils;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.metrics.util.SystemResourcesCounter;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link SystemResourcesCounter}.
+ */
+public class SystemResourcesCounterTest {
+
+       private static final double EPSILON = 0.01;
+
+       @Test
+       public void testObtainAnyMetrics() throws InterruptedException {
+               SystemResourcesCounter systemResources = new SystemResourcesCounter(Time.milliseconds(10));
+               double initialCpuIdle = systemResources.getCpuIdle();
+
+               systemResources.start();
+               // wait for stats to update/calculate
+               try {
+                       double cpuIdle;
+                       do {
+                               Thread.sleep(1);
+                               cpuIdle = systemResources.getCpuIdle();
+                       }
+                       while (initialCpuIdle == cpuIdle || Double.isNaN(cpuIdle) || cpuIdle == 0.0);
+               }
+               finally {
+                       systemResources.shutdown();
+                       systemResources.join();
+               }
+
+               double totalCpuUsage = systemResources.getCpuIrq() +
+                       systemResources.getCpuNice() +
+                       systemResources.getCpuSoftIrq() +
+                       systemResources.getCpuSys() +
+                       systemResources.getCpuUser() +
+                       systemResources.getIOWait();
+
+               assertTrue(
+                       "There should be at least one processor",
+                       systemResources.getProcessorsCount() > 0);
+               assertTrue(
+                       "There should be at least one network interface",
+                       systemResources.getNetworkInterfaceNames().length > 0);
+               assertEquals(100.0, totalCpuUsage + systemResources.getCpuIdle(), EPSILON);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index 6374cf80ff9..586f937f41b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -27,6 +27,7 @@
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.util.Optional;
 
 import static org.apache.flink.util.MathUtils.checkedDownCast;
 import static org.junit.Assert.assertEquals;
@@ -108,6 +109,7 @@ private static TaskManagerServicesConfiguration getTmConfig(
                        memType,
                        false,
                        managedMemoryFraction,
-                       0);
+                       0,
+                       Optional.empty());
        }
 }
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index f9746e16532..80cf8de877d 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -231,6 +231,12 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>com.github.oshi</groupId>
+                       <artifactId>oshi-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- utility to scan classpaths -->
                <dependency>
                        <groupId>org.reflections</groupId>
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
new file mode 100644
index 00000000000..6d9a7b03703
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the system resource metrics.
+ */
+public class SystemResourcesMetricsITCase {
+
+       @ClassRule
+       public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+               new MiniClusterResourceConfiguration.Builder()
+                       .setConfiguration(getConfiguration())
+                       .setNumberTaskManagers(1)
+                       .setNumberSlotsPerTaskManager(1)
+                       .build());
+
+       private static Configuration getConfiguration() {
+               Configuration configuration = new Configuration();
+               configuration.setBoolean(SYSTEM_RESOURCE_METRICS, true);
+               configuration.setString(REPORTERS_LIST, "test_reporter");
+               configuration.setString("metrics.reporter.test_reporter.class", TestReporter.class.getName());
+               return configuration;
+       }
+
+       @Test
+       public void startTaskManagerAndCheckForRegisteredSystemMetrics() throws Exception {
+               assertEquals(1, TestReporter.OPENED_REPORTERS.size());
+               TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next();
+
+               List<String> expectedPatterns = getExpectedPatterns();
+
+               Collection<String> gaugeNames = reporter.getGauges().values();
+
+               for (String expectedPattern : expectedPatterns) {
+                       boolean found = false;
+                       for (String gaugeName : gaugeNames) {
+                               if (gaugeName.matches(expectedPattern)) {
+                                       found = true;
+                               }
+                       }
+                       if (!found) {
+                               fail(String.format("Failed to find gauge [%s] in registered gauges [%s]", expectedPattern, gaugeNames));
+                       }
+               }
+       }
+
+       private static List<String> getExpectedPatterns() {
+               String[] expectedGauges = new String[] {
+                       "System.CPU.Idle",
+                       "System.CPU.Sys",
+                       "System.CPU.User",
+                       "System.CPU.IOWait",
+                       "System.CPU.Irq",
+                       "System.CPU.SoftIrq",
+                       "System.CPU.Nice",
+                       "System.Memory.Available",
+                       "System.Memory.Total",
+                       "System.Swap.Used",
+                       "System.Swap.Total",
+                       "System.Network.*ReceiveRate",
+                       "System.Network.*SendRate"
+               };
+
+               String[] expectedHosts = new String[] {
+                       "localhost.taskmanager.([a-f0-9\\\\-])*.",
+                       "localhost.jobmanager."
+               };
+
+               List<String> patterns = new ArrayList<>();
+               for (String expectedHost : expectedHosts) {
+                       for (String expectedGauge : expectedGauges) {
+                               patterns.add(expectedHost + expectedGauge);
+                       }
+               }
+               return patterns;
+       }
+
+       /**
+        * Test metric reporter that exposes registered metrics.
+        */
+       public static final class TestReporter extends AbstractReporter {
+               public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet();
+
+               @Override
+               public String filterCharacters(String input) {
+                       return input;
+               }
+
+               @Override
+               public void open(MetricConfig config) {
+                       OPENED_REPORTERS.add(this);
+               }
+
+               @Override
+               public void close() {
+                       OPENED_REPORTERS.remove(this);
+               }
+
+               public Map<Gauge<?>, String> getGauges() {
+                       return gauges;
+               }
+       }
+}
diff --git a/pom.xml b/pom.xml
index 434f5779f10..35b57dabfe4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -299,8 +299,14 @@ under the License.
                                <version>1.1.4</version>
                        </dependency>
 
+                       <dependency>
+                               <groupId>com.github.oshi</groupId>
+                               <artifactId>oshi-core</artifactId>
+                               <version>3.4.0</version>
+                       </dependency>
+
                        <!-- Make sure we use a consistent avro version between 
Flink and Hadoop -->            
-                       <dependency>
+                       <dependency>
                                <groupId>org.apache.avro</groupId>
                                <artifactId>avro</artifactId>
                                <version>${avro.version}</version>

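For readers following the CPU accounting in `SystemResourcesCounter` in the diff above, here is a minimal standalone sketch of the same delta-and-normalize step. It does not use OSHI. The class name `CpuTickShares`, the method `percentages`, and the sample tick values are made up for illustration and are not part of the patch.

```java
import java.util.Arrays;

/** Hypothetical helper (not part of the patch): turns two tick snapshots into percentages. */
final class CpuTickShares {

    /**
     * Returns the share (0-100) of each tick category between two snapshots.
     * When no ticks elapsed, every share is NaN, which is also what the
     * floating-point division in the patch would yield in that case.
     */
    static double[] percentages(long[] previous, long[] current) {
        if (previous.length != current.length) {
            throw new IllegalArgumentException("snapshot lengths differ");
        }
        long[] deltas = new long[current.length];
        long total = 0;
        for (int i = 0; i < current.length; i++) {
            deltas[i] = current[i] - previous[i];
            total += deltas[i];
        }
        double[] shares = new double[current.length];
        for (int i = 0; i < shares.length; i++) {
            shares[i] = total == 0 ? Double.NaN : 100d * deltas[i] / total;
        }
        return shares;
    }

    public static void main(String[] args) {
        // Assumed category order for the demo: user, nice, sys, idle, iowait, irq, softirq.
        long[] previous = {100, 0, 50, 800, 10, 0, 0};
        long[] current = {125, 0, 75, 850, 10, 0, 0};
        System.out.println(Arrays.toString(percentages(previous, current)));
    }
}
```

Because every share is divided by the same total, the categories sum to 100 whenever at least one tick elapsed, which is the property `SystemResourcesCounterTest` asserts.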

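The per-interface rates in `calculateNetworkUsage` come from the difference of two cumulative byte counters scaled to one second. A rough sketch of that arithmetic follows, under the assumption that the probe interval is a positive number of milliseconds. `ByteRate` and `bytesPerSecond` are hypothetical names, not identifiers from the patch.

```java
/** Hypothetical helper (not part of the patch): bytes per second from two counter readings. */
final class ByteRate {

    /**
     * Scales the counter difference observed over intervalMs to a per-second rate.
     * Integer division truncates, as it does in the patch.
     */
    static long bytesPerSecond(long previousBytes, long currentBytes, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        return (currentBytes - previousBytes) * 1000 / intervalMs;
    }

    public static void main(String[] args) {
        // 5000 bytes received during a 5000 ms probe interval.
        System.out.println(bytesPerSecond(1_000, 6_000, 5_000));
    }
}
```

Note that the result is only as accurate as the assumption that exactly `intervalMs` elapsed between the two readings. If the probing thread wakes up late, the reported rate will be somewhat too high.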
 

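The check in `SystemResourcesMetricsITCase` boils down to "every expected pattern matches at least one registered gauge name". The sketch below isolates that logic without any Flink classes. `GaugePatternCheck`, `findMissing`, and the interface name `eth0` are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical helper (not part of the patch): reports patterns that match no gauge name. */
final class GaugePatternCheck {

    /** Returns the expected patterns for which no gauge name is a full regex match. */
    static List<String> findMissing(List<String> expectedPatterns, Collection<String> gaugeNames) {
        List<String> missing = new ArrayList<>();
        for (String pattern : expectedPatterns) {
            boolean found = false;
            for (String name : gaugeNames) {
                if (name.matches(pattern)) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                missing.add(pattern);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> patterns = Arrays.asList(
            "localhost.jobmanager.System.CPU.Idle",
            "localhost.jobmanager.System.Network.*ReceiveRate",
            "localhost.jobmanager.System.Swap.Used");
        List<String> names = Arrays.asList(
            "localhost.jobmanager.System.CPU.Idle",
            "localhost.jobmanager.System.Network.eth0.ReceiveRate");
        System.out.println(findMissing(patterns, names));
    }
}
```

As in the test in the diff, the dots in the patterns are unescaped and therefore match any character. That keeps the patterns readable at the cost of being slightly more permissive than a literal match.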
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Log system resources as metrics
> -------------------------------
>
>                 Key: FLINK-7812
>                 URL: https://issues.apache.org/jira/browse/FLINK-7812
>             Project: Flink
>          Issue Type: New Feature
>          Components: Metrics
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
