GreatEugenius commented on code in PR #74: URL: https://github.com/apache/flink-agents/pull/74#discussion_r2224287483
########## python/flink_agents/runtime/flink_metric_group.py: ########## @@ -0,0 +1,160 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from typing import Any + +from typing_extensions import override + +from flink_agents.api.metric_group import Counter, Gauge, Histogram, Meter, MetricGroup + + +class FlinkMetricGroup(MetricGroup): + """Implementation of MetricGroup for flink execution environment.""" + def __init__(self, j_metric_group: Any) -> None: + """Initialize a flink runner context with the given java runner context. + + Parameters + ---------- + j_runner_context : Any + Java runner context used to synchronize data between Python and Java. 
+ """ + self._j_metric_group = j_metric_group + + @override + def get_sub_group(self, name: str) -> "FlinkMetricGroup": + return FlinkMetricGroup(self._j_metric_group.getSubGroup(name)) + + @override + def get_counter(self, name: str) -> "FlinkCounter": + return FlinkCounter(self._j_metric_group.getCounter(name)) + + @override + def get_meter(self, name: str) -> "FlinkMeter": + return FlinkMeter(self._j_metric_group.getMeter(name)) + + @override + def get_histogram(self, name: str, window_size: int =100) -> "FlinkHistogram": + return FlinkHistogram(self._j_metric_group.getHistogram(name, window_size)) + + @override + def get_gauge(self, name: str) -> "FlinkGauge": + return FlinkGauge(self._j_metric_group.getGauge(name)) + +class FlinkCounter(Counter): + """Implementation of Counter for flink execution environment.""" + def __init__(self, j_counter: Any) -> None: + """Initialize a flink runner context with the given java runner context. + + Parameters + ---------- + j_counter : Any + Java counter used for measuring the count of events. + """ + self._j_counter = j_counter + + @override + def inc(self, n: int = 1) -> None: + """Increment the current count by the given value.""" + self._j_counter.inc(n) + + @override + def dec(self, n: int = 1) -> None: + """Decrement the current count by the given value.""" + self._j_counter.dec(n) + + @override + def get_count(self) -> int: + """Return the current count.""" + return self._j_counter.getCount() + +class FlinkMeter(Meter): + """Implementation of Meter for flink execution environment.""" + def __init__(self, j_meter: Any) -> None: + """Initialize a flink meter with the given java meter. + + Parameters + ---------- + j_meter : Any + Java meter measures throughput. 
+ """ + self._j_meter = j_meter + + @override + def mark(self, n: int = 1) -> None: + """Mark the occurrence of n events.""" + self._j_meter.markEvent(n) + + @override + def get_rate(self) -> float: + """Return the current event rate per second.""" + return self._j_meter.getRate() + +class FlinkHistogram(Histogram): + """Implementation of Histogram for flink execution environment.""" + def __init__(self, j_histogram: Any) -> None: + """Initialize a flink histogram with the given java histogram. + + Parameters + ---------- + j_histogram : Any + Java histogram used for recording values and computing statistical + summaries. + """ + self._j_histogram = j_histogram + self._j_statistics = j_histogram.getStatistics() + + @override + def update(self, value: int) -> None: + """Record a new value into the histogram.""" + self._j_histogram.update(value) + + @override + def get_mean(self) -> float: + """Return the average value.""" + return self._j_statistics.getMean() + + @override + def get_max(self) -> int: + """Return the maximum recorded value.""" + return self._j_statistics.getMax() + + @override + def get_min(self) -> int: + """Return the minimum recorded value.""" + return self._j_statistics.getMin() + +class FlinkGauge(Gauge): + """Implementation of Gauge for flink execution environment.""" + def __init__(self, j_gauge: Any) -> None: + """Initialize a flink gauge with the given java gauge. + + Parameters + ---------- + j_gauge : Any + Java gauge for recording a string value. + """ + self._j_gauge = j_gauge + + @override + def update(self, value: str) -> None: Review Comment: This is to ensure consistency with the types returned by get_value, preventing confusion for the user. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org