Hi Matthias,

Thank you for the clarifications. May I ask for some more detail? I'll try to describe the behavior as I understand it. Please correct me if I'm wrong...

So in effect each task alternates between NORMAL and ENFORCED processing modes.

It starts in NORMAL mode, where it performs a kind of "merge" step (as in merge sort) among its assigned partitions, with the records in each partition assumed to be already sorted by timestamp. When any of the partitions drains, the task pauses (it doesn't process messages even if they were already received by the last poll() call) and waits for the drained partitions to start delivering messages again, so that each partition has a candidate message to choose from before continuing. The important point, as I see it, is that in NORMAL mode the merge can only progress when it can choose the earliest message among all the partitions assigned to the task. If the timeout kicks in while the task is paused, the task enters ENFORCED processing mode.
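
The merge step described above can be sketched roughly as follows. This is only a simplified model of my understanding, not the actual Kafka Streams code: the class TimestampMerge, its methods, and the partition names are all hypothetical, and records are reduced to bare timestamps.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not the actual Kafka Streams implementation.
class TimestampMerge {
    // One FIFO buffer of record timestamps per assigned partition.
    final Map<String, Deque<Long>> buffers = new LinkedHashMap<>();

    // Appends timestamps to a partition's buffer (registers the partition if new).
    void add(String partition, long... timestamps) {
        Deque<Long> q = buffers.computeIfAbsent(partition, p -> new ArrayDeque<>());
        for (long ts : timestamps) {
            q.addLast(ts);
        }
    }

    // True only when every assigned partition has a candidate record.
    boolean allBuffered() {
        for (Deque<Long> q : buffers.values()) {
            if (q.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    // Removes and returns the earliest head timestamp among the non-empty buffers.
    // Returns null when enforce is false and some buffer is empty (the task waits),
    // or when nothing is buffered at all.
    Long pollNext(boolean enforce) {
        if (!enforce && !allBuffered()) {
            return null;
        }
        Deque<Long> best = null;
        for (Deque<Long> q : buffers.values()) {
            if (!q.isEmpty() && (best == null || q.peekFirst() < best.peekFirst())) {
                best = q;
            }
        }
        return best == null ? null : best.pollFirst();
    }
}
```

With enforce set to false this models NORMAL mode: the merge refuses to pick a record as soon as one buffer is empty. With enforce set to true it models ENFORCED mode and picks the earliest record among whatever is buffered.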

In ENFORCED processing mode, all the partitions are un-paused again and the merging continues, but it merges one chunk of messages (the result of a single poll) at a time before polling for the next chunk. If at some point all the partitions have a candidate message in a particular polled chunk, the task reverts to NORMAL processing mode...

Is my understanding correct?

Thanks, Peter

On 1/16/19 8:15 AM, Matthias J. Sax wrote:
The parameter applies to all three kinds of topics (input, intermediate,
and repartition topics) and it's a global config.

About the blocking behavior:

If one partition becomes empty, all other partitions are paused() and
Streams only poll()s for the empty partition. If no data is returned
within the timeout, processing is enforced for this task for all partitions.

The task will stay in the "enforced processing state" until all
partitions deliver data again (at the same time).

If a second partition becomes empty, no additional delay is applied.
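
The blocking behavior described above can be modeled roughly as a small state tracker. This is a simplified sketch under stated assumptions, not the actual Streams code: the class IdleTracker and its methods are hypothetical, and time is passed in explicitly as milliseconds.

```java
// Hypothetical sketch; class and method names are assumptions, not Kafka Streams API.
class IdleTracker {
    private final long maxTaskIdleMs;
    private long idleStartMs = -1;    // -1 means "not currently waiting"
    private boolean enforced = false; // true while in the enforced processing state

    IdleTracker(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // Returns true when the task may process a record at time nowMs.
    boolean mayProcess(boolean allPartitionsBuffered, long nowMs) {
        if (allPartitionsBuffered) {
            // All partitions deliver data again: leave the enforced state.
            enforced = false;
            idleStartMs = -1;
            return true;
        }
        if (enforced) {
            // Already enforced: a further empty partition adds no extra delay.
            return true;
        }
        if (idleStartMs < 0) {
            // First time a partition is seen empty: start the idle timer.
            idleStartMs = nowMs;
        }
        if (nowMs - idleStartMs >= maxTaskIdleMs) {
            // Timeout elapsed without data: enforce processing.
            enforced = true;
            return true;
        }
        // Keep waiting for the empty partition.
        return false;
    }

    boolean isEnforced() {
        return enforced;
    }
}
```

In this model the task waits at most once per burst: once processing is enforced, it stays enforced until all partitions have data at the same time, so a second empty partition triggers no new wait.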

Hope this answers your question.


-Matthias


On 1/15/19 2:07 AM, Peter Levart wrote:
Another question about this parameter.

Does that parameter apply just to input topics to the KafkaStreams
topology or also to intermediate (repartitioning) topics or to
intermediate topics configured with KStream.through() directive?

Is it possible to control the behavior on a per-topic basis?

Thanks,

Peter

On 1/15/19 11:00 AM, Peter Levart wrote:
Hello!

I'm trying to understand the behavior of Kafka Streams consumers with
regards to max.task.idle.ms configuration parameter (default 0). The
documentation says:

max.task.idle.ms     Medium     Maximum amount of time a stream task
will stay idle when not all of its partition buffers contain records.
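
For reference, a minimal sketch of how this parameter might be set when building the Streams configuration. The application id, bootstrap address, and the helper class IdleConfigExample are hypothetical placeholders; the key is written as a plain string here, and StreamsConfig.MAX_TASK_IDLE_MS_CONFIG is the corresponding constant in the kafka-streams library.

```java
import java.util.Properties;

// Hypothetical helper; only the "max.task.idle.ms" key comes from the documentation.
class IdleConfigExample {
    static Properties streamsProps(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "burst-demo");        // placeholder value
        props.put("bootstrap.servers", "localhost:9092"); // placeholder value
        // Same key as StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; the default is 0.
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams constructor together with the topology.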

Suppose an input topic to a streams application has multiple
partitions and that traffic arrives to this input topic in bursts. At
first all partitions will be filled with records and all tasks will be
busy. Then some partition will get drained first. The task with the
drained partition will be paused for max.task.idle.ms time.

Question: How often will the task with drained partition pause for
max.task.idle.ms time due to missing records in a partition?

1. Before every record?
2. At each KafkaConsumer.poll() that returns any records?
3. Just the 1st time some partition is detected to have no polled
records, but then this partition will get "blacklisted" from further
checks for pausing until it again receives a record.

For example, if the task is assigned N partitions, what is the max. #
of times the task will pause for max.task.idle.ms time when the input
burst is over:

The wishful answer is N-1 times. Is this so?

Regards, Peter

