That sounds correct. Just to avoid confusion: we don't use the term "pause" for tasks, but "idle", because consumers can "pause/resume" partitions, which is something different.
In fact, if a task is idle, the underlying empty partitions are not paused, because we hope to get data from them before the timeout is exceeded.

-Matthias

On 1/18/19 2:17 AM, Peter Levart wrote:
> Hi Matthias,
>
> Thank you for the clarifications. May I ask about some more detail? I'll
> try to describe the behavior as I understand it. Please correct me if
> I'm wrong...
>
> So in effect each task alternates between NORMAL and ENFORCED processing
> modes.
>
> It starts in NORMAL mode, where it is performing a kind of "merge" step
> (as in merge sort) among assigned partitions, where records in a
> particular partition are assumed to be already sorted by timestamp. When
> any of the partitions drains, the task pauses (and doesn't process
> messages even if they have already been received by the last poll call) and
> waits for the drained partitions to start delivering messages, so that each
> partition has a candidate message to choose from before continuing.
> The important point as I see it is that the merging process can only
> progress in NORMAL mode when it can choose the earliest message among
> all the partitions assigned to the task. If the timeout kicks in while
> pausing, the task enters ENFORCED processing mode.
>
> In ENFORCED processing mode, all the partitions are un-paused again and
> the merging continues, but it merges a chunk of messages from a
> particular poll at a time before polling for the next chunk. If at some
> point all the partitions have a candidate message in a particular polled
> chunk, it reverts back to NORMAL processing mode...
>
> Is my understanding correct?
>
> Thanks, Peter
>
> On 1/16/19 8:15 AM, Matthias J. Sax wrote:
>> The parameter applies to all three kinds of topics (input, intermediate,
>> repartition topics) and it's a global config.
>>
>> About the blocking behavior:
>>
>> If one partition becomes empty, all other partitions are paused() and
>> Streams only poll()s for the empty partition. If no data is returned
>> within the timeout, processing is enforced for this task for all
>> partitions.
>>
>> The task will stay in the "enforced processing state" until all
>> partitions deliver data again (at the same time).
>>
>> If a second partition becomes empty, no additional delay is applied.
>>
>> Hope this answers your question.
>>
>>
>> -Matthias
>>
>>
>> On 1/15/19 2:07 AM, Peter Levart wrote:
>>> Another question about this parameter.
>>>
>>> Does that parameter apply just to input topics of the KafkaStreams
>>> topology, or also to intermediate (repartitioning) topics, or to
>>> intermediate topics configured with the KStream.through() directive?
>>>
>>> Is it possible to control the behavior on a per-topic basis?
>>>
>>> Thanks,
>>>
>>> Peter
>>>
>>> On 1/15/19 11:00 AM, Peter Levart wrote:
>>>> Hello!
>>>>
>>>> I'm trying to understand the behavior of Kafka Streams consumers with
>>>> regard to the max.task.idle.ms configuration parameter (default 0). The
>>>> documentation says:
>>>>
>>>>   max.task.idle.ms   Medium   Maximum amount of time a stream task
>>>>   will stay idle when not all of its partition buffers contain records.
>>>>
>>>> Suppose an input topic to a streams application has multiple
>>>> partitions and that traffic arrives at this input topic in bursts. At
>>>> first all partitions will be filled with records and all tasks will be
>>>> busy. Then some partition will get drained first. The task with the
>>>> drained partition will get paused for max.task.idle.ms time.
>>>>
>>>> Question: How often will the task with a drained partition pause for
>>>> max.task.idle.ms time due to missing records in a partition?
>>>>
>>>> 1. Before every record?
>>>> 2. At each KafkaConsumer.poll() that returns any records?
>>>> 3. Just the 1st time some partition is detected to have no polled
>>>>    records, but then this partition will get "blacklisted" from further
>>>>    checks for pausing until it again receives a record.
>>>>
>>>> For example, if the task is assigned N partitions, what is the max.
>>>> number of times the task will pause for max.task.idle.ms time when the
>>>> input burst is over?
>>>>
>>>> The wishful answer is N-1 times. Is this so?
>>>>
>>>> Regards, Peter
>>>>
>
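For readers following the N-1 question, the behavior Matthias describes can be captured in a small toy model. This is a hedged sketch, not Kafka Streams' implementation: the class `TaskIdleModel`, its one-record-per-millisecond `step()` loop, and the timestamps are hypothetical, and the model leaves out polling, consumer `pause()`/`resume()`, and batching. It only encodes the rules stated in the thread: in NORMAL mode the task merges by smallest timestamp; when a partition drains, it idles up to `max.task.idle.ms`; after the timeout it switches to enforced processing and stays there until all partitions have data at the same time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of one stream task; NOT the Kafka Streams implementation.
public class TaskIdleModel {

    enum Mode { NORMAL, ENFORCED }

    private final List<Deque<Long>> buffers = new ArrayList<>(); // one record buffer per partition
    private final long maxTaskIdleMs;  // stands in for max.task.idle.ms
    private Mode mode = Mode.NORMAL;
    private long idleSinceMs = -1;     // -1 means "not currently idling"
    private int idleEpisodes = 0;      // how many times the idle timer was started

    TaskIdleModel(int partitions, long maxTaskIdleMs) {
        for (int i = 0; i < partitions; i++) buffers.add(new ArrayDeque<>());
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    void add(int partition, long timestamp) { buffers.get(partition).addLast(timestamp); }

    Mode mode() { return mode; }

    int idleEpisodes() { return idleEpisodes; }

    /** Returns the timestamp of the record processed at nowMs, or null if the task idles or has no data. */
    Long step(long nowMs) {
        boolean allNonEmpty = buffers.stream().noneMatch(Deque::isEmpty);
        boolean allEmpty = buffers.stream().allMatch(Deque::isEmpty);
        if (allEmpty) return null;                // nothing buffered at all
        if (allNonEmpty) {
            mode = Mode.NORMAL;                   // every partition has data again: back to NORMAL
            idleSinceMs = -1;
        } else if (mode == Mode.NORMAL) {
            if (idleSinceMs < 0) {                // a partition just drained: start the idle timer once
                idleSinceMs = nowMs;
                idleEpisodes++;
            }
            if (nowMs - idleSinceMs < maxTaskIdleMs) return null; // still idling
            mode = Mode.ENFORCED;                 // timeout exceeded: enforce processing
            idleSinceMs = -1;
        }
        // In ENFORCED mode a further drained partition adds no delay; we fall through to the merge.
        // Merge step: take the record with the smallest head timestamp among non-empty buffers.
        Deque<Long> best = null;
        for (Deque<Long> b : buffers) {
            if (!b.isEmpty() && (best == null || b.peekFirst() < best.peekFirst())) best = b;
        }
        return best.pollFirst();
    }

    public static void main(String[] args) {
        // Hypothetical burst: partition 0 drains first, then partition 1, then partition 2.
        TaskIdleModel task = new TaskIdleModel(3, 5);
        long[][] burst = { {1, 4}, {2, 5, 7, 9}, {3, 6, 8, 10, 11, 12} };
        int total = 0;
        for (int p = 0; p < burst.length; p++) {
            for (long ts : burst[p]) { task.add(p, ts); total++; }
        }

        List<Long> processed = new ArrayList<>();
        int idleSteps = 0;
        for (long now = 0; now < 100; now++) {
            Long ts = task.step(now);
            if (ts != null) processed.add(ts);
            else if (processed.size() < total) idleSteps++; // null while data is still buffered = idling
        }
        System.out.println("processed = " + processed);              // [1, 2, ..., 12] in timestamp order
        System.out.println("idle episodes = " + task.idleEpisodes()); // 1
        System.out.println("idle steps = " + idleSteps);             // 5
        System.out.println("final mode = " + task.mode());           // ENFORCED
    }
}
```

With three partitions draining one after another, this model idles once (for 5 steps) rather than N-1 = 2 times, which is consistent with "If a second partition becomes empty, no additional delay is applied." Under the model's assumptions, the task only idles again after all partitions have held data simultaneously and one of them drains anew.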