[ 
https://issues.apache.org/jira/browse/FLINK-33162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33162:
----------------------------------
    Description: 
When starting a job with a large number of TaskManagers, the JobManager of the
job failed to respond to any REST request. Looking at the jstack output, we
found that all 4 REST endpoint threads were serving the metrics fetcher.
{code:java}
"Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 tid=0x00007f17e7823000 nid=0x246 waiting for monitor entry [0x00007f178e9fe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
        - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000003ce80d8f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 tid=0x00007f17e88af000 nid=0x243 waiting for monitor entry [0x00007f1790dfe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
        - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000003ce80df88> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 tid=0x00007f1793473800 nid=0x23a runnable [0x00007f17922fd000]
   java.lang.Thread.State: RUNNABLE
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82)
        - locked <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000003ce811120> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"Flink-DispatcherRestEndpoint-thread-1" #76 daemon prio=5 os_prio=0 tid=0x00007f17a56f5000 nid=0x237 waiting for monitor entry [0x00007f1792cfd000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
        - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000003ce8115f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
{code}
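
To illustrate what the dump shows, here is a minimal, self-contained sketch of the same pattern. It is not Flink code: SlowStore is a hypothetical stand-in for a store with a synchronized update method, and the class and method names are made up for this example. Every thread of a small shared scheduled pool enters the synchronized method, one holds the monitor while the rest block on it, and a further task (standing in for a REST request) submitted to the same pool is not served.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SharedPoolStarvationSketch {

    /** Hypothetical stand-in for a store whose update method is synchronized. */
    static final class SlowStore {
        private final CountDownLatch release;

        SlowStore(CountDownLatch release) {
            this.release = release;
        }

        // Only one thread at a time gets past the monitor; the others block on it.
        synchronized void addAll() {
            try {
                release.await(); // simulates a long-running update
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns true if an extra task on the saturated pool ran within the timeout. */
    static boolean requestServedWhileStoreBusy(int poolSize, long timeoutMillis) {
        CountDownLatch release = new CountDownLatch(1);
        SlowStore store = new SlowStore(release);
        ScheduledExecutorService shared = Executors.newScheduledThreadPool(poolSize);
        try {
            CountDownLatch started = new CountDownLatch(poolSize);
            for (int i = 0; i < poolSize; i++) {
                shared.execute(() -> {
                    started.countDown();
                    store.addAll();
                });
            }
            started.await(); // every worker is now occupied by a store update

            // The "REST request" is queued behind the updates on the same pool.
            CountDownLatch served = new CountDownLatch(1);
            shared.execute(served::countDown);
            return served.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            shared.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("served = " + requestServedWhileStoreBusy(4, 200));
    }
}
```

With a pool size of 4 the extra task stays in the queue until the store is released, which matches the unresponsive REST endpoint described above.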
 

I suggest enabling a rejection policy on the executor, so that requests it
cannot handle are rejected.
{code:java}
Executors.newScheduledThreadPool(
        numThreads,
        new ExecutorThreadFactory.Builder()
                .setThreadPriority(threadPriority)
                .setPoolName("Flink-" + componentName)
                .build());

// The same executor is passed to both the MetricFetcher and the webMonitorEndpoint.
final MetricFetcher metricFetcher =
        updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                        configuration,
                        metricQueryServiceRetriever,
                        dispatcherGatewayRetriever,
                        executor);

webMonitorEndpoint =
        restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);
{code}
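
One possible reading of the rejection-policy suggestion is sketched below. It rests on assumptions that are not part of the issue: it uses a plain JDK ThreadPoolExecutor with a bounded queue rather than Flink's actual executor, and the class name, method names and sizes are hypothetical. Note that, per the JDK documentation, ScheduledThreadPoolExecutor works with an unbounded queue, so a rejection handler installed on it would not fire merely because the pool is overloaded; a bounded queue is what makes the handler take effect here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectingExecutorSketch {

    /** Fixed-size pool with a bounded queue; excess tasks are rejected (AbortPolicy). */
    static ThreadPoolExecutor newBoundedExecutor(int numThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                numThreads,
                numThreads,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits blocking tasks and counts how many the executor rejects. */
    static int countRejected(int numThreads, int queueCapacity, int tasks) {
        ThreadPoolExecutor executor = newBoundedExecutor(numThreads, queueCapacity);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task blocks until released, keeping its worker busy.
                    executor.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++; // all workers busy and the queue is full
                }
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejected(4, 2, 10));
    }
}
```

With 4 threads and a queue of 2, the first 6 of 10 blocking tasks are accepted and the remaining 4 are rejected immediately instead of piling up behind the busy workers. Whether rejecting is preferable to queueing for metric updates is a design decision for the issue discussion.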

  was:
When starting a job with a large number of TaskManagers, the JobManager of the
job failed to respond to any REST request. Looking at the jstack output, we
found that all 4 REST endpoint threads were serving the metrics fetcher.
{code:java}
"Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 tid=0x00007f17e7823000 nid=0x246 waiting for monitor entry [0x00007f178e9fe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
        - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000003ce80d8f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 tid=0x00007f17e88af000 nid=0x243 waiting for monitor entry [0x00007f1790dfe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
        - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000003ce80df88> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 tid=0x00007f1793473800 nid=0x23a runnable [0x00007f17922fd000]
   java.lang.Thread.State: RUNNABLE
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82)
        - locked <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000003ce811120> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"Flink-DispatcherRestEndpoint-thread-1" #76 daemon prio=5 os_prio=0 tid=0x00007f17a56f5000 nid=0x237 waiting for monitor entry [0x00007f1792cfd000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
        - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000003ce8115f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
{code}
 

I suggest using separate executors for MetricFetcher and webMonitorEndpoint,
to make sure they do not affect each other.

 
{code:java}
// The same executor is passed to both the MetricFetcher and the webMonitorEndpoint.
final MetricFetcher metricFetcher =
        updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                        configuration,
                        metricQueryServiceRetriever,
                        dispatcherGatewayRetriever,
                        executor);

webMonitorEndpoint =
        restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);
{code}
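
The separate-executor idea from this earlier description can be sketched in isolation as follows. This is only an illustration under assumptions: it uses plain JDK scheduled pools, and the names metricsExecutor and restExecutor as well as the class itself are hypothetical, not Flink's. The point is that saturating the pool used for metric updates leaves the pool used for REST handling free.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SeparateExecutorsSketch {

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if a REST-like task ran while the metrics pool was fully busy. */
    static boolean restServedWhileMetricsPoolBusy(int metricsThreads, long timeoutMillis) {
        // Hypothetical split: one pool per concern instead of one shared pool.
        ScheduledExecutorService metricsExecutor =
                Executors.newScheduledThreadPool(metricsThreads);
        ScheduledExecutorService restExecutor = Executors.newScheduledThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch started = new CountDownLatch(metricsThreads);
        try {
            for (int i = 0; i < metricsThreads; i++) {
                metricsExecutor.execute(() -> {
                    started.countDown();
                    awaitQuietly(release); // simulates a slow metric update
                });
            }
            started.await(); // the metrics pool is now saturated

            // The REST-like task runs on its own pool and is not queued behind updates.
            CountDownLatch served = new CountDownLatch(1);
            restExecutor.execute(served::countDown);
            return served.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            metricsExecutor.shutdownNow();
            restExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("served = " + restServedWhileMetricsPoolBusy(4, 5000));
    }
}
```

The trade-off is extra threads in the JobManager process; the sizes used here are arbitrary.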


> Separate the executor in DefaultDispatcherResourceManagerComponentFactory for 
> MetricFetcher and webMonitorEndpoint
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33162
>                 URL: https://issues.apache.org/jira/browse/FLINK-33162
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / REST
>    Affects Versions: 1.13.1
>            Reporter: xiaogang zhou
>            Priority: Major
>             Fix For: 1.19.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
