bbejeck commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r1960453578


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -259,10 +317,10 @@ public void 
shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE
             
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
 
             final List<MetricName> streamsTaskMetricNames = 
streamsOne.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();

Review Comment:
   side cleanup



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -144,6 +157,51 @@ public void tearDown() throws Exception {
         if (!streamsSecondApplicationProperties.isEmpty()) {
             
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
         }
+        if (globalStoreIterator != null) {
+            globalStoreIterator.close();
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"INFO", "DEBUG", "TRACE"})
+    public void shouldPushGlobalThreadMetricsToBroker(final String 
recordingLevel) throws Exception {

Review Comment:
   New test for validating broker plugin emits the global thread metrics.  I 
thought of updating the existing broker plugin test, but it's busy enough as is.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -525,7 +619,7 @@ public void exportMetrics(final AuthorizableRequestContext 
context, final Client
                         .stream()
                         .flatMap(rm -> rm.getScopeMetricsList().stream())
                         .flatMap(sm -> sm.getMetricsList().stream())
-                        .map(metric -> metric.getGauge())
+                        
.map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge)

Review Comment:
   side cleanup as pointed out by IntelliJ not sure using a method handle here 
and below is better than using a lambda in this case due to needing the fully 
qualified name



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -144,6 +157,51 @@ public void tearDown() throws Exception {
         if (!streamsSecondApplicationProperties.isEmpty()) {
             
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
         }
+        if (globalStoreIterator != null) {
+            globalStoreIterator.close();

Review Comment:
   Required from test - explaination below adding the global store



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -215,21 +273,21 @@ public void shouldPushMetricsToBroker(final String 
recordingLevel) throws Except
     public void shouldPassMetrics(final String topologyType, final boolean 
stateUpdaterEnabled) throws Exception {
         // Streams metrics should get passed to Admin and Consumer
         streamsApplicationProperties = props(stateUpdaterEnabled);
-        final Topology topology = topologyType.equals("simple") ? 
simpleTopology() : complexTopology();
+        final Topology topology = topologyType.equals("simple") ? 
simpleTopology(false) : complexTopology();
        
         try (final KafkaStreams streams = new KafkaStreams(topology, 
streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
             final List<MetricName> streamsThreadMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).toList();

Review Comment:
   side cleanup



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -215,21 +273,21 @@ public void shouldPushMetricsToBroker(final String 
recordingLevel) throws Except
     public void shouldPassMetrics(final String topologyType, final boolean 
stateUpdaterEnabled) throws Exception {
         // Streams metrics should get passed to Admin and Consumer
         streamsApplicationProperties = props(stateUpdaterEnabled);
-        final Topology topology = topologyType.equals("simple") ? 
simpleTopology() : complexTopology();
+        final Topology topology = topologyType.equals("simple") ? 
simpleTopology(false) : complexTopology();
        
         try (final KafkaStreams streams = new KafkaStreams(topology, 
streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
             final List<MetricName> streamsThreadMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).toList();
 
             final List<MetricName> streamsClientMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.group().equals("stream-metrics")).toList();

Review Comment:
   side cleanup



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -350,10 +408,10 @@ public void passedMetricsShouldNotLeakIntoClientMetrics() 
throws Exception {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
             final List<MetricName> streamsThreadMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).toList();

Review Comment:
   side cleanup here and the next



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -293,24 +351,24 @@ public void 
shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE
                 );
 
                 final List<MetricName> streamsOneTaskMetrics = 
streamsOne.metrics().values().stream().map(Metric::metricName)
-                        .filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                        .filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
                 final List<MetricName> streamsOneStateMetrics = 
streamsOne.metrics().values().stream().map(Metric::metricName)
-                        .filter(metricName -> 
metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                        .filter(metricName -> 
metricName.group().equals("stream-state-metrics")).toList();
                 
                 final List<MetricName> consumerOnePassedTaskMetrics = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
-                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
                 final List<MetricName> consumerOnePassedStateMetrics = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
-                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.group().equals("stream-state-metrics")).toList();
 
                 final List<MetricName> streamsTwoTaskMetrics = 
streamsTwo.metrics().values().stream().map(Metric::metricName)
-                        .filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                        .filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
                 final List<MetricName> streamsTwoStateMetrics = 
streamsTwo.metrics().values().stream().map(Metric::metricName)
-                        .filter(metricName -> 
metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                        .filter(metricName -> 
metricName.group().equals("stream-state-metrics")).toList();
                 
                 final List<MetricName> consumerTwoPassedTaskMetrics = 
INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
-                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
                 final List<MetricName> consumerTwoPassedStateMetrics = 
INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
-                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.group().equals("stream-state-metrics")).toList();

Review Comment:
   side cleanup



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -259,10 +317,10 @@ public void 
shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE
             
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
 
             final List<MetricName> streamsTaskMetricNames = 
streamsOne.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
 
             final List<MetricName> consumerPassedStreamTaskMetricNames = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();

Review Comment:
   side cleanup



##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java:
##########
@@ -37,10 +38,10 @@ public class StreamsThreadMetricsDelegatingReporter 
implements MetricsReporter {
     private final String stateUpdaterThreadId;
 
 
-    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], 
byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
+    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], 
byte[]> consumer, final String threadId, final Optional<String> 
stateUpdaterThreadId) {
         this.consumer = Objects.requireNonNull(consumer);
         this.threadId = Objects.requireNonNull(threadId);
-        this.stateUpdaterThreadId = 
Objects.requireNonNull(stateUpdaterThreadId);
+        this.stateUpdaterThreadId = stateUpdaterThreadId.orElse("");

Review Comment:
   Thinking this should be `stateUpdaterThreadId.orElse(null);` instead - 
thoughts?



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -419,8 +485,36 @@ private Topology complexTopology() {
         return builder.build();
     }
 
-    private Topology simpleTopology() {
+    private void addGlobalStore(final StreamsBuilder builder) {
+        builder.addGlobalStore(Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("iq-test-store"),
+                Serdes.String(),
+                Serdes.String()
+        ),
+                globalStoreTopic,
+                Consumed.with(Serdes.String(), Serdes.String()),
+                () -> new Processor<>() {
+                    private KeyValueStore<String, String> store;
+
+                    @Override
+                    public void init(final ProcessorContext<Void, Void> 
context) {
+                        store = context.getStateStore("iq-test-store");
+                    }
+
+                    @Override
+                    public void process(final Record<String, String> record) {
+                        store.put(record.key(), record.value());
+                        globalStoreIterator = store.all();

Review Comment:
   The store iterator is intentionally not closed here as it needs to be open 
during the test so the Streams app will emit the 
`org.apache.kafka.stream.state.oldest.iterator.open.since.ms` metric that is 
expected.  So the `globalStoreIterator` is a global variable (pun not intended) 
so it can be closed in the `tearDown` method.



##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java:
##########
@@ -37,10 +38,10 @@ public class StreamsThreadMetricsDelegatingReporter 
implements MetricsReporter {
     private final String stateUpdaterThreadId;
 
 
-    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], 
byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
+    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], 
byte[]> consumer, final String threadId, final Optional<String> 
stateUpdaterThreadId) {

Review Comment:
   Since the reporting process is exactly the same for the global thread as a 
stream thread, IMHO it made sense to re-use the existing reporter class. But it 
seems better to have the `stateUpdaterThreadId` represented as an `Optional` to 
signal that it may not be present



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -215,21 +273,21 @@ public void shouldPushMetricsToBroker(final String 
recordingLevel) throws Except
     public void shouldPassMetrics(final String topologyType, final boolean 
stateUpdaterEnabled) throws Exception {
         // Streams metrics should get passed to Admin and Consumer
         streamsApplicationProperties = props(stateUpdaterEnabled);
-        final Topology topology = topologyType.equals("simple") ? 
simpleTopology() : complexTopology();
+        final Topology topology = topologyType.equals("simple") ? 
simpleTopology(false) : complexTopology();
        
         try (final KafkaStreams streams = new KafkaStreams(topology, 
streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
             final List<MetricName> streamsThreadMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).toList();
 
             final List<MetricName> streamsClientMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.group().equals("stream-metrics")).toList();
 
 
 
-            final List<MetricName> consumerPassedStreamThreadMetricNames = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
-            final List<MetricName> adminPassedStreamClientMetricNames = 
INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+            final List<MetricName> consumerPassedStreamThreadMetricNames = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
+            final List<MetricName> adminPassedStreamClientMetricNames = 
INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();

Review Comment:
   side cleanup



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to