Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4492#discussion_r132142385 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java --- @@ -101,127 +98,112 @@ public void update() { private void fetchMetrics() { try { - Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort(); - if (jobManagerGatewayAndWebPort.isDefined()) { - ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1(); + Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow(); + if (optJobManagerGateway.isPresent()) { + final JobManagerGateway jobManagerGateway = optJobManagerGateway.get(); /** * Remove all metrics that belong to a job that is not running and no longer archived. */ - Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout); - jobDetailsFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - MultipleJobsDetails details = (MultipleJobsDetails) result; + CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout); + + jobDetailsFuture.whenCompleteAsync( + (MultipleJobsDetails jobDetails, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Fetching of JobDetails failed.", throwable); + } else { ArrayList<String> toRetain = new ArrayList<>(); - for (JobDetails job : details.getRunningJobs()) { + for (JobDetails job : jobDetails.getRunningJobs()) { toRetain.add(job.getJobId().toString()); } - for (JobDetails job : details.getFinishedJobs()) { + for (JobDetails job : jobDetails.getFinishedJobs()) { toRetain.add(job.getJobId().toString()); } synchronized (metrics) { metrics.jobs.keySet().retainAll(toRetain); } } - }, ctx); - logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed."); + }, + executor); - String 
jobManagerPath = jobManager.path(); - String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath); + String jobManagerPath = jobManagerGateway.getAddress(); + String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - queryMetrics(jobManagerQueryService); + retrieveAndQueryMetrics(jmQueryServicePath); /** * We first request the list of all registered task managers from the job manager, and then * request the respective metric dump from each task manager. * * <p>All stored metrics that do not belong to a registered task manager will be removed. */ - Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout); - registeredTaskManagersFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable(); - List<String> activeTaskManagers = new ArrayList<>(); - for (Instance taskManager : taskManagers) { - activeTaskManagers.add(taskManager.getId().toString()); - - String taskManagerPath = taskManager.getTaskManagerGateway().getAddress(); - String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString(); - ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath); - - queryMetrics(taskManagerQueryService); - } - synchronized (metrics) { // remove all metrics belonging to unregistered task managers + CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout); + + taskManagersFuture.whenCompleteAsync( + 
(Collection<Instance> taskManagers, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Fetching list of registered TaskManagers failed.", throwable); + } else { + List<String> activeTaskManagers = taskManagers.stream().map( + taskManagerInstance -> { + final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress(); + final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString(); + + retrieveAndQueryMetrics(tmQueryServicePath); + + return taskManagerInstance.getId().toString(); + }).collect(Collectors.toList()); + + synchronized (metrics) { metrics.taskManagers.keySet().retainAll(activeTaskManagers); } } - }, ctx); - logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed."); + }, + executor); } } catch (Exception e) { LOG.warn("Exception while fetching metrics.", e); } } - private void logErrorOnFailure(Future<Object> future, final String message) { - future.onFailure(new OnFailure() { - @Override - public void onFailure(Throwable failure) throws Throwable { - LOG.debug(message, failure); - } - }, ctx); - } - /** - * Requests a metric dump from the given actor. + * Retrieves and queries the specified QueryServiceGateway. 
* - * @param actor ActorRef to request the dump from - */ - private void queryMetrics(ActorRef actor) { - Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout); - metricQueryFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - addMetrics(result); + * @param queryServicePath specifying the QueryServiceGateway + */ + private void retrieveAndQueryMetrics(String queryServicePath) { + final CompletableFuture<MetricQueryServiceGateway> jmQueryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath); + + jmQueryServiceGatewayFuture.whenCompleteAsync( + (MetricQueryServiceGateway queryServiceGateway, Throwable t) -> { + if (t != null) { + LOG.debug("Could not retrieve QueryServiceGateway.", t); + } else { + queryMetrics(queryServiceGateway); } - }, ctx); - logErrorOnFailure(metricQueryFuture, "Fetching metrics failed."); - } - - private void addMetrics(Object result) { - MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result; - List<MetricDump> dumpedMetrics = deserializer.deserialize(data); - for (MetricDump metric : dumpedMetrics) { - metrics.add(metric); - } + }, + executor); } /** - * Helper class that allows mocking of the answer. - */ - static class BasicGateway { - private final ActorRef actor; - - private BasicGateway(ActorRef actor) { - this.actor = actor; - } - - /** - * Sends a message asynchronously and returns its response. The response to the message is - * returned as a future. 
- * - * @param message Message to be sent - * @param timeout Timeout until the Future is completed with an AskTimeoutException - * @return Future which contains the response to the sent message - */ - public Future<Object> ask(Object message, FiniteDuration timeout) { - return Patterns.ask(actor, message, new Timeout(timeout)); - } + * Query the metrics from the given QueryServiceGateway. + * + * @param queryServiceGateway to query for metrics + */ + private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) { + queryServiceGateway + .queryMetrics(timeout) + .whenCompleteAsync( + (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> { + if (t != null) { + LOG.debug("Fetching metrics failed.", t); + } else { + List<MetricDump> dumpedMetrics = deserializer.deserialize(result); + for (MetricDump metric : dumpedMetrics) { --- End diff -- Is this really how FLINK-7368 will be solved? I thought there is still some discussion ongoing.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---