bbejeck commented on code in PR #18953: URL: https://github.com/apache/kafka/pull/18953#discussion_r1960453578
########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -259,10 +317,10 @@ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne); final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + .filter(metricName -> metricName.tags().containsKey("task-id")).toList(); Review Comment: side cleanup ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -144,6 +157,51 @@ public void tearDown() throws Exception { if (!streamsSecondApplicationProperties.isEmpty()) { IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties); } + if (globalStoreIterator != null) { + globalStoreIterator.close(); + } + } + + @ParameterizedTest + @ValueSource(strings = {"INFO", "DEBUG", "TRACE"}) + public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel) throws Exception { Review Comment: New test for validating broker plugin emits the global thread metrics. I thought of updating the existing broker plugin test, but it's busy enough as is. 
########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -525,7 +619,7 @@ public void exportMetrics(final AuthorizableRequestContext context, final Client .stream() .flatMap(rm -> rm.getScopeMetricsList().stream()) .flatMap(sm -> sm.getMetricsList().stream()) - .map(metric -> metric.getGauge()) + .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge) Review Comment: side cleanup as pointed out by IntelliJ; not sure using a method reference here and below is better than using a lambda in this case, due to needing the fully qualified name ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -144,6 +157,51 @@ public void tearDown() throws Exception { if (!streamsSecondApplicationProperties.isEmpty()) { IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties); } + if (globalStoreIterator != null) { + globalStoreIterator.close(); Review Comment: Required by the test - explanation below, in the comment on adding the global store ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -215,21 +273,21 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception { // Streams metrics should get passed to Admin and Consumer streamsApplicationProperties = props(stateUpdaterEnabled); - final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology(); + final Topology topology = topologyType.equals("simple") ? 
simpleTopology(false) : complexTopology(); try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList()); + .filter(metricName -> metricName.tags().containsKey("thread-id")).toList(); Review Comment: side cleanup ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -215,21 +273,21 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception { // Streams metrics should get passed to Admin and Consumer streamsApplicationProperties = props(stateUpdaterEnabled); - final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology(); + final Topology topology = topologyType.equals("simple") ? 
simpleTopology(false) : complexTopology(); try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList()); + .filter(metricName -> metricName.tags().containsKey("thread-id")).toList(); final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList()); + .filter(metricName -> metricName.group().equals("stream-metrics")).toList(); Review Comment: side cleanup ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -350,10 +408,10 @@ public void passedMetricsShouldNotLeakIntoClientMetrics() throws Exception { IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList()); + .filter(metricName -> metricName.tags().containsKey("thread-id")).toList(); Review Comment: side cleanup here and the next ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -293,24 +351,24 @@ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE ); final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + .filter(metricName -> metricName.tags().containsKey("task-id")).toList(); final List<MetricName> 
streamsOneStateMetrics = streamsOne.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList()); + .filter(metricName -> metricName.group().equals("stream-state-metrics")).toList(); final List<MetricName> consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT) - .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).toList(); final List<MetricName> consumerOnePassedStateMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT) - .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList()); + .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).toList(); final List<MetricName> streamsTwoTaskMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + .filter(metricName -> metricName.tags().containsKey("task-id")).toList(); final List<MetricName> streamsTwoStateMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList()); + .filter(metricName -> metricName.group().equals("stream-state-metrics")).toList(); final List<MetricName> consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT) - .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.tags().containsKey("task-id")).toList(); final List<MetricName> consumerTwoPassedStateMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT) - .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList()); + .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).toList(); Review Comment: side cleanup ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -259,10 +317,10 @@ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne); final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + .filter(metricName -> metricName.tags().containsKey("task-id")).toList(); final List<MetricName> consumerPassedStreamTaskMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName) - .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); + .filter(metricName -> metricName.tags().containsKey("task-id")).toList(); Review Comment: side cleanup ########## streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java: ########## @@ -37,10 +38,10 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter { private final String stateUpdaterThreadId; - public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) { + public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final 
Optional<String> stateUpdaterThreadId) { this.consumer = Objects.requireNonNull(consumer); this.threadId = Objects.requireNonNull(threadId); - this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId); + this.stateUpdaterThreadId = stateUpdaterThreadId.orElse(""); Review Comment: Thinking this should be `stateUpdaterThreadId.orElse(null);` instead - thoughts? ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -419,8 +485,36 @@ private Topology complexTopology() { return builder.build(); } - private Topology simpleTopology() { + private void addGlobalStore(final StreamsBuilder builder) { + builder.addGlobalStore(Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("iq-test-store"), + Serdes.String(), + Serdes.String() + ), + globalStoreTopic, + Consumed.with(Serdes.String(), Serdes.String()), + () -> new Processor<>() { + private KeyValueStore<String, String> store; + + @Override + public void init(final ProcessorContext<Void, Void> context) { + store = context.getStateStore("iq-test-store"); + } + + @Override + public void process(final Record<String, String> record) { + store.put(record.key(), record.value()); + globalStoreIterator = store.all(); Review Comment: The store iterator is intentionally not closed here as it needs to be open during the test so the Streams app will emit the `org.apache.kafka.stream.state.oldest.iterator.open.since.ms` metric that is expected. So the `globalStoreIterator` is a global variable (pun not intended) so it can be closed in the `tearDown` method. 
########## streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java: ########## @@ -37,10 +38,10 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter { private final String stateUpdaterThreadId; - public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) { + public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final Optional<String> stateUpdaterThreadId) { Review Comment: Since the reporting process is exactly the same for the global thread as a stream thread, IMHO it made sense to re-use the existing reporter class. But it seems better to have the `stateUpdaterThreadId` represented as an `Optional` to signal that it may not be present ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -215,21 +273,21 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception { // Streams metrics should get passed to Admin and Consumer streamsApplicationProperties = props(stateUpdaterEnabled); - final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology(); + final Topology topology = topologyType.equals("simple") ? 
simpleTopology(false) : complexTopology(); try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList()); + .filter(metricName -> metricName.tags().containsKey("thread-id")).toList(); final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName) - .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList()); + .filter(metricName -> metricName.group().equals("stream-metrics")).toList(); - final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList()); - final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList()); + final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList(); + final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList(); Review Comment: side cleanup -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org