That sounds correct.

Just to avoid confusion: we don't use the term "pause" for tasks, but
"idle", because consumers can "pause/resume" partitions, which is
something different.

In fact, if a task is idle, the underlying empty partitions are not
paused, because we hope to get data from them before the timeout is
exceeded.


-Matthias

On 1/18/19 2:17 AM, Peter Levart wrote:
> Hi Matthias,
> 
> Thank you for the clarifications. May I ask for some more detail? I'll
> try to describe the behavior as I understand it. Please correct me if
> I'm wrong...
> 
> So in effect each task alternates between NORMAL and ENFORCED processing
> modes.
> 
> It starts in NORMAL mode, where it performs a kind of "merge" step
> (as in merge sort) among its assigned partitions, where the records in a
> particular partition are assumed to be already sorted by timestamp. When
> any of the partitions drains, the task pauses (and doesn't process
> messages even if they have already been received by the last poll call)
> and waits for the drained partitions to start delivering messages again,
> so that each partition has a candidate message to choose from before
> continuing. The important point, as I see it, is that the merging process
> can only progress in NORMAL mode when it can choose the earliest message
> among all the partitions assigned to the task. If the timeout kicks in
> while the task is paused, the task enters ENFORCED processing mode.
> 
> In ENFORCED processing mode, all the partitions are unpaused again and
> the merging continues, but it merges one chunk of messages from a
> particular poll at a time before polling for the next chunk. If at some
> point all the partitions have a candidate message in a particular polled
> chunk, it reverts to NORMAL processing mode...
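The NORMAL-mode "merge" step described above can be sketched roughly as follows. This is a simplified model for illustration only, not the Kafka Streams implementation: the class `MergeStepSketch`, the record type `Rec`, and the per-partition `Deque` buffers are hypothetical. It repeatedly picks the buffered record with the smallest timestamp and stops as soon as any partition buffer is empty, which is the point at which the task would have to idle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class MergeStepSketch {

    // Hypothetical buffered record: a timestamp plus a payload.
    static final class Rec {
        final long timestamp;
        final String value;

        Rec(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Index of the buffer whose head has the smallest timestamp,
    // or -1 if any buffer is empty (the task cannot pick safely).
    static int pickEarliest(List<Deque<Rec>> buffers) {
        int best = -1;
        for (int i = 0; i < buffers.size(); i++) {
            Rec head = buffers.get(i).peekFirst();
            if (head == null) {
                return -1;
            }
            if (best == -1 || head.timestamp < buffers.get(best).peekFirst().timestamp) {
                best = i;
            }
        }
        return best;
    }

    // Emits records in timestamp order for as long as every buffer is non-empty.
    static List<String> mergeWhileAllBuffered(List<Deque<Rec>> buffers) {
        List<String> out = new ArrayList<>();
        int i;
        while ((i = pickEarliest(buffers)) != -1) {
            out.add(buffers.get(i).pollFirst().value);
        }
        return out;
    }

    public static void main(String[] args) {
        Deque<Rec> p0 = new ArrayDeque<>();
        p0.add(new Rec(1, "a"));
        p0.add(new Rec(4, "d"));
        Deque<Rec> p1 = new ArrayDeque<>();
        p1.add(new Rec(2, "b"));
        p1.add(new Rec(3, "c"));
        p1.add(new Rec(5, "e"));
        List<Deque<Rec>> buffers = new ArrayList<>();
        buffers.add(p0);
        buffers.add(p1);
        System.out.println(mergeWhileAllBuffered(buffers)); // [a, b, c, d]
    }
}
```

With these sample buffers, record `e` (timestamp 5) stays buffered: once partition 0 is empty, there is nothing left to compare it against.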
> 
> Is my understanding correct?
> 
> Thanks, Peter
> 
> On 1/16/19 8:15 AM, Matthias J. Sax wrote:
>> The parameter applies to all three kinds of topics (input, intermediate,
>> and repartition topics), and it's a global config.
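Because the setting is global rather than per topic, it is set once in the application's configuration. A minimal sketch, assuming plain `java.util.Properties` that would then be passed to the `KafkaStreams` constructor. The string key `"max.task.idle.ms"` has the same value as the `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG` constant; the class name, application id, and broker address are placeholders.

```java
import java.util.Properties;

public class IdleConfigSketch {

    // Builds a Streams config; application id and bootstrap servers are placeholders.
    static Properties streamsProps(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // hypothetical id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        // Global setting: it applies to every task, and therefore to input,
        // intermediate, and repartition topic partitions alike.
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps(1000L);
        System.out.println(props.getProperty("max.task.idle.ms")); // 1000
    }
}
```

There is no per-topic variant of this key, so a topology that needs different waiting behavior for different topics cannot express that through this setting alone.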
>>
>> About the blocking behavior:
>>
>> If one partition becomes empty, all other partitions are paused() and
>> Streams only calls poll() for the empty partition. If no data is returned
>> within the timeout, processing is enforced for this task for all
>> partitions.
>>
>> The task will stay in the "enforced processing state" until all
>> partitions deliver data again (at the same time).
>>
>> If a second partition becomes empty, no additional delay is applied.
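The blocking behavior described above can be modeled as a small per-task state machine. This is a hypothetical, simplified sketch: the names `IdleStateSketch`, `TaskIdleTracker`, and `mayProcess` are invented for illustration, and the 100 ms timeout and drain times are arbitrary. It only decides whether a task may process at a given time and ignores pausing and polling entirely. The usage run in `main` lets four partitions drain one after another once a burst is over.

```java
import java.util.Arrays;

public class IdleStateSketch {

    // Hypothetical, simplified per-task tracker; not Kafka Streams' internal code.
    static final class TaskIdleTracker {
        private final long maxTaskIdleMs;
        private long idleStartMs = -1;   // -1 means "not currently idling"
        private boolean enforced = false;
        int idlePeriods = 0;             // how many times the task started to idle

        TaskIdleTracker(long maxTaskIdleMs) {
            this.maxTaskIdleMs = maxTaskIdleMs;
        }

        // Returns true if the task may process records at time nowMs.
        boolean mayProcess(boolean allPartitionsBuffered, long nowMs) {
            if (allPartitionsBuffered) {
                // Every partition has data again: back to normal processing.
                enforced = false;
                idleStartMs = -1;
                return true;
            }
            if (enforced) {
                // Already enforcing: another empty partition adds no extra delay.
                return true;
            }
            if (idleStartMs == -1) {
                // First time a buffer is found empty: start the idle clock.
                idleStartMs = nowMs;
                idlePeriods++;
            }
            if (nowMs - idleStartMs >= maxTaskIdleMs) {
                // Timeout exceeded: enforce processing for all partitions.
                enforced = true;
                return true;
            }
            return false; // still idling, waiting for the empty partition
        }
    }

    public static void main(String[] args) {
        final int n = 4;
        boolean[] hasData = new boolean[n];
        Arrays.fill(hasData, true);
        TaskIdleTracker task = new TaskIdleTracker(100L);
        int blockedTicks = 0;
        for (long t = 0; t <= 300; t += 10) {
            // After the burst, partition i runs dry at t = 50 * i ms.
            for (int i = 0; i < n; i++) {
                if (t >= 50L * i) {
                    hasData[i] = false;
                }
            }
            boolean allBuffered = true;
            for (boolean b : hasData) {
                allBuffered &= b;
            }
            if (!task.mayProcess(allBuffered, t)) {
                blockedTicks++;
            }
        }
        System.out.println("idle periods: " + task.idlePeriods); // 1
        System.out.println("blocked ticks: " + blockedTicks);   // 10 (t = 0..90 ms)
    }
}
```

In this model the task idles once, for 100 ms, and then stays in enforced processing while the remaining partitions drain. So the count is one idle period per episode rather than N-1, and the idle clock restarts only after all partitions have data again.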
>>
>> Hope this answers your question.
>>
>>
>> -Matthias
>>
>>
>> On 1/15/19 2:07 AM, Peter Levart wrote:
>>> Another question about this parameter.
>>>
>>> Does that parameter apply just to input topics of the KafkaStreams
>>> topology, or also to intermediate (repartitioning) topics, or to
>>> intermediate topics configured with the KStream.through() directive?
>>>
>>> Is it possible to control the behavior on a per-topic basis?
>>>
>>> Thanks,
>>>
>>> Peter
>>>
>>> On 1/15/19 11:00 AM, Peter Levart wrote:
>>>> Hello!
>>>>
>>>> I'm trying to understand the behavior of Kafka Streams consumers with
>>>> regard to the max.task.idle.ms configuration parameter (default 0). The
>>>> documentation says:
>>>>
>>>> max.task.idle.ms (importance: Medium): Maximum amount of time a stream
>>>> task will stay idle when not all of its partition buffers contain records.
>>>>
>>>> Suppose an input topic to a streams application has multiple
>>>> partitions and that traffic arrives to this input topic in bursts. At
>>>> first all partitions will be filled with records and all tasks will be
>>>> busy. Then some partition will get drained first. The task with the
>>>> drained partition will get paused for max.task.idle.ms time.
>>>>
>>>> Question: How often will the task with drained partition pause for
>>>> max.task.idle.ms time due to missing records in a partition?
>>>>
>>>> 1. Before every record?
>>>> 2. At each KafkaConsumer.poll() that returns any records?
>>>> 3. Only the first time some partition is detected to have no polled
>>>> records, after which this partition gets "blacklisted" from further
>>>> checks for pausing until it receives a record again.
>>>>
>>>> For example, if the task is assigned N partitions, what is the maximum
>>>> number of times the task will pause for max.task.idle.ms when the input
>>>> burst is over?
>>>>
>>>> The wishful answer is N-1 times. Is this so?
>>>>
>>>> Regards, Peter
>>>>
> 
