[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567670#comment-16567670 ]

ASF GitHub Bot commented on KAFKA-3514:
---------------------------------------

guozhangwang closed pull request #5398: KAFKA-3514: Part II, Choose tasks with data on all partitions to process
URL: https://github.com/apache/kafka/pull/5398
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
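
Before the full diff, here is a condensed, illustrative sketch of the gating logic this change introduces (simplified for readability; it is not the committed code, which lives in StreamTask and PartitionGroup and also records an enforced-process metric): a task is only picked for processing once all of its input partitions have buffered data, or once partially buffered data has waited through a fixed number of attempts.

{code}
// Condensed sketch of the new gating behavior (names mirror the diff below;
// metrics recording and error handling are omitted).
class ProcessableGate {
    private static final int WAIT_ON_PARTIAL_INPUT = 5;
    private int waits = WAIT_ON_PARTIAL_INPUT;

    // allPartitionsBuffered / numBuffered correspond to PartitionGroup state
    boolean isProcessable(final boolean allPartitionsBuffered, final int numBuffered) {
        if (allPartitionsBuffered) {
            return true;                      // complete input: process immediately
        } else if (numBuffered > 0 && --waits < 0) {
            waits = WAIT_ON_PARTIAL_INPUT;    // waited long enough: enforce processing
            return true;
        }
        return false;                         // partial input: keep waiting for now
    }
}
{code}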


diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index f98e6356a22..0a839655748 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -87,11 +87,13 @@ int maybeCommit() {
      */
     int process() {
         int processed = 0;
+
         final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
         while (it.hasNext()) {
             final StreamTask task = it.next().getValue();
+
             try {
-                if (task.process()) {
+                if (task.isProcessable() && task.process()) {
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
@@ -108,6 +110,7 @@ int process() {
                 throw e;
             }
         }
+
         return processed;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 34252bf2b4c..f17c63acd2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -27,15 +27,23 @@
 
 /**
  * A PartitionGroup is composed from a set of partitions. It also maintains the timestamp of this
- * group, hence the associated task as the min timestamp across all partitions in the group.
+ * group, a.k.a. the stream time of the associated task. It is defined as the maximum timestamp of
+ * all the records having been retrieved for processing from this PartitionGroup so far.
+ *
+ * We decide from which partition to retrieve the next record to process based on partitions' timestamps.
+ * The timestamp of a specific partition is initialized as UNKNOWN (-1), and is updated with the head record's timestamp
+ * if it is smaller (i.e. it should be monotonically increasing); when the partition's buffer becomes empty and there is
+ * no head record, the partition's timestamp will not be updated any more.
  */
 public class PartitionGroup {
 
     private final Map<TopicPartition, RecordQueue> partitionQueues;
-
     private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
+
     private long streamTime;
     private int totalBuffered;
+    private boolean allBuffered;
+
 
     public static class RecordInfo {
         RecordQueue queue;
@@ -53,11 +61,11 @@ RecordQueue queue() {
         }
     }
 
-
     PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
         this.partitionQueues = partitionQueues;
         totalBuffered = 0;
+        allBuffered = false;
         streamTime = RecordQueue.NOT_KNOWN;
     }
 
@@ -79,17 +87,16 @@ StampedRecord nextRecord(final RecordInfo info) {
             if (record != null) {
                 --totalBuffered;
 
-                if (!queue.isEmpty()) {
+                if (queue.isEmpty()) {
+                    // if a certain queue has been drained, reset the flag
+                    allBuffered = false;
+                } else {
                     nonEmptyQueuesByTime.offer(queue);
                 }
 
-                // Since this was previously a queue with min timestamp,
-                // streamTime could only advance if this queue's time did.
-                if (queue.timestamp() > streamTime) {
-                    computeStreamTime();
-                }
+                // always update the stream time to the record's timestamp yet to be processed if it is larger
+                streamTime = Math.max(streamTime, record.timestamp);
             }
-
         }
 
         return record;
@@ -106,17 +113,18 @@ int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord<
         final RecordQueue recordQueue = partitionQueues.get(partition);
 
         final int oldSize = recordQueue.size();
-        final long oldTimestamp = recordQueue.timestamp();
         final int newSize = recordQueue.addRawRecords(rawRecords);
 
         // add this record queue to be considered for processing in the future if it was empty before
         if (oldSize == 0 && newSize > 0) {
             nonEmptyQueuesByTime.offer(recordQueue);
-        }
 
-        // Adding to this queue could only advance streamTime if it was previously the queue with min timestamp (= streamTime)
-        if (oldTimestamp <= streamTime && recordQueue.timestamp() > streamTime) {
-            computeStreamTime();
+            // if all partitions now are non-empty, set the flag
+            // we do not need to update the stream time here since this task will definitely be
+            // processed next, and hence the stream time will be updated when we retrieved records by then
+            if (nonEmptyQueuesByTime.size() == this.partitionQueues.size()) {
+                allBuffered = true;
+            }
         }
 
         totalBuffered += newSize - oldSize;
@@ -136,18 +144,6 @@ public long timestamp() {
         return streamTime;
     }
 
-    private void computeStreamTime() {
-        // we should always return the smallest timestamp of all partitions
-        // to avoid group partition time goes backward
-        long timestamp = Long.MAX_VALUE;
-        for (final RecordQueue queue : partitionQueues.values()) {
-            if (queue.timestamp() < timestamp) {
-                timestamp = queue.timestamp();
-            }
-        }
-        this.streamTime = timestamp;
-    }
-
     /**
      * @throws IllegalStateException if the record's partition does not belong to this partition group
      */
@@ -165,7 +161,12 @@ int numBuffered() {
         return totalBuffered;
     }
 
+    boolean allPartitionsBuffered() {
+        return allBuffered;
+    }
+
     public void close() {
+        clear();
         partitionQueues.clear();
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7f121fe0df4..6f3b031b8db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -59,6 +59,8 @@
 
     private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
 
+    private static final int WAIT_ON_PARTIAL_INPUT = 5;
+
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo;
     private final PunctuationQueue streamTimePunctuationQueue;
@@ -72,12 +74,14 @@
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
     private boolean transactionInFlight = false;
+    private int waits = WAIT_ON_PARTIAL_INPUT;
     private final Time time;
     private final TaskMetrics taskMetrics;
 
     protected static final class TaskMetrics {
         final StreamsMetricsImpl metrics;
         final Sensor taskCommitTimeSensor;
+        final Sensor taskEnforcedProcessSensor;
         private final String taskName;
 
 
@@ -108,7 +112,7 @@
 
             // add the operation metrics with additional tags
             final Map<String, String> tagMap = metrics.tagMap("task-id", taskName);
-            taskCommitTimeSensor = metrics.taskLevelSensor("commit", taskName, Sensor.RecordingLevel.DEBUG, parent);
+            taskCommitTimeSensor = metrics.taskLevelSensor(taskName, "commit", Sensor.RecordingLevel.DEBUG, parent);
             taskCommitTimeSensor.add(
                 new MetricName("commit-latency-avg", group, "The average latency of commit operation.", tagMap),
                 new Avg()
@@ -125,6 +129,18 @@
                 new MetricName("commit-total", group, "The total number of 
occurrence of commit operations.", tagMap),
                 new Count()
             );
+
+            // add the metrics for enforced processing
+            taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, "enforced-process", Sensor.RecordingLevel.DEBUG, parent);
+            taskEnforcedProcessSensor.add(
+                    new MetricName("enforced-process-rate", group, "The average number of occurrence of enforced-process per second.", tagMap),
+                    new Rate(TimeUnit.SECONDS, new Count())
+            );
+            taskEnforcedProcessSensor.add(
+                    new MetricName("enforced-process-total", group, "The total number of occurrence of enforced-process operations.", tagMap),
+                    new Count()
+            );
+
         }
 
         void removeAllSensors() {
@@ -263,6 +279,21 @@ public void resume() {
         log.debug("Resuming");
     }
 
+    /**
+     * An active task is processable if its buffer contains data for all of its input source topic partitions
+     */
+    public boolean isProcessable() {
+        if (partitionGroup.allPartitionsBuffered()) {
+            return true;
+        } else if (partitionGroup.numBuffered() > 0 && --waits < 0) {
+            taskMetrics.taskEnforcedProcessSensor.record();
+            waits = WAIT_ON_PARTIAL_INPUT;
+            return true;
+        }
+
+        return false;
+    }
+
     /**
      * Process one record.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 428aa1d938b..42f55efbf63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -831,18 +831,21 @@ long runOnce(final long recordsProcessedBeforeCommit) {
             }
         }
 
-        if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) {
+        if (records != null && !records.isEmpty()) {
             streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
             addRecordsToTasks(records);
+        }
+
+        if (taskManager.hasActiveRunningTasks()) {
             final long totalProcessed = processAndMaybeCommit(recordsProcessedBeforeCommit);
             if (totalProcessed > 0) {
                 final long processLatency = computeLatency();
                 streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed, timerStartedMs);
                 processedBeforeCommit = adjustRecordsProcessedBeforeCommit(
-                    recordsProcessedBeforeCommit,
-                    totalProcessed,
-                    processLatency,
-                    commitTimeMs);
+                        recordsProcessedBeforeCommit,
+                        totalProcessed,
+                        processLatency,
+                        commitTimeMs);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 662ded553ad..e99a5b3bb23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -88,13 +88,13 @@ public final void removeAllThreadLevelSensors() {
     }
 
     public final Sensor taskLevelSensor(final String taskName,
-                                         final String sensorName,
-                                         final Sensor.RecordingLevel recordingLevel,
-                                         final Sensor... parents) {
+                                        final String sensorName,
+                                        final Sensor.RecordingLevel recordingLevel,
+                                        final Sensor... parents) {
         final String key = threadName + "." + taskName;
         synchronized (taskLevelSensors) {
             if (!taskLevelSensors.containsKey(key)) {
-                taskLevelSensors.put(key, new LinkedList<String>());
+                taskLevelSensors.put(key, new LinkedList<>());
             }
 
             final String fullSensorName = key + "." + sensorName;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 77b9c1e8560..12b4cf30240 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -370,7 +370,7 @@ private NamedCacheMetrics(final StreamsMetricsImpl metrics, final String cacheNa
                 "record-cache-id", "all",
                 "task-id", taskName
             );
-            final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor("hitRatio", taskName, Sensor.RecordingLevel.DEBUG);
+            final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor(taskName, "hitRatio", Sensor.RecordingLevel.DEBUG);
             taskLevelHitRatioSensor.add(
                 new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags),
                 new Avg()
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 749d74887e5..1a78ed356bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -50,7 +50,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -350,6 +352,45 @@ public static void waitForCompletion(final KafkaStreams streams,
         return accumData;
     }
 
+    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                                     final String topic,
+                                                                                     final List<KeyValue<K, V>> expectedRecords) throws InterruptedException {
+        return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT);
+    }
+
+    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                                     final String topic,
+                                                                                     final List<KeyValue<K, V>> expectedRecords,
+                                                                                     final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, V>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<KeyValue<K, V>> readData =
+                            readKeyValues(topic, consumer, waitTime, expectedRecords.size());
+                    accumData.addAll(readData);
+
+                    final Map<K, V> finalData = new HashMap<>();
+
+                    for (final KeyValue<K, V> keyValue : accumData) {
+                        finalData.put(keyValue.key, keyValue.value);
+                    }
+
+                    for (final KeyValue<K, V> keyValue : expectedRecords) {
+                        if (!keyValue.value.equals(finalData.get(keyValue.key)))
+                            return false;
+                    }
+
+                    return true;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
                                                                                  final String topic,
                                                                                  final int expectedNumRecords,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 8a8d6255c46..7efe653578f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -337,6 +337,7 @@ public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
     @Test
     public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
         mockTaskInitialization();
+        EasyMock.expect(t1.isProcessable()).andReturn(true);
         t1.process();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
@@ -353,6 +354,32 @@ public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
         EasyMock.verify(t1);
     }
 
+    @Test
+    public void shouldNotProcessUnprocessableTasks() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.isProcessable()).andReturn(false);
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        assertThat(assignedTasks.process(), equalTo(0));
+
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldAlwaysProcessProcessableTasks() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.isProcessable()).andReturn(true);
+        EasyMock.expect(t1.process()).andReturn(true).once();
+        EasyMock.replay(t1);
+
+        addAndInitTask();
+
+        assertThat(assignedTasks.process(), equalTo(1));
+
+        EasyMock.verify(t1);
+    }
+
     @Test
     public void shouldPunctuateRunningTasks() {
         mockTaskInitialization();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index b3123e46343..2df4f66cb85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -88,12 +88,12 @@ public void testTimeTracking() {
         group.addRawRecords(partition2, list2);
         // 1:[1, 3, 5]
         // 2:[2, 4, 6]
-        // st: 1
+        // st: -1 since no records was being processed yet
 
         assertEquals(6, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(3, group.numBuffered(partition2));
-        assertEquals(1L, group.timestamp());
+        assertEquals(-1L, group.timestamp());
 
         StampedRecord record;
         final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
@@ -108,7 +108,7 @@ public void testTimeTracking() {
         assertEquals(5, group.numBuffered());
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(3, group.numBuffered(partition2));
-        assertEquals(2L, group.timestamp());
+        assertEquals(1L, group.timestamp());
 
         // get one record, now the time should be advanced
         record = group.nextRecord(info);
@@ -120,7 +120,7 @@ public void testTimeTracking() {
         assertEquals(4, group.numBuffered());
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        assertEquals(2L, group.timestamp());
 
         // add 2 more records with timestamp 2, 4 to partition-1
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -134,7 +134,7 @@ public void testTimeTracking() {
         assertEquals(6, group.numBuffered());
         assertEquals(4, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        assertEquals(2L, group.timestamp());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -146,7 +146,7 @@ public void testTimeTracking() {
         assertEquals(5, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(4L, group.timestamp());
+        assertEquals(3L, group.timestamp());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -158,7 +158,7 @@ public void testTimeTracking() {
         assertEquals(4, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(1, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        assertEquals(4L, group.timestamp());
 
         // get one more record, now time should be advanced
         record = group.nextRecord(info);
@@ -206,7 +206,7 @@ public void testTimeTracking() {
         assertEquals(0, group.numBuffered());
         assertEquals(0, group.numBuffered(partition1));
         assertEquals(0, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        assertEquals(6L, group.timestamp());
 
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index bfbb2a00270..146bcb3b54e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -57,6 +57,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -111,7 +112,7 @@ public void close() {
 
     private final ProcessorTopology topology = ProcessorTopology.withSources(
         Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime),
-        mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+        mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
     );
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -307,97 +308,6 @@ public void testPauseResume() {
         assertEquals(0, consumer.paused().size());
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testMaybePunctuateStreamTime() {
-        task = createStatelessTask(createConfig(false));
-        task.initializeStateStores();
-        task.initializeTopology();
-
-        task.addRecords(partition1, Arrays.asList(
-            getConsumerRecord(partition1, 0),
-            getConsumerRecord(partition1, 20),
-            getConsumerRecord(partition1, 32),
-            getConsumerRecord(partition1, 40),
-            getConsumerRecord(partition1, 60)
-        ));
-
-        task.addRecords(partition2, Arrays.asList(
-            getConsumerRecord(partition2, 25),
-            getConsumerRecord(partition2, 35),
-            getConsumerRecord(partition2, 45),
-            getConsumerRecord(partition2, 61)
-        ));
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(8, task.numBuffered());
-        assertEquals(1, source1.numReceived);
-        assertEquals(0, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(7, task.numBuffered());
-        assertEquals(2, source1.numReceived);
-        assertEquals(0, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(6, task.numBuffered());
-        assertEquals(2, source1.numReceived);
-        assertEquals(1, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(5, task.numBuffered());
-        assertEquals(3, source1.numReceived);
-        assertEquals(1, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(4, task.numBuffered());
-        assertEquals(3, source1.numReceived);
-        assertEquals(2, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(3, task.numBuffered());
-        assertEquals(4, source1.numReceived);
-        assertEquals(2, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(2, task.numBuffered());
-        assertEquals(4, source1.numReceived);
-        assertEquals(3, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(1, task.numBuffered());
-        assertEquals(5, source1.numReceived);
-        assertEquals(3, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(0, task.numBuffered());
-        assertEquals(5, source1.numReceived);
-        assertEquals(4, source2.numReceived);
-
-        assertFalse(task.process());
-        assertFalse(task.maybePunctuateStreamTime());
-
-        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
-    }
-
     @SuppressWarnings("unchecked")
     @Test
     public void shouldPunctuateOnceStreamTimeAfterGap() {
@@ -419,64 +329,67 @@ public void shouldPunctuateOnceStreamTimeAfterGap() {
             getConsumerRecord(partition2, 161)
         ));
 
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20
+        // st: -1
+        assertFalse(task.maybePunctuateStreamTime()); // punctuate at 20
 
+        // st: 20
         assertTrue(task.process());
         assertEquals(7, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 25
         assertTrue(task.process());
         assertEquals(6, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(1, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142
-
-        // only one punctuation after 100ms gap
         assertFalse(task.maybePunctuateStreamTime());
 
+        // st: 142
+        // punctuate at 142
         assertTrue(task.process());
         assertEquals(5, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 145
+        // only one punctuation after 100ms gap
         assertTrue(task.process());
         assertEquals(4, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(2, source2.numReceived);
+        assertFalse(task.maybePunctuateStreamTime());
 
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155
-
+        // st: 155
+        // punctuate at 155
         assertTrue(task.process());
         assertEquals(3, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(2, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 159
         assertTrue(task.process());
         assertEquals(2, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(3, source2.numReceived);
+        assertFalse(task.maybePunctuateStreamTime());
 
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, still aligned on the initial punctuation
-
+        // st: 160, aligned at 0
         assertTrue(task.process());
         assertEquals(1, task.numBuffered());
         assertEquals(4, source1.numReceived);
         assertEquals(3, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 161
         assertTrue(task.process());
         assertEquals(0, task.numBuffered());
         assertEquals(4, source1.numReceived);
         assertEquals(4, source2.numReceived);
+        assertFalse(task.maybePunctuateStreamTime());
 
         assertFalse(task.process());
         assertFalse(task.maybePunctuateStreamTime());
@@ -484,9 +397,8 @@ public void shouldPunctuateOnceStreamTimeAfterGap() {
         processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testCancelPunctuateStreamTime() {
+    public void shouldRespectPunctuateCancellationStreamTime() {
         task = createStatelessTask(createConfig(false));
         task.initializeStateStores();
         task.initializeTopology();
@@ -503,12 +415,19 @@ public void testCancelPunctuateStreamTime() {
             getConsumerRecord(partition2, 45)
         ));
 
+        assertFalse(task.maybePunctuateStreamTime());
+
+        // st is now 20
+        assertTrue(task.process());
+
         assertTrue(task.maybePunctuateStreamTime());
 
+        // st is now 25
         assertTrue(task.process());
 
         assertFalse(task.maybePunctuateStreamTime());
 
+        // st is now 30
         assertTrue(task.process());
 
         processorStreamTime.mockProcessor.scheduleCancellable.cancel();
@@ -518,6 +437,61 @@ public void testCancelPunctuateStreamTime() {
         processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
     }
 
+    @Test
+    public void shouldRespectPunctuateCancellationSystemTime() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+        final long now = time.milliseconds();
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        processorSystemTime.mockProcessor.scheduleCancellable.cancel();
+        time.sleep(10);
+        assertFalse(task.maybePunctuateSystemTime());
+        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
+    }
+
+    @Test
+    public void shouldBeProcessableIfAllPartitionsBuffered() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        assertFalse(task.isProcessable());
+
+        final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
+
+        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+        assertFalse(task.isProcessable());
+
+        task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
+
+        assertTrue(task.isProcessable());
+    }
+
+    @Test
+    public void shouldBeProcessableIfWaitedForTooLong() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        assertFalse(task.isProcessable());
+
+        final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
+
+        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+
+        assertTrue(task.isProcessable());
+    }
+
+
     @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         task = createStatelessTask(createConfig(false));
@@ -575,20 +549,6 @@ public void shouldPunctuateOnceSystemTimeAfterGap() {
         processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
     }
 
-    @Test
-    public void testCancelPunctuateSystemTime() {
-        task = createStatelessTask(createConfig(false));
-        task.initializeStateStores();
-        task.initializeTopology();
-        final long now = time.milliseconds();
-        time.sleep(10);
-        assertTrue(task.maybePunctuateSystemTime());
-        processorSystemTime.mockProcessor.scheduleCancellable.cancel();
-        time.sleep(10);
-        assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
-    }
-
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
         task = createTaskThatThrowsException();
@@ -1110,7 +1070,7 @@ private StreamTask createStatefulTaskThatThrowsExceptionOnClose() {
     private StreamTask createStatelessTask(final StreamsConfig streamsConfig) {
         final ProcessorTopology topology = ProcessorTopology.withSources(
             Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime),
-            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+            mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
         );
 
         source1.addChild(processorStreamTime);
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
index 32ad793bc84..cf87eb5e27d 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
@@ -24,11 +24,7 @@ import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.streams._
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
-import org.apache.kafka.streams.processor.internals.StreamThread
-import org.apache.kafka.streams.scala.ImplicitConversions._
-import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.test.TestUtils
-import org.junit.Assert._
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.scalatest.junit.JUnitSuite
@@ -129,9 +125,9 @@ class StreamToTableJoinScalaIntegrationTestBase extends JUnitSuite with StreamTo
       // consume and verify result
       val consumerConfig = getConsumerConfig()
 
-      IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
-                                                               outputTopic,
-                                                               expectedClicksPerRegion.size)
+      IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig,
+                                                                 outputTopic,
+                                                                 expectedClicksPerRegion.asJava)
     } else {
       java.util.Collections.emptyList()
     }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index e5253f95d45..3d1bab5d086 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -82,9 +82,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
     val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
       produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
     streams.close()
-
-    import collection.JavaConverters._
-    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key))
   }
 
   @Test def testShouldCountClicksPerRegionJava(): Unit = {
@@ -149,6 +146,5 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
       produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
 
     streams.close()
-    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key))
   }
 }


 



> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
>                 Key: KAFKA-3514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>              Labels: architecture
>
> Our current stream task's timestamp is used for the punctuate function as well as
> for selecting which stream to process next (i.e. best-effort stream
> synchronization). It is defined as the smallest timestamp over all
> partitions in the task's partition group. This results in two unintuitive
> corner cases:
> 1) observing a late-arriving record would keep that stream's timestamp low for
> a period of time, and hence that stream keeps being selected for processing until
> the late record itself is processed. For example, take two partitions within the
> same task, annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arriving record with timestamp "1" will cause stream A to be selected
> continuously in the thread loop (i.e. the messages with timestamps 5, 6, 7, 8, 9)
> until the record itself is dequeued and processed; only then will stream B be
> selected, starting with timestamp 2.
> 2) an empty buffered partition prevents its timestamp from advancing, and hence
> the task timestamp as well, since the task timestamp is the smallest among all
> partitions. This may not be as severe a problem as 1) above, though.
> *Update*
> There is one more thing to consider (full discussion found here: 
> http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using custom timestamp extractor, but that timestamp is just a wall 
> clock time
> Imagine the following events:
> 1., for 10 seconds I send 5 messages / second
> 2., then I do not send any messages for 3 seconds
> 3., then I start sending 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not
> send any messages. This is ok according to the documentation, because
> there are no new messages to trigger the punctuate() call. When the
> first few messages arrive after I restart sending (point 3. above), I
> see the following sequence of method calls:
> 1., process() on the 1st message
> 2., punctuate() is called 3 times
> 3., process() on the 2nd message
> 4., process() on each following message
> What I would expect instead is that punctuate() is called first and then
> process() is called on the messages, because the first message's timestamp
> is already 3 seconds past the time the last punctuate() was called, so the
> first message belongs after the 3 punctuate() calls.
> {quote}
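
To connect the reported behavior to the patch above, here is an illustrative comparison (not Kafka code; the class and helper names are made up for this example) of the old and new stream-time definitions: the old one was the minimum head-record timestamp across all partitions, which a single late record or an empty partition could hold back; the new one is the maximum timestamp of the records already retrieved for processing, so it only moves forward.

{code}
// Illustrative only: contrasts the two stream-time definitions discussed above.
final class StreamTimeExample {

    // Old behavior: stream time = min over all partitions' head-record timestamps,
    // so a late record (or a stalled partition) keeps the whole task's time low.
    static long oldStreamTime(final long[] partitionHeadTimestamps) {
        long min = Long.MAX_VALUE;
        for (final long ts : partitionHeadTimestamps) {
            min = Math.min(min, ts);
        }
        return min;
    }

    // New behavior: stream time = max timestamp of records already retrieved for
    // processing, updated as each record is dequeued; it is monotonically increasing.
    static long newStreamTime(final long currentStreamTime, final long dequeuedRecordTimestamp) {
        return Math.max(currentStreamTime, dequeuedRecordTimestamp);
    }
}
{code}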


