[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701020#comment-16701020 ]
ASF GitHub Bot commented on KAFKA-7223: --------------------------------------- guozhangwang closed pull request #5795: KAFKA-7223: Suppression Buffer Metrics URL: https://github.com/apache/kafka/pull/5795 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java index 5b0d8b59233..12c481307f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java @@ -20,10 +20,17 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.Sum; +import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; public class Sensors { private Sensors() {} @@ -38,8 +45,8 @@ public static Sensor lateRecordDropSensor(final InternalProcessorContext context ); StreamsMetricsImpl.addInvocationRateAndCount( sensor, - "stream-processor-node-metrics", - metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", context.currentNode().name()), + PROCESSOR_NODE_METRICS_GROUP, + metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()), "late-record-drop" ); return sensor; @@ -75,4 +82,40 @@ public static Sensor recordLatenessSensor(final InternalProcessorContext context ); return sensor; } + + public static Sensor suppressionEmitSensor(final InternalProcessorContext context) { + final StreamsMetricsImpl metrics = context.metrics(); + + final Sensor sensor = metrics.nodeLevelSensor( + context.taskId().toString(), + context.currentNode().name(), + "suppression-emit", + Sensor.RecordingLevel.DEBUG + ); + + final Map<String, String> tags = metrics.tagMap( + "task-id", context.taskId().toString(), + PROCESSOR_NODE_ID_TAG, context.currentNode().name() + ); + + sensor.add( + new MetricName( + "suppression-emit-rate", + PROCESSOR_NODE_METRICS_GROUP, + "The average number of occurrence of suppression-emit operation per second.", + tags + ), + new Rate(TimeUnit.SECONDS, new Sum()) + ); + sensor.add( + new MetricName( + "suppression-emit-total", + PROCESSOR_NODE_METRICS_GROUP, + "The total number of occurrence of suppression-emit operations.", + tags + ), + new Total() + ); + return sensor; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index 50e74a38fd6..06d5004f65f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.kstream.internals.suppress; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.kstream.internals.metrics.Sensors; import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -42,9 +44,10 @@ private final BufferFullStrategy bufferFullStrategy; private final boolean shouldSuppressTombstones; private final String storeName; + private TimeOrderedKeyValueBuffer buffer; private InternalProcessorContext internalProcessorContext; - + private Sensor suppressionEmitSensor; private Serde<K> keySerde; private FullChangeSerde<V> valueSerde; @@ -68,6 +71,8 @@ public KTableSuppressProcessor(final SuppressedInternal<K> suppress, @Override public void init(final ProcessorContext context) { internalProcessorContext = (InternalProcessorContext) context; + suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext); + keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde; valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde; buffer = Objects.requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName)); @@ -123,6 +128,7 @@ private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) { try { final K key = keySerde.deserializer().deserialize(null, toEmit.key.get()); internalProcessorContext.forward(key, value); + suppressionEmitSensor.record(); } finally { internalProcessorContext.setRecordContext(prevRecordContext); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 8dc64173379..8483791e2e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -31,6 +31,8 @@ import java.util.Map; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; @@ -165,15 +167,13 @@ Sensor sourceNodeForwardSensor() { private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNodeName, final ProcessorContext context) { this.metrics = metrics; - final String group = "stream-processor-node-metrics"; final String taskName = context.taskId().toString(); - final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", processorNodeName); - final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", "all"); + final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, processorNodeName); + final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, "all"); nodeProcessTimeSensor = createTaskAndNodeLatencyAndThroughputSensors( "process", metrics, - group, taskName, processorNodeName, allTagMap, @@ -183,7 +183,6 @@ private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNode nodePunctuateTimeSensor = createTaskAndNodeLatencyAndThroughputSensors( "punctuate", metrics, - group, taskName, processorNodeName, allTagMap, @@ -193,7 +192,6 @@ private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNode nodeCreationSensor = createTaskAndNodeLatencyAndThroughputSensors( "create", metrics, - group, taskName, processorNodeName, allTagMap, @@ -204,7 +202,6 @@ private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNode nodeDestructionSensor = createTaskAndNodeLatencyAndThroughputSensors( "destroy", metrics, - group, taskName, processorNodeName, allTagMap, @@ -214,7 +211,6 @@ private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNode sourceNodeForwardSensor = createTaskAndNodeLatencyAndThroughputSensors( "forward", metrics, - group, taskName, processorNodeName, allTagMap, @@ -231,17 +227,16 @@ private void removeAllSensors() { private static Sensor createTaskAndNodeLatencyAndThroughputSensors(final String operation, final StreamsMetricsImpl metrics, - final String group, final String taskName, final String processorNodeName, final Map<String, String> taskTags, final Map<String, String> nodeTags) { final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); - addAvgMaxLatency(parent, group, taskTags, operation); - addInvocationRateAndCount(parent, group, taskTags, operation); + addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent); - addAvgMaxLatency(sensor, group, nodeTags, operation); - addInvocationRateAndCount(sensor, group, nodeTags, operation); + addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); return sensor; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 170311238ec..8ec2711e764 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -51,6 +51,9 @@ private static final String SENSOR_PREFIX_DELIMITER = "."; private static final String SENSOR_NAME_DELIMITER = ".s."; + public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics"; + public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id"; + public StreamsMetricsImpl(final Metrics metrics, final String threadName) { Objects.requireNonNull(metrics, "Metrics cannot be null"); this.threadName = threadName; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index d94f671b712..234ea05264a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -17,17 +17,20 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.metrics.Sensors; import java.nio.ByteBuffer; import java.util.Collection; @@ -57,6 +60,8 @@ private long minTimestamp = Long.MAX_VALUE; private RecordCollector collector; private String changelogTopic; + private Sensor bufferSizeSensor; + private Sensor bufferCountSensor; private volatile boolean open; @@ -174,11 +179,16 @@ public boolean persistent() { @Override public void init(final ProcessorContext context, final StateStore root) { + final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context; + bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext); + bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext); + context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch); if (loggingEnabled) { collector = ((RecordCollector.Supplier) context).recordCollector(); changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); } + updateBufferMetrics(); open = true; } @@ -189,12 +199,13 @@ public boolean isOpen() { @Override public void close() { + open = false; index.clear(); sortedMap.clear(); dirtyKeys.clear(); memBufferSize = 0; minTimestamp = Long.MAX_VALUE; - open = false; + updateBufferMetrics(); } @Override @@ -265,6 +276,7 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch ); } } + updateBufferMetrics(); } @@ -272,6 +284,7 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch public void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback) { final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator(); + int evictions = 0; if (predicate.get()) { Map.Entry<BufferKey, ContextualRecord> next = null; @@ -298,8 +311,13 @@ public void evictWhile(final Supplier<Boolean> predicate, next = null; minTimestamp = Long.MAX_VALUE; } + + evictions++; } } + if (evictions > 0) { + updateBufferMetrics(); + } } @Override @@ -308,6 +326,7 @@ public void put(final long time, final ContextualRecord value) { cleanPut(time, key, value); dirtyKeys.add(key); + updateBufferMetrics(); } private void cleanPut(final long time, final Bytes key, final ContextualRecord value) { @@ -355,4 +374,9 @@ private long computeRecordSize(final Bytes key, final ContextualRecord value) { } return size; } + + private void updateBufferMetrics() { + bufferSizeSensor.record(memBufferSize); + bufferCountSensor.record(index.size()); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java index fdbc7c813fe..13a39c652f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java @@ -16,7 +16,13 @@ */ package org.apache.kafka.streams.state.internals.metrics; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Value; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import java.util.Map; @@ -25,7 +31,6 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; public final class Sensors { - private Sensors() {} public static Sensor createTaskAndStoreLatencyAndThroughputSensors(final Sensor.RecordingLevel level, @@ -44,5 +49,67 @@ public static Sensor createTaskAndStoreLatencyAndThroughputSensors(final Sensor. addInvocationRateAndCount(sensor, metricsGroup, storeTags, operation); return sensor; } + + public static Sensor createBufferSizeSensor(final StateStore store, + final InternalProcessorContext context) { + return getBufferSizeOrCountSensor(store, context, "size"); + } + + public static Sensor createBufferCountSensor(final StateStore store, + final InternalProcessorContext context) { + return getBufferSizeOrCountSensor(store, context, "count"); + } + + private static Sensor getBufferSizeOrCountSensor(final StateStore store, + final InternalProcessorContext context, + final String property) { + final StreamsMetricsImpl metrics = context.metrics(); + + final String sensorName = "suppression-buffer-" + property; + + final Sensor sensor = metrics.storeLevelSensor( + context.taskId().toString(), + store.name(), + sensorName, + Sensor.RecordingLevel.DEBUG + ); + + final String metricsGroup = "stream-buffer-metrics"; + + final Map<String, String> tags = metrics.tagMap( + "task-id", context.taskId().toString(), + "buffer-id", store.name() + ); + + sensor.add( + new MetricName( + sensorName + "-current", + metricsGroup, + "The current " + property + " of buffered records.", + tags), + new Value() + ); + + + sensor.add( + new MetricName( + sensorName + "-avg", + metricsGroup, + "The average " + property + " of buffered records.", + tags), + new Avg() + ); + + sensor.add( + new MetricName( + sensorName + "-max", + metricsGroup, + "The max " + property + " of buffered records.", + tags), + new Max() + ); + + return sensor; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java new file mode 100644 index 00000000000..986dc6f1961 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer; +import org.apache.kafka.test.MockInternalProcessorContext; +import org.hamcrest.Matcher; +import org.junit.Test; + +import java.time.Duration; +import java.util.Map; + +import static org.apache.kafka.common.serialization.Serdes.Long; +import static org.apache.kafka.common.serialization.Serdes.String; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.Is.is; + +@SuppressWarnings("PointlessArithmeticExpression") +public class KTableSuppressProcessorMetricsTest { + private static final long ARBITRARY_LONG = 5L; + + private static final MetricName EVICTION_TOTAL_METRIC = new MetricName( + "suppression-emit-total", + "stream-processor-node-metrics", + "The total number of occurrence of suppression-emit operations.", + mkMap( + mkEntry("client-id", "mock-processor-context-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("processor-node-id", "testNode") + ) + ); + + private static final MetricName EVICTION_RATE_METRIC = new MetricName( + "suppression-emit-rate", + "stream-processor-node-metrics", + "The average number of occurrence of suppression-emit operation per second.", + mkMap( + mkEntry("client-id", "mock-processor-context-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("processor-node-id", "testNode") + ) + ); + + private static final MetricName BUFFER_SIZE_AVG_METRIC = new MetricName( + "suppression-buffer-size-avg", + "stream-buffer-metrics", + "The average size of buffered records.", + mkMap( + mkEntry("client-id", "mock-processor-context-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("buffer-id", "test-store") + ) + ); + + private static final MetricName BUFFER_SIZE_CURRENT_METRIC = new MetricName( + "suppression-buffer-size-current", + "stream-buffer-metrics", + "The current size of buffered records.", + mkMap( + mkEntry("client-id", "mock-processor-context-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("buffer-id", "test-store") + ) + ); + + private static final MetricName BUFFER_SIZE_MAX_METRIC = new MetricName( + "suppression-buffer-size-max", + "stream-buffer-metrics", + "The max size of buffered records.", + mkMap( + mkEntry("client-id", "mock-processor-context-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("buffer-id", "test-store") + ) + ); + + private static final MetricName BUFFER_COUNT_AVG_METRIC = new MetricName( + "suppression-buffer-count-avg", + "stream-buffer-metrics", + "The average count of buffered records.", + mkMap( + mkEntry("client-id", "mock-processor-context-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("buffer-id", "test-store") + ) + ); + + private static final MetricName BUFFER_COUNT_CURRENT_METRIC = new MetricName( + "suppression-buffer-count-current", + "stream-buffer-metrics", + "The current count of buffered records.", + mkMap( + mkEntry("client-id", "mock-processor-context-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("buffer-id", "test-store") + ) + ); + + private static final MetricName BUFFER_COUNT_MAX_METRIC = new MetricName( + "suppression-buffer-count-max", + "stream-buffer-metrics", + "The max count of buffered records.", + mkMap( + mkEntry("client-id", "mock-processor-context-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("buffer-id", "test-store") + ) + ); + + @Test + public void shouldRecordMetrics() { + final String storeName = "test-store"; + + final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName) + .withLoggingDisabled() + .build(); + + final KTableSuppressProcessor<String, Long> processor = + new KTableSuppressProcessor<>( + (SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)), + storeName, + String(), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + context.setCurrentNode(new ProcessorNode("testNode")); + + buffer.init(context, buffer); + processor.init(context); + + final long timestamp = 100L; + context.setStreamTime(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); + final String key = "longKey"; + final Change<Long> value = new Change<>(null, ARBITRARY_LONG); + processor.process(key, value); + + { + final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics(); + + verifyMetric(metrics, EVICTION_RATE_METRIC, is(0.0)); + verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(0.0)); + verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(25.5)); + verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(51.0)); + verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(51.0)); + verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(0.5)); + verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0)); + verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0)); + } + + context.setStreamTime(timestamp + 1); + context.setRecordMetadata("", 0, 1L, null, timestamp + 1); + processor.process("key", value); + + { + final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics(); + + verifyMetric(metrics, EVICTION_RATE_METRIC, greaterThan(0.0)); + verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(1.0)); + verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(49.0)); + verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(47.0)); + verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(98.0)); + verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(1.0)); + verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0)); + verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(2.0)); + } + } + + @SuppressWarnings("unchecked") + private <T> void verifyMetric(final Map<MetricName, ? extends Metric> metrics, + final MetricName metricName, + final Matcher<T> matcher) { + assertThat(metrics.get(metricName).metricName().description(), is(metricName.description())); + assertThat((T) metrics.get(metricName).metricValue(), matcher); + + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index 002ace2ed28..335fae1875c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -77,9 +77,16 @@ .withLoggingDisabled() .build(); final KTableSuppressProcessor<K, V> processor = - new KTableSuppressProcessor<>(getImpl(suppressed), storeName, keySerde, new FullChangeSerde<>(valueSerde)); + new KTableSuppressProcessor<>( + (SuppressedInternal<K>) suppressed, + storeName, + keySerde, + new FullChangeSerde<>(valueSerde) + ); final MockInternalProcessorContext context = new MockInternalProcessorContext(); + context.setCurrentNode(new ProcessorNode("testNode")); + buffer.init(context, buffer); processor.init(context); @@ -461,10 +468,6 @@ public boolean matches(final Object item) { }; } - private static <K> SuppressedInternal<K> getImpl(final Suppressed<K> suppressed) { - return (SuppressedInternal<K>) suppressed; - } - private <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) { final Serde<K> kSerde = Serdes.serdeFrom(rawType); return new Serdes.WrapperSerde<>( diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index 88a7fe79e2b..4c3a6b2e881 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -19,7 +19,9 @@ import java.time.Duration; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.KeyValue; @@ -208,7 +210,12 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final this.taskId = taskId; this.config = streamsConfig; this.stateDir = stateDir; - this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context-virtual-thread"); + final MetricConfig metricConfig = new MetricConfig(); + metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG); + this.metrics = new StreamsMetricsImpl( + new Metrics(metricConfig), + "mock-processor-context-virtual-thread" + ); } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KIP-328: Add in-memory Suppression > ---------------------------------- > > Key: KAFKA-7223 > URL: https://issues.apache.org/jira/browse/KAFKA-7223 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: John Roesler > Assignee: John Roesler > Priority: Major > > As described in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.] > > This ticket is to implement Suppress, but only for in-memory buffers. > (depends on KAFKA-7222) -- This message was sent by Atlassian JIRA (v7.6.3#76005)