AndrewJSchofield commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1369829206


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return
+ * a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+    private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();
+
+    /**
+     * Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one.
+     *
+     * @param metricKey the key for which to calculate a getAndSet.
+     * @param now the timestamp for the new value.
+     * @param value the current value.
+     * @return the timestamp of the previous entry and its value. If there
+     *     isn't a previous entry, then this method returns {@link Optional#empty()}
+     */
+    public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
+        InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+        AtomicReference<InstantAndValue<T>> valueOrNull = counters
+            .putIfAbsent(metricKey, new AtomicReference<>(instantAndValue));
+
+        // there wasn't already an entry, so return empty.
+        if (valueOrNull == null) {
+            return Optional.empty();
+        }
+
+        // Update the atomic ref to point to our new InstantAndValue, but get the previous value
+        InstantAndValue<T> previousValue = valueOrNull.getAndSet(instantAndValue);
+
+        // Return the instance and the value.
+        return Optional.of(new InstantAndValue<>(previousValue.getIntervalStart(), previousValue.getValue()));

Review Comment:
   You could just return the `Optional.of(previousValue)` since 
`InstantAndValue` contains only final members and cannot change.
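
A minimal sketch of what that simplification could look like. It is self-contained for illustration only: `LastValueTrackerSketch` is a hypothetical name, `String` stands in for `MetricKey`, and the `InstantAndValue` holder is reconstructed from the accessors used in the diff, so the actual class in the PR may differ.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for LastValueTracker; String replaces MetricKey for illustration.
class LastValueTrackerSketch<T> {

    // Immutable holder: both fields are final, so an instance can be shared safely.
    static final class InstantAndValue<T> {
        private final Instant intervalStart;
        private final T value;

        InstantAndValue(Instant intervalStart, T value) {
            this.intervalStart = intervalStart;
            this.value = value;
        }

        Instant getIntervalStart() {
            return intervalStart;
        }

        T getValue() {
            return value;
        }
    }

    private final ConcurrentMap<String, AtomicReference<InstantAndValue<T>>> counters =
        new ConcurrentHashMap<>();

    Optional<InstantAndValue<T>> getAndSet(String metricKey, Instant now, T value) {
        InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
        AtomicReference<InstantAndValue<T>> valueOrNull =
            counters.putIfAbsent(metricKey, new AtomicReference<>(instantAndValue));

        // There wasn't already an entry, so return empty.
        if (valueOrNull == null) {
            return Optional.empty();
        }

        // Swap in the new value and hand back the previous instance as-is, without copying it.
        return Optional.of(valueOrNull.getAndSet(instantAndValue));
    }
}
```

Because the holder never changes after construction, returning the stored instance exposes nothing a caller could mutate, so the defensive copy is not needed.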



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ol>
+ *     <li>{@link Gauge}</li>
+ *     <li>{@link Measurable}</li>
+ * </ol>
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Should total and sum be treated as a monotonically increasing counter ?

Review Comment:
   It seems a bit strange having this kind of narrative in here. This should be 
an authoritative statement of behaviour, with a lot less "should" and "seem".



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource tags.
+ * These additional resource tags will be added before emitting metrics by the telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+    private final MetricKey key;
+    private final Metric.Builder metricBuilder;
+
+    private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+        this.key = key;
+        this.metricBuilder = metricBuilder;
+    }
+
+    @Override
+    public MetricKey key() {
+        return key;
+    }
+
+    public Metric.Builder metric() {

Review Comment:
   I think this should be called `builder()` rather than `metric()`, to be less 
confusing.



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource tags.
+ * These additional resource tags will be added before emitting metrics by the telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+    private final MetricKey key;
+    private final Metric.Builder metricBuilder;
+
+    private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+        this.key = key;
+        this.metricBuilder = metricBuilder;
+    }
+
+    @Override
+    public MetricKey key() {
+        return key;
+    }
+
+    public Metric.Builder metric() {
+        return metricBuilder;
+    }
+
+    public static SinglePointMetric create(MetricKey metricKey, Metric.Builder metric) {
+        return new SinglePointMetric(metricKey, metric);
+    }
+
+    /*
+        Methods to construct gauge metric type.
+     */
+    public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    /*
+        Methods to construct sum metric type.
+     */
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp) {
+        return sum(metricKey, value, monotonic, timestamp, null);
+    }
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp,
+        Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        if (startTimestamp != null) {
+            point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+        }
+
+        return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+    }
+
+    public static SinglePointMetric deltaSum(MetricKey metricKey, double value, boolean monotonic,
+        Instant timestamp, Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value)

Review Comment:
   I think you require non-null for startTimestamp here too.
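
One way to read this suggestion is a fail-fast null check at the top of `deltaSum`. The sketch below is illustrative only and avoids the OpenTelemetry types: `DeltaSumSketch` and `deltaWindow` are hypothetical names, and the body of `toTimeUnixNanos` is an assumption, since the PR's own implementation is not shown in this excerpt.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in; the real method populates a NumberDataPoint.Builder instead.
class DeltaSumSketch {

    // Assumed conversion from an Instant to nanoseconds since the Unix epoch.
    static long toTimeUnixNanos(Instant t) {
        return TimeUnit.SECONDS.toNanos(t.getEpochSecond()) + t.getNano();
    }

    // Returns {startNanos, endNanos}; a delta sum has no meaning without a start time,
    // so a null startTimestamp is rejected immediately with a descriptive message.
    static long[] deltaWindow(Instant timestamp, Instant startTimestamp) {
        Objects.requireNonNull(startTimestamp, "startTimestamp must not be null for a delta sum");
        return new long[] {toTimeUnixNanos(startTimestamp), toTimeUnixNanos(timestamp)};
    }
}
```

With the check in place, a missing start time surfaces as a `NullPointerException` at the call site rather than later inside the conversion.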



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/Provider.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.resource.v1.Resource;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides specification which are used to collect metrics.
+ */
+public interface Provider extends Configurable {
+
+    /**
+     * Validate that all the data required for generating correct metrics is present. The provider
+     * will be disabled if validation fails.
+     *
+     * @param metricsContext {@link MetricsContext}
+     * @return false if all the data required for generating correct metrics is missing, true
+     * otherwise.
+     */
+    boolean validate(MetricsContext metricsContext, Map<String, ?> config);
+
+    /**
+     * Domain of the active provider i.e. specifies prefix to the metrics.
+     *
+     * @return Domain in string format.
+     */
+    String domain();
+
+    /**
+     * The metrics resource for this provider which will be used to generate the metrics.
+     *
+     * @return A fully formed {@link Resource} will all the tags.

Review Comment:
   Typo in comment: it should read `with all the tags`, I think.



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/Provider.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.resource.v1.Resource;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides specification which are used to collect metrics.
+ */
+public interface Provider extends Configurable {

Review Comment:
   I'd give this a more descriptive name than `Provider`. There are already 
lots of providers, so maybe it's a `MetricsProvider`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
