[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15311836#comment-15311836
 ] 

ASF GitHub Bot commented on KAFKA-3775:
---------------------------------------

GitHub user kawamuray opened a pull request:

    https://github.com/apache/kafka/pull/1460

    KAFKA-3775: Throttle maximum number of tasks assigned to a single KafkaStreams

    Issue: https://issues.apache.org/jira/browse/KAFKA-3775
    
    POC. Discussion in progress.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kawamuray/kafka KAFKA-3775-throttle-tasks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1460.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1460
    
----
commit fefe259b2c97bb1bbf14b572533ca74348651c0d
Author: Yuto Kawamura <kawamuray.dad...@gmail.com>
Date:   2016-06-02T03:46:51Z

    MINOR: Add toString() to ClientState for debugging

commit c4f363d32d9a496c0f4b4e66ee846429a2a2eda5
Author: Yuto Kawamura <kawamuray.dad...@gmail.com>
Date:   2016-06-02T03:51:34Z

    MINOR: Remove meaninglessly repeated assertions in unit test

commit 3c173fa5d029277e5d1974c104d7e66939b5cd17
Author: Yuto Kawamura <kawamuray.dad...@gmail.com>
Date:   2016-06-02T03:55:10Z

    KAFKA-3775: Introduce new streams configuration max.tasks.assigned
    
    This configuration limits the maximum number of tasks assigned to a single
    KafkaStreams instance.
    Because a task can hold one partition from each of several topics, setting
    this value lower is useful to prevent a huge number of partitions from
    being assigned to the instance which started first.

----


> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine with a
> single KafkaStreams instance, that instance is assigned all partitions of
> the target topic.
> As we're using it to process topics which have a huge number of partitions
> and heavy message traffic, it is a problem that we have no way to throttle
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic with more
> than 10MB/sec of traffic per partition, all partitions were assigned to the
> first instance and the app soon died with an OOM.
> I know there are some workarounds to consider here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => This may work, but since Kafka Streams is a library and not an
> execution framework, there is no predefined procedure for starting Kafka
> Streams apps, so some users might want the option of starting a single
> instance first and checking that it works as expected with a smaller number
> of partitions (I do :p).
> - Adjust config parameters such as {{buffered.records.per.partition}},
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap
> pressure.
>   => This may work, but it still has two problems IMO:
>   - It still leads to a traffic explosion under high-throughput processing,
> as the instance accepts all incoming messages from hundreds of partitions.
>   - In the first place, as a matter of distributed system principle, it's
> weird that users don't have a way to control the maximum number of
> "partitions" assigned to a single shard (an instance of KafkaStreams here).
> Users should be allowed to specify the maximum number of partitions that a
> single instance (or host) can be expected to process.
> Here, I'd like to introduce a new configuration parameter
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of
> partition) assigned to a processId (the notion of a single KafkaStreams
> instance).
> At the same time, we need to change StreamPartitionAssignor (TaskAssignor)
> to tolerate incomplete assignment. That is, Kafka Streams should continue
> working on part of the partitions even if some partitions are left
> unassigned, in order to satisfy this: "users might want the option of
> starting a single instance first and checking that it works as expected
> with a smaller number of partitions (I do :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will
> continue refining it.
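
The capped, possibly incomplete assignment described in the issue can be illustrated with a small standalone sketch. This is not the code from the pull request and it does not use the real StreamPartitionAssignor or TaskAssignor APIs. The class CappedTaskAssignmentSketch, its assign method, and the Result type are hypothetical names invented for illustration, tasks are assumed to be plain integer ids, and the round-robin order is an assumption. The maxTasksAssigned argument stands in for the proposed max.tasks.assigned setting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not Kafka Streams code.
public class CappedTaskAssignmentSketch {

    /** Outcome of one assignment round: tasks per instance, plus tasks no instance could take. */
    public static final class Result {
        public final Map<String, List<Integer>> assigned = new LinkedHashMap<>();
        public final List<Integer> unassigned = new ArrayList<>();
    }

    /**
     * Distributes task ids 0..numTasks-1 round-robin over the instances,
     * never giving any instance more than maxTasksAssigned tasks.
     * Tasks that do not fit anywhere are reported as unassigned instead of
     * failing the whole assignment.
     */
    public static Result assign(List<String> instances, int numTasks, int maxTasksAssigned) {
        if (maxTasksAssigned < 1) {
            throw new IllegalArgumentException("maxTasksAssigned must be at least 1");
        }
        Result result = new Result();
        for (String instance : instances) {
            result.assigned.put(instance, new ArrayList<>());
        }
        int cursor = 0; // index of the instance to try first for the next task
        for (int task = 0; task < numTasks; task++) {
            String target = null;
            // Probe each instance once, starting at the cursor, for spare capacity.
            for (int probe = 0; probe < instances.size(); probe++) {
                int index = (cursor + probe) % instances.size();
                String candidate = instances.get(index);
                if (result.assigned.get(candidate).size() < maxTasksAssigned) {
                    target = candidate;
                    cursor = (index + 1) % instances.size();
                    break;
                }
            }
            if (target == null) {
                result.unassigned.add(task); // every instance is at its cap
            } else {
                result.assigned.get(target).add(task);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One instance started first, 8 tasks, cap of 3 tasks per instance.
        Result r = assign(Arrays.asList("instance-1"), 8, 3);
        System.out.println("assigned=" + r.assigned + " unassigned=" + r.unassigned);
        // prints: assigned={instance-1=[0, 1, 2]} unassigned=[3, 4, 5, 6, 7]
    }
}
```

With a single instance and a cap of 3, only three tasks are taken and the rest stay unassigned until more instances join, which is the "start one instance first and check it with fewer partitions" scenario from the description.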



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
