[ 
https://issues.apache.org/jira/browse/FLINK-29134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sitan Pang updated FLINK-29134:
-------------------------------
    Affects Version/s: 1.15.2

> fetch metrics may cause oom(ThreadPool task pile up)
> ----------------------------------------------------
>
>                 Key: FLINK-29134
>                 URL: https://issues.apache.org/jira/browse/FLINK-29134
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics
>    Affects Versions: 1.11.0, 1.15.2
>            Reporter: Sitan Pang
>            Priority: Major
>         Attachments: dump-queueTask.png, dump-threadPool.png
>
>
> When queryMetrics runs, a thread pool processes the data returned by the 
> TaskManagers (TMs). 
> {code:java}
> private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
>     LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
>     queryServiceGateway
>             .queryMetrics(timeout)
>             .whenCompleteAsync(
>                     (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
>                         if (t != null) {
>                             LOG.debug("Fetching metrics failed.", t);
>                         } else {
>                             metrics.addAll(deserializer.deserialize(result));
>                         }
>                     },
>                     executor);
> } {code}
> The only condition for fetching metrics is that more than updateInterval 
> has elapsed since the last update:
> {code:java}
> public void update() {
>     synchronized (this) {
>         long currentTime = System.currentTimeMillis();
>         if (currentTime - lastUpdateTime > updateInterval) {
>             lastUpdateTime = currentTime;
>             fetchMetrics();
>         }
>     }
> } {code}
> Therefore, if the returned data cannot be processed within one update 
> interval, metrics data accumulates as queued tasks in the thread pool.
> Besides, the webMonitorEndpoint, the REST handlers, and the metric fetcher 
> share the same thread pool. Opening the web UI may make this even worse.
> {code:java}
> final ScheduledExecutorService executor =
>         WebMonitorEndpoint.createExecutorService(
>                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
>                 configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
>                 "DispatcherRestEndpoint");
> final long updateInterval =
>         configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
> final MetricFetcher metricFetcher =
>         updateInterval == 0
>                 ? VoidMetricFetcher.INSTANCE
>                 : MetricFetcherImpl.fromConfiguration(
>                         configuration,
>                         metricQueryServiceRetriever,
>                         dispatcherGatewayRetriever,
>                         executor);
> webMonitorEndpoint =
>         restEndpointFactory.createRestEndpoint(
>                 configuration,
>                 dispatcherGatewayRetriever,
>                 resourceManagerGatewayRetriever,
>                 blobServer,
>                 executor,
>                 metricFetcher,
>                 highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
>                 fatalErrorHandler); {code}
>  
>  
>  
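The pile-up described above can be reproduced outside Flink with a small stand-alone sketch. It is not Flink code: the class name MetricBacklogSketch, the method backlogAfter, and the 1 KiB payload are hypothetical, and a plain single-threaded ThreadPoolExecutor with an unbounded queue stands in for the shared REST executor. Each loop iteration models one update interval that elapses while earlier results are still being processed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-alone sketch of the task pile-up; not Flink code. */
class MetricBacklogSketch {

    /**
     * Submits one blocked "process the result" task per elapsed update
     * interval and returns how many tasks are waiting in the executor queue.
     */
    static int backlogAfter(int intervals) {
        // Assumption: one worker and an unbounded queue, standing in for the
        // executor shared by the REST endpoint and the metric fetcher.
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // While the latch is closed no task can finish, which models
        // processing that takes longer than the update interval.
        CountDownLatch slowProcessing = new CountDownLatch(1);
        try {
            for (int i = 0; i < intervals; i++) {
                // Each queued task keeps its payload reachable until it runs,
                // similar to a result waiting to be deserialized.
                byte[] payload = new byte[1024];
                executor.execute(
                        () -> {
                            try {
                                slowProcessing.await();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            // Use the payload so the task captures it.
                            if (payload.length == 0) {
                                throw new IllegalStateException("empty payload");
                            }
                        });
            }
            // The first task was handed directly to the worker thread;
            // every later task is sitting in the queue.
            return executor.getQueue().size();
        } finally {
            // Release the blocked tasks and let the pool drain and exit.
            slowProcessing.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + backlogAfter(100));
    }
}
```

With 100 simulated intervals the sketch reports 99 queued tasks, each still holding its payload. Because the queue is unbounded, nothing limits this growth, which is how a backlog like this could eventually exhaust the heap.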



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
