[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15311836#comment-15311836 ]

ASF GitHub Bot commented on KAFKA-3775:
---------------------------------------

GitHub user kawamuray opened a pull request:

    https://github.com/apache/kafka/pull/1460

    KAFKA-3775: Throttle maximum number of tasks assigned to a single KafkaStreams

    Issue: https://issues.apache.org/jira/browse/KAFKA-3775

    POC. Discussion in progress.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kawamuray/kafka KAFKA-3775-throttle-tasks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1460.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1460

----
commit fefe259b2c97bb1bbf14b572533ca74348651c0d
Author: Yuto Kawamura <kawamuray.dad...@gmail.com>
Date:   2016-06-02T03:46:51Z

    MINOR: Add toString() to ClientState for debugging

commit c4f363d32d9a496c0f4b4e66ee846429a2a2eda5
Author: Yuto Kawamura <kawamuray.dad...@gmail.com>
Date:   2016-06-02T03:51:34Z

    MINOR: Remove meaninglessly repeated assertions in unit test

commit 3c173fa5d029277e5d1974c104d7e66939b5cd17
Author: Yuto Kawamura <kawamuray.dad...@gmail.com>
Date:   2016-06-02T03:55:10Z

    KAFKA-3775: Introduce new streams configuration max.tasks.assigned

    This configuration limits the maximum number of tasks assigned to a
    single KafkaStreams instance. As a task consists of a single partition
    from one or more topics, setting this to a lower value is useful to
    prevent a huge number of partitions from being assigned to the
    instance which started first.

----

> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which
> consists of a single KafkaStreams instance, that instance gets all partitions
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and
> heavy message traffic, it is a problem that we don't have a way of throttling
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with
> more than 10MB/sec of traffic on each partition, we saw that all partitions
> were assigned to the first instance and soon the app died from OOM.
> I know that there are some workarounds worth considering here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution
> framework, there's no predefined procedure for starting Kafka Streams apps, so
> some users might want the option to start a single first instance and
> check if it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}},
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap
> pressure.
>   => Maybe works, but it still has two problems IMO:
>   - It still leads to a traffic explosion under high-throughput processing, as
> the instance accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed system principles, it's weird that
> users don't have a way to control the maximum number of "partitions" assigned
> to a single shard (an instance of KafkaStreams here). Users should be allowed
> to provide the maximum number of partitions that they consider possible to
> process with a single instance (or host).
> Here, I'd like to introduce a new configuration parameter
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of
> partitions) assigned to the processId (which is the notion of a single
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor(TaskAssignor) to
> tolerate an incomplete assignment. That is, Kafka Streams should continue
> working for part of the partitions even if some partitions are left
> unassigned, in order to satisfy this: "some users might want the option to
> start a single first instance and check if it works as expected with a
> smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will
> continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
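
To make the proposed behavior concrete, here is a minimal Java sketch of a capped assignment that tolerates leftover tasks. It is not the code from the POC pull request and does not use Kafka's StreamPartitionAssignor or TaskAssignor classes. The class name `CappedTaskAssignment`, the `Result` type, and the round-robin strategy are all assumptions made for illustration; only the idea of a per-instance cap (the proposed {{max.tasks.assigned}}) and of leaving some tasks unassigned comes from the issue description.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not part of Kafka Streams.
public class CappedTaskAssignment {

    /** Tasks placed on each client, plus the tasks that could not be placed. */
    public static final class Result {
        public final Map<String, List<Integer>> assigned = new LinkedHashMap<>();
        public final List<Integer> unassigned = new ArrayList<>();
    }

    /**
     * Distributes task ids round-robin over clients, never giving a client
     * more than maxTasksAssigned tasks. Tasks that do not fit anywhere are
     * reported as unassigned instead of causing a failure.
     * Client ids are assumed to be unique.
     */
    public static Result assign(List<String> clients,
                                List<Integer> taskIds,
                                int maxTasksAssigned) {
        if (maxTasksAssigned < 0) {
            throw new IllegalArgumentException("maxTasksAssigned must be >= 0");
        }
        Result result = new Result();
        for (String client : clients) {
            result.assigned.put(client, new ArrayList<>());
        }

        int next = 0; // index of the client to try first for the next task
        for (Integer task : taskIds) {
            boolean placed = false;
            // Try each client once, starting from the round-robin position.
            for (int i = 0; i < clients.size(); i++) {
                int index = (next + i) % clients.size();
                List<Integer> tasks = result.assigned.get(clients.get(index));
                if (tasks.size() < maxTasksAssigned) {
                    tasks.add(task);
                    next = (index + 1) % clients.size();
                    placed = true;
                    break;
                }
            }
            if (!placed) {
                // Every client is at its cap: leave the task unassigned.
                result.unassigned.add(task);
            }
        }
        return result;
    }
}
```

With a single instance, ten tasks and a cap of 3, this sketch places tasks 0 to 2 on that instance and reports the remaining seven as unassigned, which matches the "start one instance first and check it with fewer partitions" scenario described in the issue. When a second instance joins, rerunning the assignment would spread the tasks over both instances up to the same cap.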