mjsax commented on code in PR #17820:
URL: https://github.com/apache/kafka/pull/17820#discussion_r1843070888


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1250,6 +1253,20 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
         return Optional.empty();
     }
 
+    private int calculateMetricsRecordingLevel() {
+        final int recordingLevel;
+        final String recordingLevelString = applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG);
+        if (recordingLevelString.equals("INFO")) {
+            recordingLevel = 0;
+        } else if (recordingLevelString.equals("DEBUG")) {
+            recordingLevel = 1;
+        } else {
+            // Must be TRACE level

Review Comment:
   It might be better to add another explicit `if` for TRACE and have the final `else` throw an exception as a safeguard. That would also make it obvious that this code needs updating if we ever add a new level.
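
A minimal sketch of what this suggestion could look like. The wrapper class, the `String` parameter, the value `2` for TRACE, and the choice of `IllegalArgumentException` are assumptions for illustration; the quoted diff is cut off before the TRACE branch, and the comment does not name an exception type.

```java
// Sketch only: class name and parameter are hypothetical. In the PR the value
// comes from applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG).
final class RecordingLevelIfElseSketch {

    static int calculateMetricsRecordingLevel(final String recordingLevelString) {
        final int recordingLevel;
        if (recordingLevelString.equals("INFO")) {
            recordingLevel = 0;
        } else if (recordingLevelString.equals("DEBUG")) {
            recordingLevel = 1;
        } else if (recordingLevelString.equals("TRACE")) {
            // Assumed value: the quoted diff ends before this branch.
            recordingLevel = 2;
        } else {
            // Safeguard: fail loudly if a new level is added without updating this method.
            throw new IllegalArgumentException("Unexpected recording level: " + recordingLevelString);
        }
        return recordingLevel;
    }
}
```

With the final `else` throwing, an unknown level is no longer silently reported as TRACE.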



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -614,6 +615,12 @@ public StreamThread(final Time time,
             streamsMetrics,
             time.milliseconds()
         );
+        ThreadMetrics.addThreadStateTelemetryMetric(threadId,
+                streamsMetrics,
+                (metricConfig, now) -> this.state().ordinal());
+        ThreadMetrics.addThreadStateMetric(threadId,

Review Comment:
   as above



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1250,6 +1253,20 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
         return Optional.empty();
     }
 
+    private int calculateMetricsRecordingLevel() {
+        final int recordingLevel;
+        final String recordingLevelString = applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG);
+        if (recordingLevelString.equals("INFO")) {

Review Comment:
   Why not use a `switch` statement?
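
A rough sketch of the `switch` form, for comparison. The wrapper class, the `String` parameter, the value `2` for TRACE, and the exception thrown in `default` are all assumptions made for illustration and are not taken from the PR.

```java
// Sketch only: class name and parameter are hypothetical. In the PR the value
// comes from applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG).
final class RecordingLevelSwitchSketch {

    static int calculateMetricsRecordingLevel(final String recordingLevelString) {
        switch (recordingLevelString) {
            case "INFO":
                return 0;
            case "DEBUG":
                return 1;
            case "TRACE":
                // Assumed value: the quoted diff does not show the TRACE branch.
                return 2;
            default:
                // Assumed behavior: reject anything that is not a known level.
                throw new IllegalArgumentException("Unexpected recording level: " + recordingLevelString);
        }
    }
}
```

A `switch` on the string keeps all levels at the same nesting depth and gives unknown values a natural home in `default`.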



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java:
##########
@@ -290,6 +295,30 @@ public static void addThreadStartTimeMetric(final String threadId,
         );
     }
 
+    public static void addThreadStateTelemetryMetric(final String threadId,
+                                                     final StreamsMetricsImpl streamsMetrics,
+                                                     final Gauge<Integer> threadStateProvider) {
+        streamsMetrics.addThreadLevelMutableMetric(
+                THREAD_STATE,

Review Comment:
   nit: indentation is too deep (it should be 4 spaces, not 8; this should be an IDE setting)



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java:
##########
@@ -290,6 +295,30 @@ public static void addThreadStartTimeMetric(final String threadId,
         );
     }
 
+    public static void addThreadStateTelemetryMetric(final String threadId,
+                                                     final StreamsMetricsImpl streamsMetrics,
+                                                     final Gauge<Integer> threadStateProvider) {
+        streamsMetrics.addThreadLevelMutableMetric(
+                THREAD_STATE,
+                THREAD_STATE_DESCRIPTION,
+                threadId,
+                threadStateProvider
+        );
+    }
+
+    public static void addThreadStateMetric(final String threadId,
+                                            final StreamsMetricsImpl streamsMetrics,
+                                            final Gauge<String> threadStateProvider) {
+        streamsMetrics.addThreadLevelMutableMetric(
+                STATE,

Review Comment:
   as above



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -614,6 +615,12 @@ public StreamThread(final Time time,
             streamsMetrics,
             time.milliseconds()
         );
+        ThreadMetrics.addThreadStateTelemetryMetric(threadId,

Review Comment:
   nit: `threadId` should be on the next line by itself



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -180,15 +180,16 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
                         final String name = mn.name().replace('-', '.');
                         final String group = mn.group().replace("-metrics", "").replace('-', '.');
                         return "org.apache.kafka." + group + "." + name;
-                    }).sorted().collect(Collectors.toList());
+                    }).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics
+                    .sorted().collect(Collectors.toList());
             final List<String> actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
             assertEquals(expectedMetrics, actualMetrics);
 
             TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(),
                     30_000,
                     "Never received subscribed metrics");
             final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
-            final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads");
+            final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.client.state", "org.apache.kafka.stream.failed.stream.threads", "org.apache.kafka.stream.recording.level");

Review Comment:
   Nit: line too long. How about
   ```
   final List<String> expectedInstanceMetrics = Arrays.asList(
       "org.apache.kafka.stream.client.state",
       "org.apache.kafka.stream.alive.stream.threads",
       "org.apache.kafka.stream.failed.stream.threads",
       "org.apache.kafka.stream.recording.level"
   );
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -614,6 +615,12 @@ public StreamThread(final Time time,
             streamsMetrics,
             time.milliseconds()
         );
+        ThreadMetrics.addThreadStateTelemetryMetric(threadId,
+                streamsMetrics,
+                (metricConfig, now) -> this.state().ordinal());
+        ThreadMetrics.addThreadStateMetric(threadId,
+                streamsMetrics,
+                (metricConfig, now) -> this.state().name().toLowerCase(Locale.getDefault()));

Review Comment:
   For the corresponding client metric we just use
   ```
    ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
    ```
    
    Why so "complicated"? I am also OK with updating the code for the client metric instead, but both should work the same way.
    
    Or maybe keep `... -> state` and add a "fancy" `toString()` override to both `enum`s (client and thread) that model the state?
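
One possible shape for the `toString()` idea, as a sketch only. `ThreadStateSketch` and its three constants are hypothetical stand-ins, not the actual state enums. The sketch also swaps in `Locale.ROOT` where the diff uses `Locale.getDefault()`, so that the lowercased name does not depend on the JVM's default locale; that swap is an assumption of this example, not something the PR does.

```java
import java.util.Locale;

// Hypothetical stand-in for a state enum; the constants are an illustrative subset.
enum ThreadStateSketch {
    CREATED,
    RUNNING,
    DEAD;

    // Lowercase form of the constant name, so a gauge can simply return the
    // enum value and let toString() handle the formatting.
    @Override
    public String toString() {
        return name().toLowerCase(Locale.ROOT);
    }
}
```

With an override like this, `String.valueOf(ThreadStateSketch.RUNNING)` yields `running`, and the same `... -> state` lambda style could be used for both the client and the thread metric.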


