TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] 
Port the MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#discussion_r269523708
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##########
 @@ -73,62 +71,48 @@ public String filterCharacters(String input) {
 
        private final long messageSizeLimit;
 
-       public MetricQueryService(long messageSizeLimit) {
+       public MetricQueryService(RpcService rpcService, String endpointId, 
long messageSizeLimit) {
+               super(rpcService, endpointId);
                this.messageSizeLimit = messageSizeLimit;
        }
 
        @Override
-       public void postStop() {
+       public CompletableFuture<Void> onStop() {
                serializer.close();
+               return CompletableFuture.completedFuture(null);
        }
 
-       @Override
-       public void onReceive(Object message) {
-               try {
-                       if (message instanceof AddMetric) {
-                               AddMetric added = (AddMetric) message;
-
-                               String metricName = added.metricName;
-                               Metric metric = added.metric;
-                               AbstractMetricGroup group = added.group;
-
-                               QueryScopeInfo info = 
group.getQueryServiceMetricInfo(FILTER);
-
-                               if (metric instanceof Counter) {
-                                       counters.put((Counter) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-                               } else if (metric instanceof Gauge) {
-                                       gauges.put((Gauge<?>) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-                               } else if (metric instanceof Histogram) {
-                                       histograms.put((Histogram) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-                               } else if (metric instanceof Meter) {
-                                       meters.put((Meter) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-                               }
-                       } else if (message instanceof RemoveMetric) {
-                               Metric metric = (((RemoveMetric) 
message).metric);
-                               if (metric instanceof Counter) {
-                                       this.counters.remove(metric);
-                               } else if (metric instanceof Gauge) {
-                                       this.gauges.remove(metric);
-                               } else if (metric instanceof Histogram) {
-                                       this.histograms.remove(metric);
-                               } else if (metric instanceof Meter) {
-                                       this.meters.remove(metric);
-                               }
-                       } else if (message instanceof CreateDump) {
-                               
MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);
-
-                               dump = enforceSizeLimit(dump);
-
-                               getSender().tell(dump, getSelf());
-                       } else {
-                               LOG.warn("MetricQueryServiceActor received an 
invalid message. " + message.toString());
-                               getSender().tell(new Status.Failure(new 
IOException("MetricQueryServiceActor received an invalid message. " + 
message.toString())), getSelf());
-                       }
-               } catch (Exception e) {
-                       LOG.warn("An exception occurred while processing a 
message.", e);
+       public void addMetric(String metricName, Metric metric, 
AbstractMetricGroup group) {
+               QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);
+
+               if (metric instanceof Counter) {
+                       counters.put((Counter) metric, new Tuple2<>(info, 
FILTER.filterCharacters(metricName)));
+               } else if (metric instanceof Gauge) {
+                       gauges.put((Gauge<?>) metric, new Tuple2<>(info, 
FILTER.filterCharacters(metricName)));
+               } else if (metric instanceof Histogram) {
+                       histograms.put((Histogram) metric, new Tuple2<>(info, 
FILTER.filterCharacters(metricName)));
+               } else if (metric instanceof Meter) {
+                       meters.put((Meter) metric, new Tuple2<>(info, 
FILTER.filterCharacters(metricName)));
                }
        }
 
+       public void removeMetric(Metric metric) {
 
 Review comment:
   If we ensure that these methods are only called as RPCs, they will always 
run in the main thread and thus be linearized. Anyway, using `runAsync` adds 
little burden, and with it we can be sure nothing weird needs to be taken care of.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to