GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132142385
  
    --- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 ---
    @@ -101,127 +98,112 @@ public void update() {
     
        private void fetchMetrics() {
                try {
    -                   Option<scala.Tuple2<ActorGateway, Integer>> 
jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    -                   if (jobManagerGatewayAndWebPort.isDefined()) {
    -                           ActorGateway jobManager = 
jobManagerGatewayAndWebPort.get()._1();
    +                   Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getJobManagerGatewayNow();
    +                   if (optJobManagerGateway.isPresent()) {
    +                           final JobManagerGateway jobManagerGateway = 
optJobManagerGateway.get();
     
                                /**
                                 * Remove all metrics that belong to a job that 
is not running and no longer archived.
                                 */
    -                           Future<Object> jobDetailsFuture = 
jobManager.ask(new RequestJobDetails(true, true), timeout);
    -                           jobDetailsFuture
    -                                   .onSuccess(new OnSuccess<Object>() {
    -                                           @Override
    -                                           public void onSuccess(Object 
result) throws Throwable {
    -                                                   MultipleJobsDetails 
details = (MultipleJobsDetails) result;
    +                           CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
    +
    +                           jobDetailsFuture.whenCompleteAsync(
    +                                   (MultipleJobsDetails jobDetails, 
Throwable throwable) -> {
    +                                           if (throwable != null) {
    +                                                   LOG.debug("Fetching of 
JobDetails failed.", throwable);
    +                                           } else {
                                                        ArrayList<String> 
toRetain = new ArrayList<>();
    -                                                   for (JobDetails job : 
details.getRunningJobs()) {
    +                                                   for (JobDetails job : 
jobDetails.getRunningJobs()) {
                                                                
toRetain.add(job.getJobId().toString());
                                                        }
    -                                                   for (JobDetails job : 
details.getFinishedJobs()) {
    +                                                   for (JobDetails job : 
jobDetails.getFinishedJobs()) {
                                                                
toRetain.add(job.getJobId().toString());
                                                        }
                                                        synchronized (metrics) {
                                                                
metrics.jobs.keySet().retainAll(toRetain);
                                                        }
                                                }
    -                                   }, ctx);
    -                           logErrorOnFailure(jobDetailsFuture, "Fetching 
of JobDetails failed.");
    +                                   },
    +                                   executor);
     
    -                           String jobManagerPath = jobManager.path();
    -                           String queryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
    -                           ActorRef jobManagerQueryService = 
actorSystem.actorFor(queryServicePath);
    +                           String jobManagerPath = 
jobManagerGateway.getAddress();
    +                           String jmQueryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
     
    -                           queryMetrics(jobManagerQueryService);
    +                           retrieveAndQueryMetrics(jmQueryServicePath);
     
                                /**
                                 * We first request the list of all registered 
task managers from the job manager, and then
                                 * request the respective metric dump from each 
task manager.
                                 *
                                 * <p>All stored metrics that do not belong to 
a registered task manager will be removed.
                                 */
    -                           Future<Object> registeredTaskManagersFuture = 
jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
    -                           registeredTaskManagersFuture
    -                                   .onSuccess(new OnSuccess<Object>() {
    -                                           @Override
    -                                           public void onSuccess(Object 
result) throws Throwable {
    -                                                   Iterable<Instance> 
taskManagers = ((JobManagerMessages.RegisteredTaskManagers) 
result).asJavaIterable();
    -                                                   List<String> 
activeTaskManagers = new ArrayList<>();
    -                                                   for (Instance 
taskManager : taskManagers) {
    -                                                           
activeTaskManagers.add(taskManager.getId().toString());
    -
    -                                                           String 
taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
    -                                                           String 
queryServicePath = taskManagerPath.substring(0, 
taskManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManager.getTaskManagerID().getResourceIdString();
    -                                                           ActorRef 
taskManagerQueryService = actorSystem.actorFor(queryServicePath);
    -
    -                                                           
queryMetrics(taskManagerQueryService);
    -                                                   }
    -                                                   synchronized (metrics) 
{ // remove all metrics belonging to unregistered task managers
    +                           CompletableFuture<Collection<Instance>> 
taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
    +
    +                           taskManagersFuture.whenCompleteAsync(
    +                                   (Collection<Instance> taskManagers, 
Throwable throwable) -> {
    +                                           if (throwable != null) {
    +                                                   LOG.debug("Fetching 
list of registered TaskManagers failed.", throwable);
    +                                           } else {
    +                                                   List<String> 
activeTaskManagers = taskManagers.stream().map(
    +                                                           
taskManagerInstance -> {
    +                                                                   final 
String taskManagerAddress = 
taskManagerInstance.getTaskManagerGateway().getAddress();
    +                                                                   final 
String tmQueryServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManagerInstance.getTaskManagerID().getResourceIdString();
    +
    +                                                                   
retrieveAndQueryMetrics(tmQueryServicePath);
    +
    +                                                                   return 
taskManagerInstance.getId().toString();
    +                                                           
}).collect(Collectors.toList());
    +
    +                                                   synchronized (metrics) {
                                                                
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
                                                        }
                                                }
    -                                   }, ctx);
    -                           logErrorOnFailure(registeredTaskManagersFuture, 
"Fetchin list of registered TaskManagers failed.");
    +                                   },
    +                                   executor);
                        }
                } catch (Exception e) {
                        LOG.warn("Exception while fetching metrics.", e);
                }
        }
     
    -   private void logErrorOnFailure(Future<Object> future, final String 
message) {
    -           future.onFailure(new OnFailure() {
    -                   @Override
    -                   public void onFailure(Throwable failure) throws 
Throwable {
    -                           LOG.debug(message, failure);
    -                   }
    -           }, ctx);
    -   }
    -
        /**
    -    * Requests a metric dump from the given actor.
    +    * Retrieves and queries the specified QueryServiceGateway.
         *
    -    * @param actor ActorRef to request the dump from
    -     */
    -   private void queryMetrics(ActorRef actor) {
    -           Future<Object> metricQueryFuture = new 
BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
    -           metricQueryFuture
    -                   .onSuccess(new OnSuccess<Object>() {
    -                           @Override
    -                           public void onSuccess(Object result) throws 
Throwable {
    -                                   addMetrics(result);
    +    * @param queryServicePath specifying the QueryServiceGateway
    +    */
    +   private void retrieveAndQueryMetrics(String queryServicePath) {
    +           final CompletableFuture<MetricQueryServiceGateway> 
jmQueryServiceGatewayFuture = 
queryServiceRetriever.retrieveService(queryServicePath);
    +
    +           jmQueryServiceGatewayFuture.whenCompleteAsync(
    +                   (MetricQueryServiceGateway queryServiceGateway, 
Throwable t) -> {
    +                           if (t != null) {
    +                                   LOG.debug("Could not retrieve 
QueryServiceGateway.", t);
    +                           } else {
    +                                   queryMetrics(queryServiceGateway);
                                }
    -                   }, ctx);
    -           logErrorOnFailure(metricQueryFuture, "Fetching metrics 
failed.");
    -   }
    -
    -   private void addMetrics(Object result) {
    -           MetricDumpSerialization.MetricSerializationResult data = 
(MetricDumpSerialization.MetricSerializationResult) result;
    -           List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
    -           for (MetricDump metric : dumpedMetrics) {
    -                   metrics.add(metric);
    -           }
    +                   },
    +                   executor);
        }
     
        /**
    -    * Helper class that allows mocking of the answer.
    -     */
    -   static class BasicGateway {
    -           private final ActorRef actor;
    -
    -           private BasicGateway(ActorRef actor) {
    -                   this.actor = actor;
    -           }
    -
    -           /**
    -            * Sends a message asynchronously and returns its response. The 
response to the message is
    -            * returned as a future.
    -            *
    -            * @param message Message to be sent
    -            * @param timeout Timeout until the Future is completed with an 
AskTimeoutException
    -            * @return Future which contains the response to the sent 
message
    -            */
    -           public Future<Object> ask(Object message, FiniteDuration 
timeout) {
    -                   return Patterns.ask(actor, message, new 
Timeout(timeout));
    -           }
    +    * Query the metrics from the given QueryServiceGateway.
    +    *
    +    * @param queryServiceGateway to query for metrics
    +    */
    +   private void queryMetrics(final MetricQueryServiceGateway 
queryServiceGateway) {
    +           queryServiceGateway
    +                   .queryMetrics(timeout)
    +                   .whenCompleteAsync(
    +                           
(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
    +                                   if (t != null) {
    +                                           LOG.debug("Fetching metrics 
failed.", t);
    +                                   } else {
    +                                           List<MetricDump> dumpedMetrics 
= deserializer.deserialize(result);
    +                                           for (MetricDump metric : 
dumpedMetrics) {
    --- End diff --
    
    Is this really how FLINK-7368 will be solved? I thought there was still some
    discussion ongoing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to