[ https://issues.apache.org/jira/browse/KAFKA-5368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308408#comment-16308408 ]
ASF GitHub Bot commented on KAFKA-5368:
---------------------------------------
guozhangwang closed pull request #4365: KAFKA-5368: Add test for
skipped-records metric
URL: https://github.com/apache/kafka/pull/4365
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8bcd6fb4ed4..42504655117 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -23,10 +23,12 @@
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -37,6 +39,7 @@
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -825,6 +828,51 @@ public boolean conditionMet() {
}
}
+ @Test
+ public void shouldReportSkippedRecordsForInvalidTimestamps() throws
Exception {
+ internalTopologyBuilder.addSource(null, "source1", null, null, null,
topic1);
+
+ final Properties config = configProps(false);
+
config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
LogAndSkipOnInvalidTimestamp.class.getName());
+ final StreamThread thread = createStreamThread(clientId, new
StreamsConfig(config), false);
+
+ thread.setState(StreamThread.State.RUNNING);
+ thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+
+ final Set<TopicPartition> assignedPartitions =
Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition()));
+ thread.taskManager().setAssignmentMetadata(
+ Collections.singletonMap(
+ new TaskId(0, t1p1.partition()),
+ assignedPartitions),
+ Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+ final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(Collections.singleton(t1p1));
+ mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1,
0L));
+
+ final MetricName skippedTotalMetric =
metrics.metricName("skipped-records-total", "stream-metrics",
Collections.singletonMap("client-id", thread.getName()));
+ assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
+
+ long offset = -1;
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ thread.runOnce(-1);
+ assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
+
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ thread.runOnce(-1);
+ assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ thread.runOnce(-1);
+ assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+ }
+
private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata
metadata, StreamThread.State state) {
assertEquals(state.name(), metadata.threadState());
assertTrue(metadata.activeTasks().isEmpty());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Kafka Streams skipped-records-rate sensor produces nonzero values when the
> timestamps are valid
> -----------------------------------------------------------------------------------------------
>
> Key: KAFKA-5368
> URL: https://issues.apache.org/jira/browse/KAFKA-5368
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Hamidreza Afzali
> Assignee: Hamidreza Afzali
> Fix For: 0.11.0.0
>
>
> Kafka Streams skipped-records-rate sensor produces nonzero values even when
> the timestamps are valid and records are processed. The values are equal to
> poll-rate.
> Related issue: KAFKA-5055
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)