https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567670#comment-16567670
ASF GitHub Bot commented on KAFKA-3514: --------------------------------------- guozhangwang closed pull request #5398: KAFKA-3514: Part II, Choose tasks with data on all partitions to process URL: https://github.com/apache/kafka/pull/5398 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index f98e6356a22..0a839655748 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -87,11 +87,13 @@ int maybeCommit() { */ int process() { int processed = 0; + final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator(); while (it.hasNext()) { final StreamTask task = it.next().getValue(); + try { - if (task.process()) { + if (task.isProcessable() && task.process()) { processed++; } } catch (final TaskMigratedException e) { @@ -108,6 +110,7 @@ int process() { throw e; } } + return processed; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 34252bf2b4c..f17c63acd2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -27,15 +27,23 @@ /** * A PartitionGroup is composed from a set of partitions. It also maintains the timestamp of this - * group, hence the associated task as the min timestamp across all partitions in the group. + * group, a.k.a. the stream time of the associated task. It is defined as the maximum timestamp of + * all the records having been retrieved for processing from this PartitionGroup so far. + * + * We decide from which partition to retrieve the next record to process based on partitions' timestamps. + * The timestamp of a specific partition is initialized as UNKNOWN (-1), and is updated with the head record's timestamp + * if it is smaller (i.e. it should be monotonically increasing); when the partition's buffer becomes empty and there is + * no head record, the partition's timestamp will not be updated any more. 
*/ public class PartitionGroup { private final Map<TopicPartition, RecordQueue> partitionQueues; - private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime; + private long streamTime; private int totalBuffered; + private boolean allBuffered; + public static class RecordInfo { RecordQueue queue; @@ -53,11 +61,11 @@ RecordQueue queue() { } } - PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) { nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp)); this.partitionQueues = partitionQueues; totalBuffered = 0; + allBuffered = false; streamTime = RecordQueue.NOT_KNOWN; } @@ -79,17 +87,16 @@ StampedRecord nextRecord(final RecordInfo info) { if (record != null) { --totalBuffered; - if (!queue.isEmpty()) { + if (queue.isEmpty()) { + // if a certain queue has been drained, reset the flag + allBuffered = false; + } else { nonEmptyQueuesByTime.offer(queue); } - // Since this was previously a queue with min timestamp, - // streamTime could only advance if this queue's time did. - if (queue.timestamp() > streamTime) { - computeStreamTime(); - } + // always update the stream time to the record's timestamp yet to be processed if it is larger + streamTime = Math.max(streamTime, record.timestamp); } - } return record; @@ -106,17 +113,18 @@ int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord< final RecordQueue recordQueue = partitionQueues.get(partition); final int oldSize = recordQueue.size(); - final long oldTimestamp = recordQueue.timestamp(); final int newSize = recordQueue.addRawRecords(rawRecords); // add this record queue to be considered for processing in the future if it was empty before if (oldSize == 0 && newSize > 0) { nonEmptyQueuesByTime.offer(recordQueue); - } - // Adding to this queue could only advance streamTime if it was previously the queue with min timestamp (= streamTime) - if (oldTimestamp <= streamTime && recordQueue.timestamp() > streamTime) { - computeStreamTime(); + // if all partitions now are non-empty, set the flag + // we do not need to update the stream time here since this task will definitely be + // processed next, and hence the stream time will be updated when we retrieved records by then + if (nonEmptyQueuesByTime.size() == this.partitionQueues.size()) { + allBuffered = true; + } } totalBuffered += newSize - oldSize; @@ -136,18 +144,6 @@ public long timestamp() { return streamTime; } - private void computeStreamTime() { - // we should always return the smallest timestamp of all partitions - // to avoid group partition time goes backward - long timestamp = Long.MAX_VALUE; - for (final RecordQueue queue : partitionQueues.values()) { - if (queue.timestamp() < timestamp) { - timestamp = queue.timestamp(); - } - } - this.streamTime = timestamp; - } - /** * @throws IllegalStateException if the record's partition does not belong to this partition group */ @@ -165,7 +161,12 @@ int numBuffered() { return totalBuffered; } + boolean allPartitionsBuffered() { + return allBuffered; + } + public void close() { + clear(); partitionQueues.clear(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 7f121fe0df4..6f3b031b8db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -59,6 +59,8 @@ private static 
final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null); + private static final int WAIT_ON_PARTIAL_INPUT = 5; + private final PartitionGroup partitionGroup; private final PartitionGroup.RecordInfo recordInfo; private final PunctuationQueue streamTimePunctuationQueue; @@ -72,12 +74,14 @@ private boolean commitRequested = false; private boolean commitOffsetNeeded = false; private boolean transactionInFlight = false; + private int waits = WAIT_ON_PARTIAL_INPUT; private final Time time; private final TaskMetrics taskMetrics; protected static final class TaskMetrics { final StreamsMetricsImpl metrics; final Sensor taskCommitTimeSensor; + final Sensor taskEnforcedProcessSensor; private final String taskName; @@ -108,7 +112,7 @@ // add the operation metrics with additional tags final Map<String, String> tagMap = metrics.tagMap("task-id", taskName); - taskCommitTimeSensor = metrics.taskLevelSensor("commit", taskName, Sensor.RecordingLevel.DEBUG, parent); + taskCommitTimeSensor = metrics.taskLevelSensor(taskName, "commit", Sensor.RecordingLevel.DEBUG, parent); taskCommitTimeSensor.add( new MetricName("commit-latency-avg", group, "The average latency of commit operation.", tagMap), new Avg() @@ -125,6 +129,18 @@ new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap), new Count() ); + + // add the metrics for enforced processing + taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, "enforced-process", Sensor.RecordingLevel.DEBUG, parent); + taskEnforcedProcessSensor.add( + new MetricName("enforced-process-rate", group, "The average number of occurrence of enforced-process per second.", tagMap), + new Rate(TimeUnit.SECONDS, new Count()) + ); + taskEnforcedProcessSensor.add( + new MetricName("enforced-process-total", group, "The total number of occurrence of enforced-process operations.", tagMap), + new Count() + ); + } void removeAllSensors() { @@ -263,6 +279,21 @@ public void resume() { log.debug("Resuming"); } + /** + * An active task is processable if its buffer contains data for all of its input source topic partitions + */ + public boolean isProcessable() { + if (partitionGroup.allPartitionsBuffered()) { + return true; + } else if (partitionGroup.numBuffered() > 0 && --waits < 0) { + taskMetrics.taskEnforcedProcessSensor.record(); + waits = WAIT_ON_PARTIAL_INPUT; + return true; + } + + return false; + } + /** * Process one record. 
* diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 428aa1d938b..42f55efbf63 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -831,18 +831,21 @@ long runOnce(final long recordsProcessedBeforeCommit) { } } - if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) { + if (records != null && !records.isEmpty()) { streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs); addRecordsToTasks(records); + } + + if (taskManager.hasActiveRunningTasks()) { final long totalProcessed = processAndMaybeCommit(recordsProcessedBeforeCommit); if (totalProcessed > 0) { final long processLatency = computeLatency(); streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed, timerStartedMs); processedBeforeCommit = adjustRecordsProcessedBeforeCommit( - recordsProcessedBeforeCommit, - totalProcessed, - processLatency, - commitTimeMs); + recordsProcessedBeforeCommit, + totalProcessed, + processLatency, + commitTimeMs); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 662ded553ad..e99a5b3bb23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -88,13 +88,13 @@ public final void removeAllThreadLevelSensors() { } public final Sensor taskLevelSensor(final String taskName, - final String sensorName, - final Sensor.RecordingLevel recordingLevel, - final Sensor... parents) { + final String sensorName, + final Sensor.RecordingLevel recordingLevel, + final Sensor... parents) { final String key = threadName + "." + taskName; synchronized (taskLevelSensors) { if (!taskLevelSensors.containsKey(key)) { - taskLevelSensors.put(key, new LinkedList<String>()); + taskLevelSensors.put(key, new LinkedList<>()); } final String fullSensorName = key + "." 
+ sensorName; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index 77b9c1e8560..12b4cf30240 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -370,7 +370,7 @@ private NamedCacheMetrics(final StreamsMetricsImpl metrics, final String cacheNa "record-cache-id", "all", "task-id", taskName ); - final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor("hitRatio", taskName, Sensor.RecordingLevel.DEBUG); + final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor(taskName, "hitRatio", Sensor.RecordingLevel.DEBUG); taskLevelHitRatioSensor.add( new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags), new Avg() diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 749d74887e5..1a78ed356bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -50,7 +50,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -350,6 +352,45 @@ public static void waitForCompletion(final KafkaStreams streams, return accumData; } + public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, + final String topic, + final List<KeyValue<K, V>> expectedRecords) throws InterruptedException { + return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT); + } + + public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, + final String topic, + final List<KeyValue<K, V>> expectedRecords, + final long waitTime) throws InterruptedException { + final List<KeyValue<K, V>> accumData = new ArrayList<>(); + try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) { + final TestCondition valuesRead = new TestCondition() { + @Override + public boolean conditionMet() { + final List<KeyValue<K, V>> readData = + readKeyValues(topic, consumer, waitTime, expectedRecords.size()); + accumData.addAll(readData); + + final Map<K, V> finalData = new HashMap<>(); + + for (final KeyValue<K, V> keyValue : accumData) { + finalData.put(keyValue.key, keyValue.value); + } + + for (final KeyValue<K, V> keyValue : expectedRecords) { + if (!keyValue.value.equals(finalData.get(keyValue.key))) + return false; + } + + return true; + } + }; + final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic; + TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails); + } + return accumData; + } + public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index 8a8d6255c46..7efe653578f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -337,6 +337,7 @@ public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() { @Test public void shouldCloseTaskOnProcessesIfTaskMigratedException() { mockTaskInitialization(); + EasyMock.expect(t1.isProcessable()).andReturn(true); t1.process(); EasyMock.expectLastCall().andThrow(new TaskMigratedException()); t1.close(false, true); @@ -353,6 +354,32 @@ public void shouldCloseTaskOnProcessesIfTaskMigratedException() { EasyMock.verify(t1); } + @Test + public void shouldNotProcessUnprocessableTasks() { + mockTaskInitialization(); + EasyMock.expect(t1.isProcessable()).andReturn(false); + EasyMock.replay(t1); + addAndInitTask(); + + assertThat(assignedTasks.process(), equalTo(0)); + + EasyMock.verify(t1); + } + + @Test + public void shouldAlwaysProcessProcessableTasks() { + mockTaskInitialization(); + EasyMock.expect(t1.isProcessable()).andReturn(true); + EasyMock.expect(t1.process()).andReturn(true).once(); + EasyMock.replay(t1); + + addAndInitTask(); + + assertThat(assignedTasks.process(), equalTo(1)); + + EasyMock.verify(t1); + } + @Test public void shouldPunctuateRunningTasks() { mockTaskInitialization(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index b3123e46343..2df4f66cb85 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -88,12 +88,12 @@ public void testTimeTracking() { group.addRawRecords(partition2, list2); // 1:[1, 3, 5] // 2:[2, 4, 6] - // st: 1 + // st: -1 since no records was being processed yet assertEquals(6, group.numBuffered()); assertEquals(3, group.numBuffered(partition1)); assertEquals(3, group.numBuffered(partition2)); - assertEquals(1L, group.timestamp()); + assertEquals(-1L, group.timestamp()); StampedRecord record; final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); @@ -108,7 +108,7 @@ public void testTimeTracking() { assertEquals(5, group.numBuffered()); assertEquals(2, group.numBuffered(partition1)); assertEquals(3, group.numBuffered(partition2)); - assertEquals(2L, group.timestamp()); + assertEquals(1L, group.timestamp()); // get one record, now the time should be advanced record = group.nextRecord(info); @@ -120,7 +120,7 @@ public void testTimeTracking() { assertEquals(4, group.numBuffered()); assertEquals(2, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + assertEquals(2L, group.timestamp()); // add 2 more records with timestamp 2, 4 to partition-1 final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( @@ -134,7 +134,7 @@ public void testTimeTracking() { assertEquals(6, group.numBuffered()); assertEquals(4, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + assertEquals(2L, group.timestamp()); // get one record, time should not be advanced record = group.nextRecord(info); @@ -146,7 +146,7 @@ public void testTimeTracking() { assertEquals(5, 
group.numBuffered()); assertEquals(3, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(3L, group.timestamp()); // get one record, time should not be advanced record = group.nextRecord(info); @@ -158,7 +158,7 @@ public void testTimeTracking() { assertEquals(4, group.numBuffered()); assertEquals(3, group.numBuffered(partition1)); assertEquals(1, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + assertEquals(4L, group.timestamp()); // get one more record, now time should be advanced record = group.nextRecord(info); @@ -206,7 +206,7 @@ public void testTimeTracking() { assertEquals(0, group.numBuffered()); assertEquals(0, group.numBuffered(partition1)); assertEquals(0, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + assertEquals(6L, group.timestamp()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index bfbb2a00270..146bcb3b54e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -57,6 +57,7 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -111,7 +112,7 @@ public void close() { private final ProcessorTopology topology = ProcessorTopology.withSources( Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime), - mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)) + mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)) ); private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); @@ -307,97 +308,6 @@ public void testPauseResume() { assertEquals(0, consumer.paused().size()); } - @SuppressWarnings("unchecked") - @Test - public void testMaybePunctuateStreamTime() { - task = createStatelessTask(createConfig(false)); - task.initializeStateStores(); - task.initializeTopology(); - - task.addRecords(partition1, Arrays.asList( - getConsumerRecord(partition1, 0), - getConsumerRecord(partition1, 20), - getConsumerRecord(partition1, 32), - getConsumerRecord(partition1, 40), - getConsumerRecord(partition1, 60) - )); - - task.addRecords(partition2, Arrays.asList( - getConsumerRecord(partition2, 25), - getConsumerRecord(partition2, 35), - getConsumerRecord(partition2, 45), - getConsumerRecord(partition2, 61) - )); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(8, task.numBuffered()); - assertEquals(1, source1.numReceived); - assertEquals(0, source2.numReceived); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(7, task.numBuffered()); - assertEquals(2, source1.numReceived); - assertEquals(0, source2.numReceived); - - assertFalse(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(6, task.numBuffered()); - assertEquals(2, source1.numReceived); - assertEquals(1, source2.numReceived); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(5, task.numBuffered()); - assertEquals(3, source1.numReceived); - assertEquals(1, source2.numReceived); - - assertFalse(task.maybePunctuateStreamTime()); - - 
assertTrue(task.process()); - assertEquals(4, task.numBuffered()); - assertEquals(3, source1.numReceived); - assertEquals(2, source2.numReceived); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(3, task.numBuffered()); - assertEquals(4, source1.numReceived); - assertEquals(2, source2.numReceived); - - assertFalse(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(2, task.numBuffered()); - assertEquals(4, source1.numReceived); - assertEquals(3, source2.numReceived); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(1, task.numBuffered()); - assertEquals(5, source1.numReceived); - assertEquals(3, source2.numReceived); - - assertFalse(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(0, task.numBuffered()); - assertEquals(5, source1.numReceived); - assertEquals(4, source2.numReceived); - - assertFalse(task.process()); - assertFalse(task.maybePunctuateStreamTime()); - - processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L); - } - @SuppressWarnings("unchecked") @Test public void shouldPunctuateOnceStreamTimeAfterGap() { @@ -419,64 +329,67 @@ public void shouldPunctuateOnceStreamTimeAfterGap() { getConsumerRecord(partition2, 161) )); - assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20 + // st: -1 + assertFalse(task.maybePunctuateStreamTime()); // punctuate at 20 + // st: 20 assertTrue(task.process()); assertEquals(7, task.numBuffered()); assertEquals(1, source1.numReceived); assertEquals(0, source2.numReceived); + assertTrue(task.maybePunctuateStreamTime()); - assertFalse(task.maybePunctuateStreamTime()); - + // st: 25 assertTrue(task.process()); assertEquals(6, task.numBuffered()); assertEquals(1, source1.numReceived); assertEquals(1, source2.numReceived); - - assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142 - - // only one punctuation after 100ms gap assertFalse(task.maybePunctuateStreamTime()); + // st: 142 + // punctuate at 142 assertTrue(task.process()); assertEquals(5, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(1, source2.numReceived); + assertTrue(task.maybePunctuateStreamTime()); - assertFalse(task.maybePunctuateStreamTime()); - + // st: 145 + // only one punctuation after 100ms gap assertTrue(task.process()); assertEquals(4, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(2, source2.numReceived); + assertFalse(task.maybePunctuateStreamTime()); - assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155 - + // st: 155 + // punctuate at 155 assertTrue(task.process()); assertEquals(3, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(2, source2.numReceived); + assertTrue(task.maybePunctuateStreamTime()); - assertFalse(task.maybePunctuateStreamTime()); - + // st: 159 assertTrue(task.process()); assertEquals(2, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(3, source2.numReceived); + assertFalse(task.maybePunctuateStreamTime()); - assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, still aligned on the initial punctuation - + // st: 160, aligned at 0 assertTrue(task.process()); assertEquals(1, task.numBuffered()); assertEquals(4, source1.numReceived); assertEquals(3, source2.numReceived); + assertTrue(task.maybePunctuateStreamTime()); - assertFalse(task.maybePunctuateStreamTime()); - + // st: 161 assertTrue(task.process()); 
assertEquals(0, task.numBuffered()); assertEquals(4, source1.numReceived); assertEquals(4, source2.numReceived); + assertFalse(task.maybePunctuateStreamTime()); assertFalse(task.process()); assertFalse(task.maybePunctuateStreamTime()); @@ -484,9 +397,8 @@ public void shouldPunctuateOnceStreamTimeAfterGap() { processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L); } - @SuppressWarnings("unchecked") @Test - public void testCancelPunctuateStreamTime() { + public void shouldRespectPunctuateCancellationStreamTime() { task = createStatelessTask(createConfig(false)); task.initializeStateStores(); task.initializeTopology(); @@ -503,12 +415,19 @@ public void testCancelPunctuateStreamTime() { getConsumerRecord(partition2, 45) )); + assertFalse(task.maybePunctuateStreamTime()); + + // st is now 20 + assertTrue(task.process()); + assertTrue(task.maybePunctuateStreamTime()); + // st is now 25 assertTrue(task.process()); assertFalse(task.maybePunctuateStreamTime()); + // st is now 30 assertTrue(task.process()); processorStreamTime.mockProcessor.scheduleCancellable.cancel(); @@ -518,6 +437,61 @@ public void testCancelPunctuateStreamTime() { processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L); } + @Test + public void shouldRespectPunctuateCancellationSystemTime() { + task = createStatelessTask(createConfig(false)); + task.initializeStateStores(); + task.initializeTopology(); + final long now = time.milliseconds(); + time.sleep(10); + assertTrue(task.maybePunctuateSystemTime()); + processorSystemTime.mockProcessor.scheduleCancellable.cancel(); + time.sleep(10); + assertFalse(task.maybePunctuateSystemTime()); + processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10); + } + + @Test + public void shouldBeProcessableIfAllPartitionsBuffered() { + task = createStatelessTask(createConfig(false)); + task.initializeStateStores(); + task.initializeTopology(); + + assertFalse(task.isProcessable()); + + final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array(); + + task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes))); + + assertFalse(task.isProcessable()); + + task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes))); + + assertTrue(task.isProcessable()); + } + + @Test + public void shouldBeProcessableIfWaitedForTooLong() { + task = createStatelessTask(createConfig(false)); + task.initializeStateStores(); + task.initializeTopology(); + + assertFalse(task.isProcessable()); + + final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array(); + + task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes))); + + assertFalse(task.isProcessable()); + assertFalse(task.isProcessable()); + assertFalse(task.isProcessable()); + assertFalse(task.isProcessable()); + assertFalse(task.isProcessable()); + + assertTrue(task.isProcessable()); + } + + @Test public void shouldPunctuateSystemTimeWhenIntervalElapsed() { task = createStatelessTask(createConfig(false)); @@ -575,20 +549,6 @@ public void shouldPunctuateOnceSystemTimeAfterGap() { processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240); } - @Test - public void testCancelPunctuateSystemTime() { - task = createStatelessTask(createConfig(false)); - task.initializeStateStores(); - 
task.initializeTopology(); - final long now = time.milliseconds(); - time.sleep(10); - assertTrue(task.maybePunctuateSystemTime()); - processorSystemTime.mockProcessor.scheduleCancellable.cancel(); - time.sleep(10); - assertFalse(task.maybePunctuateSystemTime()); - processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10); - } - @Test public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() { task = createTaskThatThrowsException(); @@ -1110,7 +1070,7 @@ private StreamTask createStatefulTaskThatThrowsExceptionOnClose() { private StreamTask createStatelessTask(final StreamsConfig streamsConfig) { final ProcessorTopology topology = ProcessorTopology.withSources( Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime), - mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)) + mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)) ); source1.addChild(processorStreamTime); diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala index 32ad793bc84..cf87eb5e27d 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala @@ -24,11 +24,7 @@ import org.apache.kafka.common.serialization._ import org.apache.kafka.common.utils.MockTime import org.apache.kafka.streams._ import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils} -import org.apache.kafka.streams.processor.internals.StreamThread -import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.test.TestUtils -import org.junit.Assert._ import org.junit._ import org.junit.rules.TemporaryFolder import org.scalatest.junit.JUnitSuite @@ -129,9 +125,9 @@ class StreamToTableJoinScalaIntegrationTestBase extends JUnitSuite with StreamTo // consume and verify result val consumerConfig = getConsumerConfig() - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, - outputTopic, - expectedClicksPerRegion.size) + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, + outputTopic, + expectedClicksPerRegion.asJava) } else { java.util.Collections.emptyList() } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index e5253f95d45..3d1bab5d086 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -82,9 +82,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = produceNConsume(userClicksTopic, userRegionsTopic, outputTopic) streams.close() - - import collection.JavaConverters._ - assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key)) } @Test def 
testShouldCountClicksPerRegionJava(): Unit = { @@ -149,6 +146,5 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ) streams.close() - assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key)) } }
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
>         Key: KAFKA-3514
>         URL: https://issues.apache.org/jira/browse/KAFKA-3514
>     Project: Kafka
>  Issue Type: Bug
>  Components: streams
>    Reporter: Guozhang Wang
>    Assignee: Guozhang Wang
>    Priority: Major
>      Labels: architecture
>
> Our current stream task's timestamp is used for the punctuate function as well as
> for selecting which stream to process next (i.e. best-effort stream synchronization).
> It is defined as the smallest timestamp over all partitions in the task's partition
> group. This results in two unintuitive corner cases:
> 1) Observing a late-arrived record keeps that stream's timestamp low for a period of
> time, and hence that stream keeps being processed until the late record is reached.
> For example, take two partitions within the same task, annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arrived record with timestamp "1" causes stream A to be selected continuously
> in the thread loop, i.e. the messages with timestamps 5, 6, 7, 8, 9, until the late
> record itself is dequeued and processed; only then is stream B selected, starting
> with timestamp 2.
> 2) An empty buffered partition causes its timestamp not to advance, and hence the task
> timestamp as well, since the task timestamp is the smallest among all partitions.
> This may not be as severe a problem as 1), though.
> *Update*
> There is one more thing to consider (full discussion found here:
> http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using a custom timestamp extractor, but that timestamp is just wall-clock time
> Imagine the following events:
> 1. for 10 seconds I send in 5 messages / second
> 2. I do not send any messages for 3 seconds
> 3. I start the 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not send any
> messages. This is OK according to the documentation, because there are no new messages
> to trigger the punctuate() call. When the first few messages arrive after restarting
> the sending (point 3 above), I see the following sequence of method calls:
> 1. process() on the 1st message
> 2. punctuate() is called 3 times
> 3. process() on the 2nd message
> 4. process() on each following message
> What I would expect instead is that punctuate() is called first and then process() is
> called on the messages, because the first message's timestamp is already 3 seconds
> older than when the last punctuate() was called, so the first message belongs after
> the 3 punctuate() calls.
> {quote}
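To make corner case 1) concrete, below is a minimal, self-contained sketch of the old policy described in the issue; it is not the actual Kafka Streams code, and the class name MinTimestampSelectionDemo and its structure are made up for illustration. It models a partition's timestamp as the smallest timestamp among its buffered records and always takes the next record from the partition with the lowest such timestamp. Running it on the two example buffers prints stream A's records 5, 6, 7, 8, 9 and the late record 1 before any record of stream B:
{code}
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the old selection policy described in this issue: a partition's
// timestamp is the smallest timestamp among its buffered records, and the next
// record is always drawn from the partition with the lowest such timestamp.
public class MinTimestampSelectionDemo {

    public static void main(final String[] args) {
        final Map<String, Deque<Long>> buffers = new LinkedHashMap<>();
        buffers.put("Stream A", new ArrayDeque<>(Arrays.asList(5L, 6L, 7L, 8L, 9L, 1L, 10L)));
        buffers.put("Stream B", new ArrayDeque<>(Arrays.asList(2L, 3L, 4L, 5L)));

        while (buffers.values().stream().anyMatch(queue -> !queue.isEmpty())) {
            String chosen = null;
            long chosenTimestamp = Long.MAX_VALUE;
            for (final Map.Entry<String, Deque<Long>> entry : buffers.entrySet()) {
                if (entry.getValue().isEmpty()) {
                    continue;
                }
                // partition timestamp = min over all buffered records (the problematic definition)
                final long partitionTimestamp =
                    entry.getValue().stream().mapToLong(Long::longValue).min().getAsLong();
                if (partitionTimestamp < chosenTimestamp) {
                    chosenTimestamp = partitionTimestamp;
                    chosen = entry.getKey();
                }
            }
            // process the head record of the chosen partition
            System.out.println("process " + chosen + " record ts=" + buffers.get(chosen).pollFirst());
        }
    }
}
{code}
The merged PR above changes the definitions instead: the task's stream time becomes the maximum timestamp of the records already retrieved for processing, a partition's timestamp advances monotonically from its head record, and a task is only processed when all of its input partitions have buffered data, or after it has waited a bounded number of polls (tracked by the new enforced-process metric). That is the direction taken to address both corner cases quoted above.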