
ASF GitHub Bot commented on FLINK-7368:

Github user zentol commented on a diff in the pull request:

    --- Diff: 
    @@ -181,74 +265,23 @@ private void addMetric(Map<String, String> target, String name, MetricDump metri
    -   // Accessors for sub MetricStores
    +   // sub MetricStore classes
    -    * Returns the {@link JobManagerMetricStore}.
    -    *
    -    * @return JobManagerMetricStore
    -    */
    -   public JobManagerMetricStore getJobManagerMetricStore() {
    -           return jobManager;
    -   }
    -   /**
    -    * Returns the {@link TaskManagerMetricStore} for the given taskmanager 
    -    *
    -    * @param tmID taskmanager ID
    -    * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists
    +    * Structure containing metrics of a single component.
    -   public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
    -           return taskManagers.get(tmID);
    -   }
    +   @ThreadSafe
    +   public static class ComponentMetricStore {
    +           public final Map<String, String> metrics;
    -   /**
    -    * Returns the {@link JobMetricStore} for the given job ID.
    -    *
    -    * @param jobID job ID
    -    * @return JobMetricStore for the given ID, or null if no store for the given argument exists
    -    */
    -   public JobMetricStore getJobMetricStore(String jobID) {
    -           return jobs.get(jobID);
    -   }
    -   /**
    -    * Returns the {@link TaskMetricStore} for the given job/task ID.
    -    *
    -    * @param jobID  job ID
    -    * @param taskID task ID
    -    * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists
    -    */
    -   public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
    -           JobMetricStore job = getJobMetricStore(jobID);
    -           if (job == null) {
    -                   return null;
    +           public ComponentMetricStore() {
    --- End diff --
    All constructors can be private.
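
A minimal sketch of this suggestion, assuming the nested store classes are only ever instantiated by the enclosing class. Java lets an outer class call the private constructors of its nested classes, so nothing outside needs access to them. `MetricStoreSketch`, `add` and `getMetric` are hypothetical names for illustration, and the `ConcurrentHashMap` is an assumption, not necessarily what the pull request uses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MetricStoreSketch {
    // Hypothetical stand-in for the ComponentMetricStore shown in the diff.
    public static class ComponentMetricStore {
        public final Map<String, String> metrics;

        // Private: only the enclosing class can create instances.
        private ComponentMetricStore() {
            // Assumption: a concurrent map backs the store.
            this.metrics = new ConcurrentHashMap<>();
        }
    }

    // The outer class may still call the private constructor.
    private final ComponentMetricStore jobManager = new ComponentMetricStore();

    public void add(String name, String value) {
        jobManager.metrics.put(name, value);
    }

    public String getMetric(String name) {
        return jobManager.metrics.get(name);
    }
}
```

Code outside `MetricStoreSketch` can read a `ComponentMetricStore` it is handed but cannot construct one, which keeps creation of the sub-stores in one place.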

> MetricStore makes cpu spin at 100%
> ----------------------------------
>                 Key: FLINK-7368
>                 URL: https://issues.apache.org/jira/browse/FLINK-7368
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>            Reporter: Nico Chen
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>         Attachments: MyHashMap.java, MyHashMapInfiniteLoopTest.java, jm-jstack.log
> Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java 
> HashMap inside `MetricStore` concurrently and can trigger HashMap's infinite loop. 
> Recently I hit a case where the Flink JobManager consumed 100% CPU. Part of the 
> stack trace is shown below. The full jstack output is in the attachment.
> {code:java}
> "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x00007fbdacac9800 nid=0x64c1 runnable [0x00007fbd7d1c2000]
>    java.lang.Thread.State: RUNNABLE
>         at java.util.HashMap.put(HashMap.java:494)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
>         at akka.dispatch.OnSuccess.internal(Future.scala:212)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>         at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
>         at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
>         at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
>         at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
>         at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
>         at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
>         at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
> {code}
> There are 24 threads showing the same stack trace as above, which indicates they are 
> spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many 
> posts indicate that multiple threads accessing a HashMap cause this problem, and I 
> reproduced the case as well. The test code is attached. I only modified 
> HashMap.transfer() by adding concurrent barriers for different threads in 
> order to simulate the timing that creates cycles in HashMap's Entry chain. My 
> program's stack trace shows it hangs at the same line, 
> HashMap.put(HashMap.java:494), as the stack trace I posted above.
> Even though `MetricFetcher` has a 10-second minimum interval between 
> metrics queries, it still cannot guarantee that query responses do not access 
> `MetricStore`'s HashMap concurrently. Thus I think it's a bug to fix.
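
One way to avoid the race described above is to back the store with a map that tolerates concurrent writers. The sketch below is only an illustration under that assumption, not the fix applied in the pull request. `ConcurrentMetricsSketch`, `addMetric`, `size` and `runWriters` are hypothetical names. It releases several writer threads at once, mimicking query responses that arrive together.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentMetricsSketch {
    // ConcurrentHashMap supports concurrent put() calls, unlike HashMap.
    private final Map<String, String> metrics = new ConcurrentHashMap<>();

    public void addMetric(String name, String value) {
        metrics.put(name, value);
    }

    public int size() {
        return metrics.size();
    }

    // Simulates several response callbacks writing at the same time
    // and returns the number of stored metrics once all writers finish.
    public static int runWriters(int threads, int perThread) {
        ConcurrentMetricsSketch store = new ConcurrentMetricsSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(() -> {
                try {
                    start.await(); // hold all writers until released together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < perThread; i++) {
                    // Keys are unique per thread, so every put adds an entry.
                    store.addMetric("t" + id + ".m" + i, String.valueOf(i));
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println(runWriters(8, 1000));
    }
}
```

With a plain `HashMap` in place of the `ConcurrentHashMap`, the same workload has no defined outcome: entries can be lost, and on older JDKs such as the Java 7 runtime in the report a resize race can leave a cycle in a bucket chain so that `put` never returns.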

This message was sent by Atlassian JIRA
