[ https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16216974#comment-16216974 ]
ASF GitHub Bot commented on FLINK-7368:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146574784

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -38,29 +43,136 @@
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
         private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
     
    -    final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -    final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
    -    final Map<String, JobMetricStore> jobs = new HashMap<>();
    +    private final ComponentMetricStore jobManager = new ComponentMetricStore();
    +    private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
    +    private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
    +
    +    /**
    +     * Remove not active task managers.
    +     *
    +     * @param activeTaskManagers to retain.
    +     */
    +    public synchronized void retainTaskManagers(List<String> activeTaskManagers) {
    --- End diff --

    1. Performance here is not that big a deal, but code correctness is. With `synchronized`
       it is simply easier to implement any future change here in a thread-safe manner. Without
       it, any new developer coming here will have to understand many more assumptions about
       this code: does consistency matter here or not? Is the order of operations/access to
       the fields important or not? And so on.
    2.
       Even with `synchronized`, we still need concurrent hash maps, because we return the maps
       and make them visible to the outside world through getters. So we either need concurrent
       hash maps or have to return copies of them.

> MetricStore makes cpu spin at 100%
> ----------------------------------
>
>                 Key: FLINK-7368
>                 URL: https://issues.apache.org/jira/browse/FLINK-7368
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>            Reporter: Nico Chen
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>         Attachments: MyHashMap.java, MyHashMapInfiniteLoopTest.java, jm-jstack.log
>
>
> Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java
> HashMap inside `MetricStore`, which can trigger HashMap's infinite loop.
> Recently I hit a case where the Flink JobManager consumed 100% CPU. Part of
> the stack trace is shown below; the full jstack output is in the attachment.
> {code:java}
> "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x00007fbdacac9800 nid=0x64c1 runnable [0x00007fbd7d1c2000]
>    java.lang.Thread.State: RUNNABLE
> 	at java.util.HashMap.put(HashMap.java:494)
> 	at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
> 	at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
> 	at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
> 	at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
> 	at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
> 	at akka.dispatch.OnSuccess.internal(Future.scala:212)
> 	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
> 	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 	at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
> 	at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
> 	at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
> 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> 	at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
> 	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
> 	at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
> 	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
> 	at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
> 	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
> {code}
> There are 24 threads showing the same stack trace as above, indicating that they
> are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many
> posts indicate that multiple threads accessing a HashMap cause this problem, and
> I reproduced the case as well. The test code is attached. I only modified
> HashMap.transfer() by adding concurrent barriers for the different threads in
> order to simulate the timing of cycle creation among HashMap's entries. My
> program's stack trace shows it hangs at the same line of HashMap
> (HashMap.put(HashMap.java:494)) as the stack trace I posted above.
> Even though `MetricFetcher` enforces a minimum interval of 10 seconds between
> metric queries, it still cannot guarantee that query responses do not access
> `MetricStore`'s HashMap concurrently. Thus I think this is a bug that needs fixing.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
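The review comment argues for keeping `synchronized` on the mutating methods while still backing the maps with `ConcurrentHashMap`, because the getters expose those maps to callers that read them without the lock. Below is a minimal sketch of that combination under simplifying assumptions. `SimpleMetricStore`, `addMetric`, `getTaskManagers` and the flat `String` metric values are hypothetical stand-ins, not the API from the pull request. Its `retainTaskManagers` only loosely mirrors the method in the diff, taking a `Collection` rather than a `List`. The driver's 24 writer threads are a rough imitation of the concurrent Akka/ForkJoinPool callbacks in the reported stack trace.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Driver: 24 writer threads call addMetric concurrently, roughly like the
// ForkJoinPool callbacks in the reported jstack output.
class ConcurrentStoreDemo {
    public static void main(String[] args) throws InterruptedException {
        SimpleMetricStore store = new SimpleMetricStore();
        ExecutorService pool = Executors.newFixedThreadPool(24);
        for (int tm = 0; tm < 24; tm++) {
            final String taskManagerId = "tm-" + tm;
            pool.execute(() -> {
                for (int i = 0; i < 1_000; i++) {
                    store.addMetric(taskManagerId, "metric-" + i, Integer.toString(i));
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("writers did not finish");
        }
        System.out.println("task managers: " + store.getTaskManagers().size());

        store.retainTaskManagers(Arrays.asList("tm-0", "tm-1"));
        System.out.println("after retain: " + store.getTaskManagers().size());
    }
}

// Hypothetical, heavily simplified store; NOT Flink's MetricStore.
// Mutators are synchronized so compound updates run one at a time, and the maps
// are ConcurrentHashMaps because getTaskManagers() hands the live map to callers
// that read it without taking the lock.
class SimpleMetricStore {
    private final Map<String, Map<String, String>> taskManagers = new ConcurrentHashMap<>();

    public synchronized void addMetric(String taskManagerId, String name, String value) {
        taskManagers
                .computeIfAbsent(taskManagerId, id -> new ConcurrentHashMap<>())
                .put(name, value);
    }

    /** Removes task managers that are no longer active. */
    public synchronized void retainTaskManagers(Collection<String> activeTaskManagers) {
        taskManagers.keySet().retainAll(activeTaskManagers);
    }

    /** Live view; safe to read concurrently because every level is a ConcurrentHashMap. */
    public Map<String, Map<String, String>> getTaskManagers() {
        return taskManagers;
    }
}
```

Running `ConcurrentStoreDemo` should print `task managers: 24` followed by `after retain: 2`. With an unsynchronized plain `HashMap` instead, the same driver would be exposed to the race described in the issue (on the Java 7 `HashMap` from the report, potentially the infinite loop in `HashMap.put`), although such failures are timing-dependent and not guaranteed to reproduce on any given run.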
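Point 2 of the review also names the alternative to concurrent maps: return copies of them. The following is a hypothetical sketch of that option (`CopyingMetricStore` is not Flink code). It keeps plain `HashMap`s but routes every access, reads included, through the same lock. Its getter hands out a fresh copy of both map levels, so no unsynchronized reference to the internal maps ever escapes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical alternative (not Flink code): plain HashMaps guarded by the
// object's monitor, with getters that return copies instead of live maps.
class CopyingMetricStore {
    private final Map<String, Map<String, String>> taskManagers = new HashMap<>();

    public synchronized void addMetric(String taskManagerId, String name, String value) {
        taskManagers.computeIfAbsent(taskManagerId, id -> new HashMap<>()).put(name, value);
    }

    // Copies the outer map and every inner map, so callers own their snapshot.
    public synchronized Map<String, Map<String, String>> getTaskManagers() {
        Map<String, Map<String, String>> copy = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> e : taskManagers.entrySet()) {
            copy.put(e.getKey(), new HashMap<>(e.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        CopyingMetricStore store = new CopyingMetricStore();
        store.addMetric("tm-1", "numRecordsIn", "42");

        Map<String, Map<String, String>> snapshot = store.getTaskManagers();
        snapshot.get("tm-1").put("numRecordsIn", "0"); // changes only the copy
        store.addMetric("tm-2", "numRecordsIn", "7");   // not visible in the old snapshot

        System.out.println(snapshot.size());                                         // 1
        System.out.println(store.getTaskManagers().get("tm-1").get("numRecordsIn")); // 42
    }
}
```

The trade-off is that every read pays for a copy proportional to the size of the store, whereas the `ConcurrentHashMap` approach hands out a live, weakly consistent view without copying.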