guozhangwang commented on a change in pull request #9840: URL: https://github.com/apache/kafka/pull/9840#discussion_r562057043
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ########## @@ -78,15 +89,149 @@ RecordQueue queue() { } } - PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) { + PartitionGroup(final TaskId id, + final Map<TopicPartition, RecordQueue> partitionQueues, + final Sensor recordLatenessSensor, + final Sensor enforcedProcessingSensor, + final long maxTaskIdleMs) { + this.id = id; nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); this.partitionQueues = partitionQueues; + this.enforcedProcessingSensor = enforcedProcessingSensor; + this.maxTaskIdleMs = maxTaskIdleMs; this.recordLatenessSensor = recordLatenessSensor; totalBuffered = 0; allBuffered = false; streamTime = RecordQueue.UNKNOWN; } + public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) { + final Long lag = metadata.lag(); + if (lag != null) { + LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag); + fetchedLags.put(partition, lag); + } + } + + public boolean readyToProcess(final long wallClockTime) { + if (LOG.isTraceEnabled()) { + for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) { + LOG.trace( + "[{}] buffered/lag {}: {}/{}", + id, + entry.getKey(), + entry.getValue().size(), + fetchedLags.get(entry.getKey()) + ); + } + } + // Log-level strategy: + // TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency + // DEBUG when we waited for a fetch and decided to wait some more, as configured + // DEBUG when we are ready for processing and didn't have to enforce processing + // INFO when we enforce processing, since this has to wait for fetches AND may result in disorder + + if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) { + if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) { + final Set<TopicPartition> bufferedPartitions = new HashSet<>(); + final Set<TopicPartition> emptyPartitions = new HashSet<>(); + for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) { + if (entry.getValue().isEmpty()) { + emptyPartitions.add(entry.getKey()); + } else { + bufferedPartitions.add(entry.getKey()); + } + } + LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." + + "\n\tThere may be out-of-order processing for this task as a result." + + "\n\tBuffered partitions: {}" + + "\n\tNon-buffered partitions: {}", + id, + bufferedPartitions, + emptyPartitions); + } + return true; Review comment: Actually on a second thought.. if users configures `-1` it means they probably do not care about enforced processing, while on the other side the INFO entry may flood the logs here. So NVM. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org