The parameter applies to all three kinds of topics (input, intermediate, and repartition topics), and it's a global config, so it cannot be set on a per-topic basis.
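For reference, here is a minimal sketch of setting the parameter once for the whole application. It uses plain string keys in a `java.util.Properties` object so that it runs without the Kafka libraries on the classpath; in a real application the same key is available as `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG`. The class name, application id, and broker address are made up for illustration.

```java
import java.util.Properties;

public class IdleConfigSketch {

    // Builds a Streams-style configuration with a single, global idle timeout.
    // There is no per-topic variant of this setting.
    static Properties buildConfig(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "idle-demo");          // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig(500L);
        System.out.println("max.task.idle.ms = " + props.getProperty("max.task.idle.ms"));
    }
}
```

The resulting `Properties` object would then be passed to the `KafkaStreams` constructor together with the topology.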
About the blocking behavior: If one partition becomes empty, all other partitions are paused (via pause()) and Streams only calls poll() for the empty partition. If no data is returned within the timeout, processing is enforced for this task for all partitions. The task will stay in the "enforced processing" state until all partitions deliver data again (at the same time). If a second partition becomes empty while the task is in that state, no additional delay is applied.

Hope this answers your question.

-Matthias

On 1/15/19 2:07 AM, Peter Levart wrote:
> Another question about this parameter.
>
> Does that parameter apply just to input topics to the KafkaStreams
> topology or also to intermediate (repartitioning) topics or to
> intermediate topics configured with KStream.through() directive?
>
> Is it possible to control the behavior on a per-topic basis?
>
> Thanks,
>
> Peter
>
> On 1/15/19 11:00 AM, Peter Levart wrote:
>> Hello!
>>
>> I'm trying to understand the behavior of Kafka Streams consumers with
>> regards to max.task.idle.ms configuration parameter (default 0). The
>> documentation says:
>>
>> max.task.idle.ms    Medium    Maximum amount of time a stream task
>> will stay idle when not all of its partition buffers contain records.
>>
>> Suppose an input topic to a streams application has multiple
>> partitions and that traffic arrives to this input topic in bursts. At
>> first all partitions will be filled with records and all tasks will be
>> busy. Then some partition will get drained first. The task with
>> drained partition will be paused for max.task.idle.ms time.
>>
>> Question: How often will the task with drained partition pause for
>> max.task.idle.ms time due to missing records in a partition?
>>
>> 1. Before every record?
>> 2. At each KafkaConsumer.poll() that returns any records?
>> 3. Just the 1st time some partition is detected to have no polled
>>    records, but then this partition will get "blacklisted" from further
>>    checks for pausing until it again receives a record.
>>
>> For example, if the task is assigned N partitions, what is the max. #
>> of times the task will pause for max.task.idle.ms time when the input
>> burst is over:
>>
>> The wishful answer is N-1 times. Is this so?
>>
>> Regards, Peter
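To make the answer to the "N-1 times" question concrete, the blocking behavior described in the reply can be modeled as a small state machine. This is only a toy model of that description, not the actual Kafka Streams implementation; the class `IdleModel`, the method `canProcess`, and the explicit clock argument are invented for illustration.

```java
import java.util.Arrays;

public class IdleModel {
    private final long maxIdleMs;
    private boolean enforced = false; // true while in the "enforced processing" state
    private long idleStart = -1;      // time at which the current wait began, -1 if not waiting

    public IdleModel(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    // buffered[i] is the number of buffered records for partition i.
    // Returns true if the task may process a record at time nowMs.
    public boolean canProcess(int[] buffered, long nowMs) {
        boolean allHaveData = Arrays.stream(buffered).allMatch(n -> n > 0);
        boolean anyData = Arrays.stream(buffered).anyMatch(n -> n > 0);

        if (allHaveData) {
            // All partitions deliver data again: leave enforced processing.
            enforced = false;
            idleStart = -1;
            return true;
        }
        if (enforced) {
            // Further partitions running empty add no extra delay.
            return anyData;
        }
        if (idleStart < 0) {
            idleStart = nowMs; // first time an empty partition is seen
        }
        if (nowMs - idleStart >= maxIdleMs) {
            enforced = true;   // timeout expired: enforce processing
            return anyData;
        }
        return false;          // still waiting for the empty partition
    }

    public static void main(String[] args) {
        IdleModel task = new IdleModel(100);
        System.out.println(task.canProcess(new int[]{1, 0, 1}, 10));  // waiting
        System.out.println(task.canProcess(new int[]{1, 0, 1}, 110)); // enforced
        System.out.println(task.canProcess(new int[]{1, 0, 0}, 111)); // no second wait
    }
}
```

Under this model the task waits at most once per burst, rather than N-1 times, and a new wait can only begin after all partitions have had data at the same time again.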