[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549398#comment-16549398 ]
ASF GitHub Bot commented on KAFKA-3514: --------------------------------------- guozhangwang closed pull request #5382: KAFKA-3514: Remove min timestamp tracker URL: https://github.com/apache/kafka/pull/5382 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java deleted file mode 100644 index df35c3d69f1..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -import java.util.LinkedList; - -/** - * MinTimestampTracker implements {@link TimestampTracker} that maintains the min - * timestamp of the maintained stamped elements.
- */ -public class MinTimestampTracker<E> implements TimestampTracker<E> { - - // first element has the lowest timestamp and last element the highest - private final LinkedList<Stamped<E>> ascendingSubsequence = new LinkedList<>(); - - // in the case that incoming traffic is very small, the records maybe put and polled - // within a single iteration, in this case we need to remember the last polled - // record's timestamp - private long lastKnownTime = NOT_KNOWN; - - /** - * @throws NullPointerException if the element is null - */ - public void addElement(final Stamped<E> elem) { - if (elem == null) throw new NullPointerException(); - - Stamped<E> maxElem = ascendingSubsequence.peekLast(); - while (maxElem != null && maxElem.timestamp >= elem.timestamp) { - ascendingSubsequence.removeLast(); - maxElem = ascendingSubsequence.peekLast(); - } - ascendingSubsequence.offerLast(elem); //lower timestamps have been retained and all greater/equal removed - } - - public void removeElement(final Stamped<E> elem) { - if (elem == null) { - return; - } - - if (ascendingSubsequence.peekFirst() == elem) { - ascendingSubsequence.removeFirst(); - } - - if (ascendingSubsequence.isEmpty()) { - lastKnownTime = elem.timestamp; - } - - } - - public int size() { - return ascendingSubsequence.size(); - } - - /** - * @return the lowest tracked timestamp - */ - public long get() { - Stamped<E> stamped = ascendingSubsequence.peekFirst(); - - if (stamped == null) - return lastKnownTime; - else - return stamped.timestamp; - } - - public void clear() { - lastKnownTime = NOT_KNOWN; - ascendingSubsequence.clear(); - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index c809da9b4cb..34252bf2b4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -58,7 +58,7 @@ RecordQueue queue() { nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp)); this.partitionQueues = partitionQueues; totalBuffered = 0; - streamTime = -1; + streamTime = RecordQueue.NOT_KNOWN; } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 22ef4d63f8c..86340bb82c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -35,16 +35,19 @@ * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
*/ public class RecordQueue { + + static final long NOT_KNOWN = -1L; + + private final Logger log; private final SourceNode source; - private final TimestampExtractor timestampExtractor; private final TopicPartition partition; - private final ArrayDeque<StampedRecord> fifoQueue; - private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker; - private final RecordDeserializer recordDeserializer; private final ProcessorContext processorContext; - private final Logger log; + private final TimestampExtractor timestampExtractor; + private final RecordDeserializer recordDeserializer; + private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue; - private long partitionTime = TimestampTracker.NOT_KNOWN; + private long partitionTime = NOT_KNOWN; + private StampedRecord headRecord = null; RecordQueue(final TopicPartition partition, final SourceNode source, @@ -52,11 +55,10 @@ final DeserializationExceptionHandler deserializationExceptionHandler, final InternalProcessorContext processorContext, final LogContext logContext) { - this.partition = partition; this.source = source; - this.timestampExtractor = timestampExtractor; + this.partition = partition; this.fifoQueue = new ArrayDeque<>(); - this.timeTracker = new MinTimestampTracker<>(); + this.timestampExtractor = timestampExtractor; this.recordDeserializer = new RecordDeserializer( source, deserializationExceptionHandler, @@ -93,48 +95,10 @@ public TopicPartition partition() { */ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) { - - final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord); - if (record == null) { - // this only happens if the deserializer decides to skip. It has already logged the reason. - continue; - } - - final long timestamp; - try { - timestamp = timestampExtractor.extract(record, timeTracker.get()); - } catch (final StreamsException internalFatalExtractorException) { - throw internalFatalExtractorException; - } catch (final Exception fatalUserException) { - throw new StreamsException( - String.format("Fatal user code error in TimestampExtractor callback for record %s.", record), - fatalUserException); - } - log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record); - - // drop message if TS is invalid, i.e., negative - if (timestamp < 0) { - log.warn( - "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]", - record.topic(), record.partition(), record.offset(), timestamp, timestampExtractor.getClass().getCanonicalName() - ); - ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record(); - continue; - } - - final StampedRecord stampedRecord = new StampedRecord(record, timestamp); - fifoQueue.addLast(stampedRecord); - timeTracker.addElement(stampedRecord); + fifoQueue.addLast(rawRecord); } - // update the partition timestamp if its currently - // tracked min timestamp has exceed its value; this will - // usually only take effect for the first added batch - final long timestamp = timeTracker.get(); - - if (timestamp > partitionTime) { - partitionTime = timestamp; - } + maybeUpdateTimestamp(); return size(); } @@ -145,23 +109,12 @@ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { * @return StampedRecord */ public StampedRecord poll() { - final StampedRecord elem = fifoQueue.pollFirst(); - - if (elem == null) { - return null; - } - - timeTracker.removeElement(elem); + final StampedRecord recordToReturn = headRecord; + headRecord = null; - // only advance the partition timestamp if its currently - // tracked min timestamp has exceeded its value - final long timestamp = timeTracker.get(); + maybeUpdateTimestamp(); - if (timestamp > partitionTime) { - partitionTime = timestamp; - } - - return elem; + return recordToReturn; } /** @@ -170,7 +123,8 @@ public StampedRecord poll() { * @return the number of records */ public int size() { - return fifoQueue.size(); + // plus one deserialized head record for timestamp tracking + return fifoQueue.size() + (headRecord == null ? 0 : 1); } /** @@ -179,7 +133,7 @@ public int size() { * @return true if the queue is empty, otherwise false */ public boolean isEmpty() { - return fifoQueue.isEmpty(); + return fifoQueue.isEmpty() && headRecord == null; } /** @@ -196,16 +150,48 @@ public long timestamp() { */ public void clear() { fifoQueue.clear(); - timeTracker.clear(); - partitionTime = TimestampTracker.NOT_KNOWN; + headRecord = null; + partitionTime = NOT_KNOWN; } - /* - * Returns the timestamp tracker of the record queue - * - * This is only used for testing - */ - TimestampTracker<ConsumerRecord<Object, Object>> timeTracker() { - return timeTracker; + private void maybeUpdateTimestamp() { + while (headRecord == null && !fifoQueue.isEmpty()) { + final ConsumerRecord<byte[], byte[]> raw = fifoQueue.pollFirst(); + final ConsumerRecord<Object, Object> deserialized = recordDeserializer.deserialize(processorContext, raw); + + if (deserialized == null) { + // this only happens if the deserializer decides to skip. It has already logged the reason. + continue; + } + + final long timestamp; + try { + timestamp = timestampExtractor.extract(deserialized, partitionTime); + } catch (final StreamsException internalFatalExtractorException) { + throw internalFatalExtractorException; + } catch (final Exception fatalUserException) { + throw new StreamsException( + String.format("Fatal user code error in TimestampExtractor callback for record %s.", deserialized), + fatalUserException); + } + log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, deserialized); + + // drop message if TS is invalid, i.e., negative + if (timestamp < 0) { + log.warn( + "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]", + deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName() + ); + ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record(); + continue; + } + + headRecord = new StampedRecord(deserialized, timestamp); + + // update the partition timestamp if the current head record's timestamp has exceed its value + if (timestamp > partitionTime) { + partitionTime = timestamp; + } + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index b01fd5b35ea..4c06c391fda 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -69,7 +69,8 @@ public void close() {} return Collections.emptyMap(); } }; - private long streamTime = TimestampTracker.NOT_KNOWN; + + private long streamTime = RecordQueue.NOT_KNOWN; StandbyContextImpl(final TaskId id, final StreamsConfig config, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6af4c4bb132..7f121fe0df4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -716,7 +716,7 @@ public boolean maybePunctuateStreamTime() { // if the timestamp is not known yet, meaning there is not enough data accumulated // to reason stream partition time, then skip. - if (timestamp == TimestampTracker.NOT_KNOWN) { + if (timestamp == RecordQueue.NOT_KNOWN) { return false; } else { return streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java deleted file mode 100644 index 30c816dd526..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; -/** - * TimestampTracker is a helper class for a sliding window implementation. - * It is assumed that stamped elements are added or removed in a FIFO manner. - * It maintains the timestamp, such as the min timestamp, the max timestamp, etc.
- * of stamped elements that were added but not yet removed. - */ -public interface TimestampTracker<E> { - - long NOT_KNOWN = -1L; - - /** - * Adds a stamped elements to this tracker. - * - * @param elem the added element - */ - void addElement(Stamped<E> elem); - - /** - * Removed a stamped elements to this tracker. - * - * @param elem the removed element - */ - void removeElement(Stamped<E> elem); - - /** - * Returns the current tracked timestamp - * - * @return timestamp, or {@link #NOT_KNOWN} when empty - */ - long get(); - - /** - * Returns the size of internal structure. The meaning of "size" depends on the implementation. - * - * @return size - */ - int size(); - - /** - * Empty the tracker by removing any tracked stamped elements - */ - void clear(); -} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java deleted file mode 100644 index 24653e684c7..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.kafka.streams.processor.internals; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import org.junit.Test; -public class MinTimestampTrackerTest { - - private MinTimestampTracker<String> tracker = new MinTimestampTracker<>(); - - @Test - public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() { - assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN)); - } - - @Test - public void shouldReturnTimestampOfOnlyRecord() { - tracker.addElement(elem(100)); - assertThat(tracker.get(), equalTo(100L)); - } - - @Test - public void shouldReturnLowestAvailableTimestampFromAllInputs() { - tracker.addElement(elem(100)); - tracker.addElement(elem(99)); - tracker.addElement(elem(102)); - assertThat(tracker.get(), equalTo(99L)); - } - - @Test - public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() { - final Stamped<String> lowest = elem(88); - tracker.addElement(lowest); - tracker.addElement(elem(101)); - tracker.addElement(elem(99)); - tracker.removeElement(lowest); - assertThat(tracker.get(), equalTo(99L)); - } - - @Test - public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() { - final Stamped<String> record = elem(98); - tracker.addElement(record); - tracker.removeElement(record); - assertThat(tracker.get(), equalTo(98L)); - } - - @Test - public void shouldIgnoreNullRecordOnRemove() { - tracker.removeElement(null); - } - - @Test(expected = NullPointerException.class) - public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() { - tracker.addElement(null); - } - - private Stamped<String> elem(final long timestamp) { - return new Stamped<>("", timestamp); - } -} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 9823ae1bc3a..b3123e46343 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -140,49 +140,49 @@ public void testTimeTracking() { record = group.nextRecord(info); // 1:[5, 2, 4] // 2:[4, 6] - // st: 3 (2's presence prevents it from advancing to 4) + // st: 4 as partition st is now {5, 4} assertEquals(partition1, info.partition()); assertEquals(3L, record.timestamp); assertEquals(5, group.numBuffered()); assertEquals(3, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + assertEquals(4L, group.timestamp()); // get one record, time should not be advanced record = group.nextRecord(info); - // 1:[2, 4] - // 2:[4, 6] - // st: 3 (2's presence prevents it from advancing to 4) - assertEquals(partition1, info.partition()); - assertEquals(5L, record.timestamp); + // 1:[5, 2, 4] + // 2:[6] + // st: 5 as partition st is now {5, 6} + assertEquals(partition2, info.partition()); + assertEquals(4L, record.timestamp); assertEquals(4, group.numBuffered()); - assertEquals(2, group.numBuffered(partition1)); - assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + assertEquals(3, group.numBuffered(partition1)); + assertEquals(1, group.numBuffered(partition2)); + assertEquals(5L, group.timestamp()); // get one more record, now time should be advanced record = group.nextRecord(info); - // 1:[4] - // 2:[4, 6] - // st: 4 + // 1:[2, 4] + // 2:[6] + // st: 5 assertEquals(partition1, info.partition()); - assertEquals(2L, record.timestamp); + assertEquals(5L, record.timestamp); assertEquals(3, group.numBuffered()); - assertEquals(1, group.numBuffered(partition1)); - assertEquals(2, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(2, group.numBuffered(partition1)); + assertEquals(1, group.numBuffered(partition2)); + assertEquals(5L, group.timestamp()); // get one more record, time should not be advanced record = group.nextRecord(info); // 1:[4] // 2:[6] - // st: 4 - assertEquals(partition2, info.partition()); - assertEquals(4L, record.timestamp); + // st: 5 + assertEquals(partition1, info.partition()); + assertEquals(2L, record.timestamp); assertEquals(2, group.numBuffered()); assertEquals(1, group.numBuffered(partition1)); assertEquals(1, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(5L, group.timestamp()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -194,7 +194,7 @@ assertEquals(1, group.numBuffered()); assertEquals(0, group.numBuffered(partition1)); assertEquals(1, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(5L, group.timestamp()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -206,7 +206,7 @@ assertEquals(0, group.numBuffered()); assertEquals(0, group.numBuffered(partition1)); assertEquals(0, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(5L, group.timestamp()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 3ed9e3b61a0..cf1d63f1e1e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -102,7 +102,7 @@ public void testTimeTracking() { assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp()); + assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp()); // add three 3 out-of-order records with timestamp 2, 1, 3 final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( @@ -113,20 +113,17 @@ queue.addRawRecords(list1); assertEquals(3, queue.size()); - assertEquals(1L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); + assertEquals(2L, queue.timestamp()); // poll the first record, now with 1, 3 assertEquals(2L, queue.poll().timestamp); assertEquals(2, queue.size()); - assertEquals(1L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); + assertEquals(2L, queue.timestamp()); // poll the second record, now with 3 assertEquals(1L, queue.poll().timestamp); assertEquals(1, queue.size()); assertEquals(3L, queue.timestamp()); - assertEquals(1, queue.timeTracker().size()); // add three 3 out-of-order records with timestamp 4, 1, 2 // now with 3, 4, 1, 2 final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList( @@ -139,28 +136,23 @@ assertEquals(4, queue.size()); assertEquals(3L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); // poll the third record, now with 4, 1, 2 assertEquals(3L, queue.poll().timestamp); assertEquals(3, queue.size()); - assertEquals(3L, queue.timestamp()); - assertEquals(2,
queue.timeTracker().size()); + assertEquals(4L, queue.timestamp()); // poll the rest records assertEquals(4L, queue.poll().timestamp); - assertEquals(3L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); + assertEquals(4L, queue.timestamp()); assertEquals(1L, queue.poll().timestamp); - assertEquals(3L, queue.timestamp()); - assertEquals(1, queue.timeTracker().size()); + assertEquals(4L, queue.timestamp()); assertEquals(2L, queue.poll().timestamp); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(3L, queue.timestamp()); - assertEquals(0, queue.timeTracker().size()); + assertEquals(4L, queue.timestamp()); // add three more records with 4, 5, 6 final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( @@ -177,14 +169,12 @@ assertEquals(4L, queue.poll().timestamp); assertEquals(2, queue.size()); assertEquals(5L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); // clear the queue queue.clear(); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(0, queue.timeTracker().size()); - assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp()); + assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp()); // re-insert the three records with 4, 5, 6 queue.addRawRecords(list3); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Stream timestamp computation needs some further thoughts > -------------------------------------------------------- > > Key: KAFKA-3514 > URL: https://issues.apache.org/jira/browse/KAFKA-3514 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Guozhang Wang > Assignee: Matthias J. Sax > Priority: Major > Labels: architecture > > Our current stream task's timestamp is used for the punctuate function as well as > for selecting which stream to process next (i.e., best-effort stream > synchronization). It is defined as the smallest timestamp over all > partitions in the task's partition group. This results in two unintuitive > corner cases: > 1) observing a late-arriving record keeps that stream's timestamp low for > a period of time, and hence keeps that stream selected for processing until the late record itself is dequeued. For > example, take two partitions within the same task, annotated by their > timestamps: > {code} > Stream A: 5, 6, 7, 8, 9, 1, 10 > {code} > {code} > Stream B: 2, 3, 4, 5 > {code} > The late-arriving record with timestamp "1" will cause stream A to be selected > continuously in the thread loop, i.e., the messages with timestamps 5, 6, 7, 8, 9 > will be processed until the late record itself is dequeued and processed; only then will stream B be > selected, starting with timestamp 2. > 2) an empty buffered partition prevents its timestamp from advancing, > and hence the task timestamp as well, since the task timestamp is the smallest among all > partitions. This may be a less severe problem than 1) above, though. > *Update* > There is one more thing to consider (full discussion found here: > http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor) > {quote} > Let's assume the following case.
> - a stream processor that uses the Processor API > - context.schedule(1000) is called in the init() > - the processor reads only one topic that has one partition > - using a custom timestamp extractor, but that timestamp is just a wall > clock time > Imagine the following events: > 1., for 10 seconds I send 5 messages / second > 2., I do not send any messages for 3 seconds > 3., I start sending 5 messages / second again > I see that punctuate() is not called during the 3 seconds when I do not > send any messages. This is ok according to the documentation, because > there are no new messages to trigger the punctuate() call. When the > first few messages arrive after I restart sending (point 3. above) I > see the following sequence of method calls: > 1., process() on the 1st message > 2., punctuate() is called 3 times > 3., process() on the 2nd message > 4., process() on each following message > What I would expect instead is that punctuate() is called first and then > process() is called on the messages, because the first message's timestamp > is already 3 seconds older than when the last punctuate() was called, so the > first message belongs after the 3 punctuate() calls. > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
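To make corner case 1) above concrete, here is a minimal, self-contained sketch of the old selection rule: each partition's time is the minimum timestamp over its buffered records (what MinTimestampTracker effectively reported), and the task always polls the partition with the smallest time. The Partition class and time() method are invented for illustration; this is not Kafka Streams code:
{code:java}
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class MinTimestampSelectionSketch {

    static final class Partition {
        final String name;
        final ArrayDeque<Long> buffered = new ArrayDeque<>();

        Partition(final String name, final long... timestamps) {
            this.name = name;
            for (final long t : timestamps) buffered.addLast(t);
        }

        // a partition's time is the min over all buffered records (the old rule)
        long time() {
            return buffered.stream().min(Long::compare).orElse(Long.MAX_VALUE);
        }
    }

    public static void main(final String[] args) {
        final Partition a = new Partition("A", 5, 6, 7, 8, 9, 1, 10); // "1" arrived late
        final Partition b = new Partition("B", 2, 3, 4, 5);
        final List<Partition> group = Arrays.asList(a, b);

        while (group.stream().anyMatch(p -> !p.buffered.isEmpty())) {
            // best-effort synchronization: poll the partition with the smallest time
            final Partition next = group.stream()
                .min(Comparator.comparingLong(Partition::time))
                .get();
            // A's buffered "1" pins its time to 1, so A keeps winning until the
            // late record itself is dequeued; only then is B (time 2) selected
            System.out.println("process " + next.name + ":" + next.buffered.pollFirst());
        }
    }
}
{code}
Running it reproduces the sequence from the description: A:5, A:6, A:7, A:8, A:9, A:1, then B:2, B:3, B:4, B:5, and finally A:10.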
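For contrast, a sketch of the rule the merged PR moves to: only the head record is deserialized, and the partition time is the highest head-record timestamp observed so far, never moving backwards. This mirrors the shape of the new RecordQueue#maybeUpdateTimestamp() but elides deserialization, extractor error handling, and record skipping, so treat it as an approximation rather than the actual implementation:
{code:java}
import java.util.ArrayDeque;

public class HeadRecordTimeSketch {
    static final long NOT_KNOWN = -1L;

    private final ArrayDeque<Long> fifo = new ArrayDeque<>();
    private Long head = null;           // stands in for the deserialized headRecord
    private long partitionTime = NOT_KNOWN;

    void add(final long... timestamps) {
        for (final long t : timestamps) fifo.addLast(t);
        maybeUpdateTimestamp();
    }

    Long poll() {
        final Long recordToReturn = head;
        head = null;
        maybeUpdateTimestamp();
        return recordToReturn;
    }

    long timestamp() {
        return partitionTime;
    }

    private void maybeUpdateTimestamp() {
        if (head == null && !fifo.isEmpty()) {
            head = fifo.pollFirst();
            if (head > partitionTime) partitionTime = head; // monotonic advance only
        }
    }

    public static void main(final String[] args) {
        final HeadRecordTimeSketch q = new HeadRecordTimeSketch();
        q.add(2, 1, 3);                    // out-of-order input
        System.out.println(q.timestamp()); // 2  (head is 2)
        q.poll();
        System.out.println(q.timestamp()); // 2  (head 1 does not move time backwards)
        q.poll();
        System.out.println(q.timestamp()); // 3  (head 3 advances time)
    }
}
{code}
With the out-of-order input 2, 1, 3 it reports times 2, 2, 3, matching the updated assertions in RecordQueueTest#testTimeTracking().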
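Finally, the ordering the quoted reporter expects can be stated in a few lines: before a record is processed, every stream-time punctuation whose scheduled time the record's timestamp has already passed should fire first. The sketch below uses illustrative names and numbers only (it is not the StreamTask punctuation logic) and shows that interleaving for the 3-second-gap scenario:
{code:java}
public class PunctuateOrderingSketch {
    private static final long INTERVAL_MS = 1000L; // context.schedule(1000)
    private long nextPunctuate = INTERVAL_MS;

    void onRecord(final long recordTimestamp) {
        // fire every punctuation that stream time has already passed...
        while (nextPunctuate <= recordTimestamp) {
            System.out.println("punctuate(" + nextPunctuate + ")");
            nextPunctuate += INTERVAL_MS;
        }
        // ...and only then process the record that advanced stream time
        System.out.println("process(" + recordTimestamp + ")");
    }

    public static void main(final String[] args) {
        final PunctuateOrderingSketch task = new PunctuateOrderingSketch();
        // 5 messages/second up to t = 9800 ms
        for (long t = 0; t <= 9800; t += 200) task.onRecord(t);
        // 3-second gap, then sending resumes:
        task.onRecord(12800); // punctuate(10000), punctuate(11000), punctuate(12000), then process(12800)
        task.onRecord(13000); // punctuate(13000), then process(13000)
    }
}
{code}
The exact number of catch-up punctuations depends on how the gap aligns with the schedule, but the point stands: the pending punctuate() calls precede process() on the first post-gap message, rather than following it.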