[ https://issues.apache.org/jira/browse/FLINK-29134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sitan Pang updated FLINK-29134:
-------------------------------
    Affects Version/s: 1.15.2

> fetch metrics may cause oom(ThreadPool task pile up)
> ----------------------------------------------------
>
>                 Key: FLINK-29134
>                 URL: https://issues.apache.org/jira/browse/FLINK-29134
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics
>    Affects Versions: 1.11.0, 1.15.2
>            Reporter: Sitan Pang
>            Priority: Major
>         Attachments: dump-queueTask.png, dump-threadPool.png
>
>
> When metrics are queried (queryMetrics), a thread pool is used to process the data returned by the TaskManagers (TMs):
> {code:java}
> private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
>     LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
>
>     queryServiceGateway
>             .queryMetrics(timeout)
>             .whenCompleteAsync(
>                     (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
>                         if (t != null) {
>                             LOG.debug("Fetching metrics failed.", t);
>                         } else {
>                             metrics.addAll(deserializer.deserialize(result));
>                         }
>                     },
>                     // Deserialization of each result is scheduled on this executor.
>                     executor);
> }
> {code}
> The only condition for fetching metrics again is that more than updateInterval milliseconds have passed since the last update:
> {code:java}
> public void update() {
>     synchronized (this) {
>         long currentTime = System.currentTimeMillis();
>         // Only elapsed time is checked; unprocessed results of earlier fetches are not.
>         if (currentTime - lastUpdateTime > updateInterval) {
>             lastUpdateTime = currentTime;
>             fetchMetrics();
>         }
>     }
> }
> {code}
> Therefore, if the returned data cannot be processed within the update interval, processing tasks pile up in the thread pool and the metric data accumulates.
> In addition, WebMonitorEndpoint, the REST handlers, and the metric fetcher share the same thread pool, so the problem may become even worse when the web UI is open:
> {code:java}
> // The same executor is passed to both the metric fetcher and the REST endpoint.
> final ScheduledExecutorService executor =
>         WebMonitorEndpoint.createExecutorService(
>                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
>                 configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
>                 "DispatcherRestEndpoint");
>
> final long updateInterval =
>         configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
> final MetricFetcher metricFetcher =
>         updateInterval == 0
>                 ? VoidMetricFetcher.INSTANCE
>                 : MetricFetcherImpl.fromConfiguration(
>                         configuration,
>                         metricQueryServiceRetriever,
>                         dispatcherGatewayRetriever,
>                         executor);
>
> webMonitorEndpoint =
>         restEndpointFactory.createRestEndpoint(
>                 configuration,
>                 dispatcherGatewayRetriever,
>                 resourceManagerGatewayRetriever,
>                 blobServer,
>                 executor,
>                 metricFetcher,
>                 highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
>                 fatalErrorHandler);
> {code}
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
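The pile-up described in the issue can be illustrated with a small, self-contained simulation. This is a minimal sketch under stated assumptions, not Flink code: `MetricPileUpSketch`, `TimeThrottledFetcher`, `simulate`, and the REST-handler-like task are hypothetical names, and the shared pool is modeled as a single-threaded `ThreadPoolExecutor` with an unbounded `LinkedBlockingQueue` (chosen to make the effect deterministic, not claimed to match Flink's actual executor configuration). `update` copies the time-only check from the `update()` method quoted in the issue, `executor.execute(...)` stands in for the `whenCompleteAsync(..., executor)` hand-off, and a latch keeps the first processing task busy, playing the role of deserialization that takes longer than the update interval.

```java
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation of the issue; none of these classes are Flink APIs.
final class MetricPileUpSketch {

    /** Mimics the time-only check of update(): pending processing work is never considered. */
    static final class TimeThrottledFetcher {
        private final long updateIntervalMillis;
        private final Executor executor;
        private final Runnable processResult;
        private long lastUpdateTime; // defaults to 0

        TimeThrottledFetcher(long updateIntervalMillis, Executor executor, Runnable processResult) {
            this.updateIntervalMillis = updateIntervalMillis;
            this.executor = executor;
            this.processResult = processResult;
        }

        synchronized void update(long currentTime) {
            if (currentTime - lastUpdateTime > updateIntervalMillis) {
                lastUpdateTime = currentTime;
                // Stands in for whenCompleteAsync(..., executor): processing goes to the shared pool.
                executor.execute(processResult);
            }
        }
    }

    /**
     * Triggers one fetch per interval while processing is stuck, then submits a
     * REST-handler-like task to the same pool.
     *
     * @return {metric tasks waiting in the queue, 0-based queue position of the REST task}
     */
    static int[] simulate(int intervals, long updateIntervalMillis) {
        CountDownLatch processingReleased = new CountDownLatch(1);
        // Assumption: one worker thread and an unbounded queue, so nothing is ever rejected.
        ThreadPoolExecutor sharedPool =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Processing that outlasts the update interval: blocks until the latch is released.
        Runnable slowProcessing = () -> {
            try {
                processingReleased.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        TimeThrottledFetcher fetcher =
                new TimeThrottledFetcher(updateIntervalMillis, sharedPool, slowProcessing);

        long now = 100_000L; // simulated clock in milliseconds
        for (int i = 0; i < intervals; i++) {
            fetcher.update(now);
            now += updateIntervalMillis + 1; // just past the interval, so every call fetches
        }
        int queuedMetricTasks = sharedPool.getQueue().size();

        Runnable restHandlerTask = () -> { };
        sharedPool.execute(restHandlerTask);
        int restPosition = new ArrayList<>(sharedPool.getQueue()).indexOf(restHandlerTask);

        processingReleased.countDown(); // let the backlog drain before shutting down
        sharedPool.shutdown();
        try {
            sharedPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {queuedMetricTasks, restPosition};
    }

    public static void main(String[] args) {
        int[] result = simulate(5, 10_000L);
        System.out.println("metric tasks queued: " + result[0]);
        System.out.println("REST task queue position: " + result[1]);
    }
}
```

With `simulate(5, 10_000L)`, the first processing task occupies the only worker, the other four wait in the queue, and the REST-handler-like task is queued behind them at index 4. Because the check in `update` looks only at elapsed time, every interval adds another task regardless of the backlog, and unrelated work submitted to the same pool has to wait behind it.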