guozhangwang commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r562057043



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -78,15 +89,149 @@ RecordQueue queue() {
         }
     }
 
-    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
+    PartitionGroup(final TaskId id,
+                   final Map<TopicPartition, RecordQueue> partitionQueues,
+                   final Sensor recordLatenessSensor,
+                   final Sensor enforcedProcessingSensor,
+                   final long maxTaskIdleMs) {
+        this.id = id;
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
+        this.enforcedProcessingSensor = enforcedProcessingSensor;
+        this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
     }
 
+    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
+        final Long lag = metadata.lag();
+        if (lag != null) {
+            LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag);
+            fetchedLags.put(partition, lag);
+        }
+    }
+
+    public boolean readyToProcess(final long wallClockTime) {
+        if (LOG.isTraceEnabled()) {
+            for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                LOG.trace(
+                    "[{}] buffered/lag {}: {}/{}",
+                    id,
+                    entry.getKey(),
+                    entry.getValue().size(),
+                    fetchedLags.get(entry.getKey())
+                );
+            }
+        }
+        // Log-level strategy:
+        //  TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency
+        //  DEBUG when we waited for a fetch and decided to wait some more, as configured
+        //  DEBUG when we are ready for processing and didn't have to enforce processing
+        //  INFO  when we enforce processing, since this has to wait for fetches AND may result in disorder
+
+        if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+            if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
+                final Set<TopicPartition> bufferedPartitions = new HashSet<>();
+                final Set<TopicPartition> emptyPartitions = new HashSet<>();
+                for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                    if (entry.getValue().isEmpty()) {
+                        emptyPartitions.add(entry.getKey());
+                    } else {
+                        bufferedPartitions.add(entry.getKey());
+                    }
+                }
+                LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." +
+                              "\n\tThere may be out-of-order processing for this task as a result." +
+                              "\n\tBuffered partitions: {}" +
+                              "\n\tNon-buffered partitions: {}",
+                          id,
+                          bufferedPartitions,
+                          emptyPartitions);
+            }
+            return true;

Review comment:
       Actually, on second thought: if users configure `-1`, they probably do not care about enforced processing, and an INFO-level entry here could flood the logs. So never mind.
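The log-level strategy from the diff comments, together with the `-1` carve-out discussed above, can be sketched as a small decision function. This is a hypothetical standalone illustration, not Kafka Streams code: `LogLevelStrategySketch`, `levelFor`, and its boolean flags are invented names, and mapping "ready without enforcing" to DEBUG only when a fetch wait was involved is one reading of the comments, not confirmed behavior.

```java
// Hypothetical sketch of the "Log-level strategy" comment in the diff.
// None of these names exist in Kafka Streams.
public class LogLevelStrategySketch {

    public enum Level { TRACE, DEBUG, INFO }

    // Assumption: mirrors StreamsConfig.MAX_TASK_IDLE_MS_DISABLED, i.e. the -1 users configure.
    public static final long MAX_TASK_IDLE_MS_DISABLED = -1L;

    public static Level levelFor(final long maxTaskIdleMs,
                                 final boolean waitedForFetch,
                                 final boolean enforcedProcessing) {
        if (maxTaskIdleMs == MAX_TASK_IDLE_MS_DISABLED) {
            // Idling disabled: the user opted out of waiting and this path can run
            // on every poll, so stay at TRACE rather than flooding INFO.
            return Level.TRACE;
        }
        if (enforcedProcessing) {
            // We waited for fetches AND may process out of order: surface at INFO.
            return Level.INFO;
        }
        if (waitedForFetch) {
            // Waited for a fetch, then either kept waiting or became ready normally.
            return Level.DEBUG;
        }
        // No fetch wait involved: potentially very high frequency, so TRACE.
        return Level.TRACE;
    }

    public static void main(final String[] args) {
        System.out.println(levelFor(-1L, false, false));  // TRACE
        System.out.println(levelFor(0L, true, true));     // INFO
        System.out.println(levelFor(100L, true, false));  // DEBUG
        System.out.println(levelFor(100L, false, false)); // TRACE
    }
}
```

Keeping the disabled case at TRACE means a user who opted out of idling never sees a per-poll INFO line, which is the trade-off accepted in the comment.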



