guozhangwang commented on a change in pull request #9840: URL: https://github.com/apache/kafka/pull/9840#discussion_r561459260
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -78,15 +89,149 @@ RecordQueue queue() {
         }
     }
-    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
+    PartitionGroup(final TaskId id,
+                   final Map<TopicPartition, RecordQueue> partitionQueues,
+                   final Sensor recordLatenessSensor,
+                   final Sensor enforcedProcessingSensor,
+                   final long maxTaskIdleMs) {
+        this.id = id;
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
+        this.enforcedProcessingSensor = enforcedProcessingSensor;
+        this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
     }
+    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
+        final Long lag = metadata.lag();
+        if (lag != null) {
+            LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag);
+            fetchedLags.put(partition, lag);
+        }
+    }
+
+    public boolean readyToProcess(final long wallClockTime) {
+        if (LOG.isTraceEnabled()) {
+            for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                LOG.trace(
+                    "[{}] buffered/lag {}: {}/{}",
+                    id,
+                    entry.getKey(),
+                    entry.getValue().size(),
+                    fetchedLags.get(entry.getKey())
+                );
+            }
+        }
+        // Log-level strategy:
+        // TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency
+        // DEBUG when we waited for a fetch and decided to wait some more, as configured
+        // DEBUG when we are ready for processing and didn't have to enforce processing
+        // INFO when we enforce processing, since this has to wait for fetches AND may result in disorder
+
+        if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+            if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
+                final Set<TopicPartition> bufferedPartitions = new HashSet<>();
+                final Set<TopicPartition> emptyPartitions = new HashSet<>();
+                for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                    if (entry.getValue().isEmpty()) {
+                        emptyPartitions.add(entry.getKey());
+                    } else {
+                        bufferedPartitions.add(entry.getKey());
+                    }
+                }
+                LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." +
+                              "\n\tThere may be out-of-order processing for this task as a result." +
+                              "\n\tBuffered partitions: {}" +
+                              "\n\tNon-buffered partitions: {}",
+                          id,
+                          bufferedPartitions,
+                          emptyPartitions);
+            }
+            return true;

Review comment:
Should we log INFO if we are indeed enforcing processing? I.e. there are some empty partitions.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -134,6 +134,8 @@
 @SuppressWarnings("deprecation")
 public class StreamsConfig extends AbstractConfig {
+    public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

Review comment:
nit: move this down below to 147?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
##########
@@ -73,28 +82,22 @@
     private final byte[] recordKey = intSerializer.serialize(null, 1);
     private final Metrics metrics = new Metrics();
+    private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString());
     private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
-    private PartitionGroup group;
     private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
         final Sensor lastRecordedValue = metrics.sensor(metricName.name());
         lastRecordedValue.add(metricName, new Value());
         return lastRecordedValue;
     }
-    @Before

Review comment:
Good refactoring!

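Returning to the `readyToProcess` comment above: a minimal sketch of what choosing INFO over TRACE could look like when `max.task.idle.ms` is disabled and some partitions are empty. Everything here is hypothetical and for illustration only: `IdleLogLevelSketch`, `Level`, and `levelWhenIdlingDisabled` are made-up names, and the map of partition name to buffered-record count stands in for `partitionQueues`. It is not the PR's code.

```java
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
final class IdleLogLevelSketch {
    enum Level { TRACE, INFO }

    // bufferedSizes maps a partition name to its number of buffered records
    // (an assumed stand-in for the partitionQueues map in the diff above).
    static Level levelWhenIdlingDisabled(final Map<String, Integer> bufferedSizes) {
        boolean anyEmpty = false;
        boolean anyBuffered = false;
        for (final int size : bufferedSizes.values()) {
            if (size == 0) {
                anyEmpty = true;
            } else {
                anyBuffered = true;
            }
        }
        // Mirrors the "!allBuffered && totalBuffered > 0" condition: we hold data for
        // some partitions but not all, so processing now is effectively enforced.
        return (anyEmpty && anyBuffered) ? Level.INFO : Level.TRACE;
    }
}
```

Whether INFO is appropriate here depends on how often this branch is hit per task; the sketch only shows where the decision would sit.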
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -52,15 +58,20 @@
  * (i.e., it increases or stays the same over time).
  */
 public class PartitionGroup {
+    private static final Logger LOG = LoggerFactory.getLogger(PartitionGroup.class);

Review comment:
Is it more convenient to pass in the `log` object from AbstractTask to the PartitionGroup constructor? It is created with the logContext, which includes the task-type / task-id.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -510,9 +516,7 @@ public StreamThread(final Time time,
         this.nextProbingRebalanceMs = nextProbingRebalanceMs;
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
-        final int dummyThreadIdx = 1;
-        this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdx))
-            .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+        this.maxPollTimeMs = maxPollTimeMs;

Review comment:
What's the rationale for this refactoring?

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -134,6 +134,8 @@
 @SuppressWarnings("deprecation")
 public class StreamsConfig extends AbstractConfig {
+    public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

Review comment:
Also nit: the line below should be
```
private static final Logger
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -910,6 +900,11 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRe
         }
     }
+    @Override
+    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {

Review comment:
The only reason we need to add this function to `Task` seems to be `tasks.activeTasksForInputPartition(partition)` in `TaskManager`, and there's a TODO to convert its return type to `StreamTask` anyway. So let's move this function to `StreamTask` only and, in `TaskManager`, cast the `task` to `StreamTask`. Then we can remove it from `StandbyTask`.
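
The last suggestion above (declare `addFetchedMetadata` on `StreamTask` only and cast in `TaskManager`) can be sketched roughly as follows. This is a simplified, hypothetical model: the `Task`, `StandbyTask`, `StreamTask`, and `TaskManagerSketch` types below are stand-ins written for this illustration, not the real Kafka Streams classes, and a `String` partition name and a `Long` lag replace `TopicPartition` and `ConsumerRecords.Metadata`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration only; NOT the real Kafka Streams types.
interface Task {
    String id();
}

final class StandbyTask implements Task {
    @Override
    public String id() {
        return "standby";
    }
    // No addFetchedMetadata here: the method no longer lives on the Task interface.
}

final class StreamTask implements Task {
    private final Map<String, Long> fetchedLags = new HashMap<>();

    @Override
    public String id() {
        return "active";
    }

    // Declared on StreamTask only, as the review comment proposes.
    void addFetchedMetadata(final String partition, final Long lag) {
        if (lag != null) {
            fetchedLags.put(partition, lag);
        }
    }

    Long fetchedLag(final String partition) {
        return fetchedLags.get(partition);
    }
}

final class TaskManagerSketch {
    private final Map<String, Task> activeTasksByPartition = new HashMap<>();

    void register(final String partition, final Task task) {
        activeTasksByPartition.put(partition, task);
    }

    // Assumed lookup that still returns the wide Task type, as in the pending TODO.
    Task activeTasksForInputPartition(final String partition) {
        return activeTasksByPartition.get(partition);
    }

    void addFetchedMetadata(final String partition, final Long lag) {
        final Task task = activeTasksForInputPartition(partition);
        if (task != null) {
            // Only active tasks are registered per input partition in this sketch,
            // so the narrowing cast is expected to succeed.
            ((StreamTask) task).addFetchedMetadata(partition, lag);
        }
    }
}
```

The cast goes away once the lookup itself returns `StreamTask`; until then it keeps the metadata method off the shared interface.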