[ https://issues.apache.org/jira/browse/FLINK-7381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116742#comment-16116742 ]
ASF GitHub Bot commented on FLINK-7381: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4492#discussion_r131675749 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java --- @@ -101,127 +98,112 @@ public void update() { private void fetchMetrics() { try { - Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort(); - if (jobManagerGatewayAndWebPort.isDefined()) { - ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1(); + Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow(); + if (optJobManagerGateway.isPresent()) { + final JobManagerGateway jobManagerGateway = optJobManagerGateway.get(); /** * Remove all metrics that belong to a job that is not running and no longer archived. */ - Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout); - jobDetailsFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - MultipleJobsDetails details = (MultipleJobsDetails) result; + CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout); + + jobDetailsFuture.whenCompleteAsync( + (MultipleJobsDetails jobDetails, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Fetching of JobDetails failed.", throwable); + } else { ArrayList<String> toRetain = new ArrayList<>(); - for (JobDetails job : details.getRunningJobs()) { + for (JobDetails job : jobDetails.getRunningJobs()) { toRetain.add(job.getJobId().toString()); } - for (JobDetails job : details.getFinishedJobs()) { + for (JobDetails job : jobDetails.getFinishedJobs()) { toRetain.add(job.getJobId().toString()); } synchronized (metrics) { metrics.jobs.keySet().retainAll(toRetain); } } - }, ctx); - logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed."); + }, + executor); - String jobManagerPath = jobManager.path(); - String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath); + String jobManagerPath = jobManagerGateway.getAddress(); + String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - queryMetrics(jobManagerQueryService); + retrieveAndQueryMetrics(jmQueryServicePath); /** * We first request the list of all registered task managers from the job manager, and then * request the respective metric dump from each task manager. * * <p>All stored metrics that do not belong to a registered task manager will be removed. */ - Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout); - registeredTaskManagersFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable(); - List<String> activeTaskManagers = new ArrayList<>(); - for (Instance taskManager : taskManagers) { - activeTaskManagers.add(taskManager.getId().toString()); - - String taskManagerPath = taskManager.getTaskManagerGateway().getAddress(); - String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString(); - ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath); - - queryMetrics(taskManagerQueryService); - } - synchronized (metrics) { // remove all metrics belonging to unregistered task managers + CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout); + + taskManagersFuture.whenCompleteAsync( + (Collection<Instance> taskManagers, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Fetching list of registered TaskManagers failed.", throwable); + } else { + List<String> activeTaskManagers = taskManagers.stream().map( + taskManagerInstance -> { + final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress(); + final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString(); + + retrieveAndQueryMetrics(tmQueryServicePath); + + return taskManagerInstance.getId().toString(); + }).collect(Collectors.toList()); + + synchronized (metrics) { metrics.taskManagers.keySet().retainAll(activeTaskManagers); } } - }, ctx); - logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed."); + }, + executor); } } catch (Exception e) { LOG.warn("Exception while fetching metrics.", e); } } - private void logErrorOnFailure(Future<Object> future, final String message) { - future.onFailure(new OnFailure() { - @Override - public void onFailure(Throwable failure) throws Throwable { - LOG.debug(message, failure); - } - }, ctx); - } - /** - * Requests a metric dump from the given actor. + * Retrieves and queries the specified QueryServiceGateway. * - * @param actor ActorRef to request the dump from - */ - private void queryMetrics(ActorRef actor) { - Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout); - metricQueryFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - addMetrics(result); + * @param queryServicePath specifying the QueryServiceGateway + */ + private void retrieveAndQueryMetrics(String queryServicePath) { + final CompletableFuture<MetricQueryServiceGateway> jmQueryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath); --- End diff -- variable name is misleading, as we could also be retrieving a taskmanager query service. > Decouple WebRuntimeMonitor from ActorGateway > -------------------------------------------- > > Key: FLINK-7381 > URL: https://issues.apache.org/jira/browse/FLINK-7381 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend > Affects Versions: 1.4.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Labels: flip-6 > > The {{WebRuntimeMonitor}} has a hard wired dependency on the {{ActorGateway}} > in order to communicate with the {{JobManager}}. In order to make it work > with the {{JobMaster}} (Flip-6), we have to abstract this dependency away. I > propose to add a {{JobManagerGateway}} interface which can be implemented > using Akka for the old {{JobManager}} code. The Flip-6 {{JobMasterGateway}} > can then directly inherit from this interface. -- This message was sent by Atlassian JIRA (v6.4.14#64029)