cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462327050
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) {
         return byteEntries;
     }
 
+    private void maybeRecordE2ELatency() {
+        if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   I think you do not need to check for metrics with `e2eLatencySensor.hasMetrics()`; there should always be metrics within this sensor. `hasMetrics()` is used in `StreamsMetricsImpl#maybeMeasureLatency()` because some sensors may not contain any metrics, depending on the built-in metrics version. For instance, the destroy sensor exists for built-in metrics versions 0.10.0-2.4 but not for `latest`. To avoid version checks in the record-processing code, we just create an empty sensor and call `record()` on it, effectively recording nothing for this sensor on version `latest`. Note that we do not hide newly added metrics when the built-in version is set to an older version. The same applies to the other uses of `hasMetrics()` introduced in this PR; a sketch of the simplified check follows below.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -248,4 +253,12 @@ public void close() {
     private Bytes keyBytes(final K key) {
         return Bytes.wrap(serdes.rawKey(key));
     }
+
+    private void maybeRecordE2ELatency() {
+        if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   Your approach makes sense to me. I agree that the latency should refer to the update of the state store and not to the record itself. If a record updates the state more than once, the latency should be measured each time, as sketched below.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
##########
@@ -443,6 +447,25 @@ public static Sensor suppressionBufferSizeSensor(final String threadId,
         );
     }
 
+    public static Sensor e2ELatencySensor(final String threadId,
+                                          final String taskId,
+                                          final String storeType,
+                                          final String storeName,
+                                          final StreamsMetricsImpl streamsMetrics) {
+        final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
+        final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
+        addAvgAndMinAndMaxToSensor(
+            sensor,
+            STATE_STORE_LEVEL_GROUP,

Review comment:
   You need to use `stateStoreLevelGroup()` here instead of `STATE_STORE_LEVEL_GROUP`, because the group name depends on the version and the store type; see the sketch below.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##########
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24,
         checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
         checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
         checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+        checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics);
+        checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics);
+        checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics);

Review comment:
   I agree with you, it should always be 1. The cause is the group of the metrics; see my comment in `StateStoreMetrics`. I am glad this test served its purpose, because I did not notice this in the unit tests!
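For the `MeteredKeyValueStore` comment above, a minimal sketch of the simplified guard once `hasMetrics()` is dropped; the `e2eLatencySensor`, `time`, and `context` members are assumed to exist on the surrounding class and are not shown here:

```java
// Sketch only: assumes the surrounding MeteredKeyValueStore provides an
// e2eLatencySensor (Sensor), a Time instance named `time`, and the
// processor `context` that supplies the record timestamp.
private void maybeRecordE2ELatency() {
    // The store-level e2e latency sensor is always created together with
    // its metrics, so shouldRecord() alone suffices; hasMetrics() is only
    // needed for sensors that may be empty on some built-in metrics versions.
    if (e2eLatencySensor.shouldRecord()) {
        final long currentTime = time.milliseconds();
        e2eLatencySensor.record(currentTime - context.timestamp(), currentTime);
    }
}
```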
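For the `MeteredSessionStore` comment, a hypothetical sketch of where the helper would be invoked so that every state-store update yields its own latency sample; the write path is reduced to the essential call and the usual metered-latency wrapper is elided:

```java
@Override
public void put(final Windowed<K> sessionKey, final V aggregate) {
    // forward the write to the inner store (metering elided in this sketch)
    wrapped().put(
        new Windowed<>(keyBytes(sessionKey.key()), sessionKey.window()),
        serdes.rawValue(aggregate));
    // one e2e latency sample per state-store update: an input record that
    // updates the store twice deliberately contributes two samples
    maybeRecordE2ELatency();
}
```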
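For the `StateStoreMetrics` comment, a sketch of the sensor factory with the group resolved per built-in metrics version and store type; the `stateStoreLevelGroup(storeType)` helper and the argument order after the group are assumptions based on the surrounding code:

```java
public static Sensor e2ELatencySensor(final String threadId,
                                      final String taskId,
                                      final String storeType,
                                      final String storeName,
                                      final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor =
        streamsMetrics.storeLevelSensor(threadId, taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
    final Map<String, String> tagMap =
        streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
    final String e2eLatencyDescription =
        "end-to-end latency of a record, measuring by comparing the record timestamp with the "
            + "system time when it has been fully processed by the node";
    addAvgAndMinAndMaxToSensor(
        sensor,
        // resolve the group instead of using the STATE_STORE_LEVEL_GROUP constant,
        // since the group name depends on the built-in metrics version and store type
        stateStoreLevelGroup(storeType),
        tagMap,
        RECORD_E2E_LATENCY,
        "The average " + e2eLatencyDescription,
        "The minimum " + e2eLatencyDescription,
        "The maximum " + e2eLatencyDescription
    );
    return sensor;
}
```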
##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -468,38 +468,44 @@ public void shouldRecordE2ELatencyOnProcessForSourceNodes() {
     }
 
     @Test
-    public void shouldRecordE2ELatencyMinAndMax() {
+    public void shouldRecordE2ELatencyAvgAndMinAndMax() {
         time = new MockTime(0L, 0L, 0L);
         metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
         task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
 
         final String sourceNode = source1.name();
 
-        final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+        final Metric avgMetric = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
         final Metric minMetric = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+        final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
 
+        assertThat(avgMetric.metricValue(), equalTo(Double.NaN));
         assertThat(minMetric.metricValue(), equalTo(Double.NaN));
         assertThat(maxMetric.metricValue(), equalTo(Double.NaN));
 
         // e2e latency = 10
         task.maybeRecordE2ELatency(0L, 10L, sourceNode);

Review comment:
   Could you test `maybeRecordE2ELatency()` through `process()` and `forward()`? Although you test `maybeRecordE2ELatency()` itself, you do not test whether the recording is done during processing, and that is the crucial thing, IMO. A possible test shape is sketched below.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
##########
@@ -327,6 +327,38 @@ public void shouldGetExpiredWindowRecordDropSensor() {
         assertThat(sensor, is(expectedSensor));
     }
 
+    @Test
+    public void shouldGetRecordE2ELatencySensor() {
+        final String metricName = "record-e2e-latency";
+
+        final String e2eLatencyDescription =
+            "end-to-end latency of a record, measuring by comparing the record timestamp with the " +
+                "system time when it has been fully processed by the node";
+        final String descriptionOfAvg = "The average " + e2eLatencyDescription;
+        final String descriptionOfMin = "The minimum " + e2eLatencyDescription;
+        final String descriptionOfMax = "The maximum " + e2eLatencyDescription;
+
+        expect(streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, STORE_NAME, metricName, RecordingLevel.TRACE))
+            .andReturn(expectedSensor);
+        expect(streamsMetrics.storeLevelTagMap(THREAD_ID, TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap);
+        StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(
+            expectedSensor,
+            STORE_LEVEL_GROUP,

Review comment:
   You need to make this dependent on the built-in metrics version by using the instance variable `storeLevelGroup`; see the sketch below.
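For the `StreamTaskTest` comment, a hypothetical shape for driving the recording through `process()` instead of calling `maybeRecordE2ELatency()` directly; `getConsumerRecord()`, `partition1`, and the other helpers are assumed to behave as elsewhere in the test class, and the expected latency of 10 assumes the enqueued record carries timestamp 0 while the mock wall clock stands at 10:

```java
@Test
public void shouldRecordE2ELatencyThroughProcessing() {
    time = new MockTime(0L, 0L, 0L);
    metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
    task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
    task.initializeIfNeeded();
    task.completeRestoration();

    final String sourceNode = source1.name();
    final Metric maxMetric = getProcessorMetric(
        "record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);

    // enqueue a record (assumed timestamp 0), advance the wall clock to 10,
    // and let process() perform the recording instead of calling the helper
    task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L)));
    time.sleep(10L);
    task.process(time.milliseconds());

    assertThat(maxMetric.metricValue(), equalTo(10.0));
}
```

A similar test through `forward()` would assert the metric of a downstream processor node after the source node has forwarded the record.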
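For the `StateStoreMetricsTest` comment, the expectation sketched with the parameterized `storeLevelGroup` instance variable in place of the constant; the remaining arguments are the variables the test already defines, in the order assumed from the sensor factory above:

```java
StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(
    expectedSensor,
    storeLevelGroup,  // set per built-in metrics version by the parameterized test
    storeTagMap,
    metricName,
    descriptionOfAvg,
    descriptionOfMin,
    descriptionOfMax
);
```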