mjsax commented on code in PR #17820: URL: https://github.com/apache/kafka/pull/17820#discussion_r1843070888
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1250,6 +1253,20 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout return Optional.empty(); } + private int calculateMetricsRecordingLevel() { + final int recordingLevel; + final String recordingLevelString = applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG); + if (recordingLevelString.equals("INFO")) { + recordingLevel = 0; + } else if (recordingLevelString.equals("DEBUG")) { + recordingLevel = 1; + } else { + // Must be TRACE level Review Comment: Might be better to use another `if` for `TRACE` and have the final `else` throw an exception as a safeguard? That would also highlight that we need to update this code in case we add a new level. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -614,6 +615,12 @@ public StreamThread(final Time time, streamsMetrics, time.milliseconds() ); + ThreadMetrics.addThreadStateTelemetryMetric(threadId, + streamsMetrics, + (metricConfig, now) -> this.state().ordinal()); + ThreadMetrics.addThreadStateMetric(threadId, Review Comment: as above ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1250,6 +1253,20 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout return Optional.empty(); } + private int calculateMetricsRecordingLevel() { + final int recordingLevel; + final String recordingLevelString = applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG); + if (recordingLevelString.equals("INFO")) { Review Comment: Why not use a `switch` statement? 
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java: ########## @@ -290,6 +295,30 @@ public static void addThreadStartTimeMetric(final String threadId, ); } + public static void addThreadStateTelemetryMetric(final String threadId, + final StreamsMetricsImpl streamsMetrics, + final Gauge<Integer> threadStateProvider) { + streamsMetrics.addThreadLevelMutableMetric( + THREAD_STATE, Review Comment: nit: indentation too deep (should only be 4 spaces, not 8 -- should be an IDE setting) ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java: ########## @@ -290,6 +295,30 @@ public static void addThreadStartTimeMetric(final String threadId, ); } + public static void addThreadStateTelemetryMetric(final String threadId, + final StreamsMetricsImpl streamsMetrics, + final Gauge<Integer> threadStateProvider) { + streamsMetrics.addThreadLevelMutableMetric( + THREAD_STATE, + THREAD_STATE_DESCRIPTION, + threadId, + threadStateProvider + ); + } + + public static void addThreadStateMetric(final String threadId, + final StreamsMetricsImpl streamsMetrics, + final Gauge<String> threadStateProvider) { + streamsMetrics.addThreadLevelMutableMetric( + STATE, Review Comment: as above ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -614,6 +615,12 @@ public StreamThread(final Time time, streamsMetrics, time.milliseconds() ); + ThreadMetrics.addThreadStateTelemetryMetric(threadId, Review Comment: nit: `threadId` should be on the next line by itself ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -180,15 +180,16 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except final String name = mn.name().replace('-', '.'); final String group = mn.group().replace("-metrics", "").replace('-', '.'); return 
"org.apache.kafka." + group + "." + name; - }).sorted().collect(Collectors.toList()); + }).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics + .sorted().collect(Collectors.toList()); final List<String> actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId)); assertEquals(expectedMetrics, actualMetrics); TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(), 30_000, "Never received subscribed metrics"); final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId); - final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads"); + final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.client.state", "org.apache.kafka.stream.failed.stream.threads", "org.apache.kafka.stream.recording.level"); Review Comment: Nit: line too long. 
How about ``` final List<String> expectedInstanceMetrics = Arrays.asList( "org.apache.kafka.stream.client.state", "org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads", "org.apache.kafka.stream.recording.level" ); ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -614,6 +615,12 @@ public StreamThread(final Time time, streamsMetrics, time.milliseconds() ); + ThreadMetrics.addThreadStateTelemetryMetric(threadId, + streamsMetrics, + (metricConfig, now) -> this.state().ordinal()); + ThreadMetrics.addThreadStateMetric(threadId, + streamsMetrics, + (metricConfig, now) -> this.state().name().toLowerCase(Locale.getDefault())); Review Comment: For the corresponding client metric we just use ``` ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state); ``` Why so "complicated"? I am also OK with updating the code for the client metric, but both should be the same. (Or maybe keep `... -> state` and add a "fancy" `toString()` override to both `enum`s (for client and thread) which model the state?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org