cadonna commented on code in PR #17820: URL: https://github.com/apache/kafka/pull/17820#discussion_r1843502960
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java: ########## @@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() { ); } + @Test + public void shouldAddThreadStateTelemetryMetric() { + final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal(); + ThreadMetrics.addThreadStateTelemetryMetric( + "threadId", Review Comment: There is a constant for a test thread ID: ```suggestion THREAD_ID, ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java: ########## @@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() { ); } + @Test + public void shouldAddThreadStateTelemetryMetric() { + final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal(); + ThreadMetrics.addThreadStateTelemetryMetric( + "threadId", + streamsMetrics, + threadStateProvider + ); + verify(streamsMetrics).addThreadLevelMutableMetric( + "thread-state", + "The current state of the thread", + "threadId", Review Comment: ```suggestion THREAD_ID, ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java: ########## @@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() { ); } + @Test + public void shouldAddThreadStateTelemetryMetric() { + final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal(); + ThreadMetrics.addThreadStateTelemetryMetric( + "threadId", + streamsMetrics, + threadStateProvider + ); + verify(streamsMetrics).addThreadLevelMutableMetric( + "thread-state", + "The current state of the thread", + "threadId", + threadStateProvider + ); + } + + @Test + public void shouldAddThreadStateJMXMetric() { + final Gauge<String> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.name().toLowerCase(Locale.getDefault()); + ThreadMetrics.addThreadStateMetric( + "threadId", + streamsMetrics, + threadStateProvider + ); + verify(streamsMetrics).addThreadLevelMutableMetric( + "state", + "The current state of the thread", + "threadId", Review Comment: ```suggestion THREAD_ID, ``` ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1067,7 +1070,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin private static Metrics createMetrics(final StreamsConfig config, final Time time, final String clientId) { final MetricConfig metricConfig = new MetricConfig() .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) - .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG))) Review Comment: Why did you remove the `StreamsConfig.` prefix? For all other config names, we use the prefix. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -614,6 +615,12 @@ public StreamThread(final Time time, streamsMetrics, time.milliseconds() ); + ThreadMetrics.addThreadStateTelemetryMetric(threadId, Review Comment: nit: we usually use 4 spaces indentation. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java: ########## @@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() { ); } + @Test + public void shouldAddThreadStateTelemetryMetric() { + final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal(); + ThreadMetrics.addThreadStateTelemetryMetric( + "threadId", + streamsMetrics, + threadStateProvider + ); + verify(streamsMetrics).addThreadLevelMutableMetric( + "thread-state", + "The current state of the thread", + "threadId", + threadStateProvider + ); + } + + @Test + public void shouldAddThreadStateJMXMetric() { Review Comment: nit: ```suggestion public void shouldAddThreadStateJmxMetric() { ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java: ########## @@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() { ); } + @Test + public void shouldAddThreadStateTelemetryMetric() { + final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal(); + ThreadMetrics.addThreadStateTelemetryMetric( + "threadId", + streamsMetrics, + threadStateProvider + ); + verify(streamsMetrics).addThreadLevelMutableMetric( + "thread-state", + "The current state of the thread", + "threadId", + threadStateProvider + ); + } + + @Test + public void shouldAddThreadStateJMXMetric() { + final Gauge<String> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.name().toLowerCase(Locale.getDefault()); + ThreadMetrics.addThreadStateMetric( + "threadId", Review Comment: ```suggestion THREAD_ID, ``` ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -180,15 +180,16 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except final String name = mn.name().replace('-', '.'); final String group = mn.group().replace("-metrics", "").replace('-', '.'); return "org.apache.kafka." + group + "." + name; - }).sorted().collect(Collectors.toList()); + }).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics + .sorted().collect(Collectors.toList()); Review Comment: I do not understand this. According to the KIP and the code in `ThreadMetrics` the type of the metric is numeric. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org