Yuto Kawamura created KAFKA-3775:
------------------------------------

             Summary: Throttle maximum number of tasks assigned to a single 
KafkaStreams
                 Key: KAFKA-3775
                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 0.10.0.0
            Reporter: Yuto Kawamura
            Assignee: Yuto Kawamura
             Fix For: 0.10.1.0


As of today, if I start a Kafka Streams app on a single machine, consisting of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
As we're using it to process topics with a huge number of partitions and heavy message traffic, it is a problem that we don't have a way to throttle the maximum number of partitions assigned to a single instance.

In fact, when we started a Kafka Streams app that consumes a topic with more than 10MB/sec of traffic per partition, all partitions were assigned to the first instance and the app soon died with an OOM error.
I know there are some workarounds worth considering here, for example:

- Start multiple instances at once so the partitions are distributed evenly.
  => Maybe works, but since Kafka Streams is a library and not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a single first instance and check whether it works as expected with a smaller number of partitions (I want :p)
- Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure (see the sketch after this list).
  => Maybe works, but this still has two problems IMO:
  - It still leads to a traffic explosion with high-throughput processing, as the instance accepts all incoming messages from hundreds of partitions.
  - In the first place, by distributed-system principles, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that a single instance (or host) can reasonably be expected to process.
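
A minimal sketch of that second workaround, for reference. The application id, broker address, and concrete values are placeholders rather than recommendations, and the consumer-level settings are assumed to be picked up from the streams {{Properties}} by {{StreamsConfig}}:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class HeapPressureTuningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker

        // Buffer fewer records per partition inside Kafka Streams.
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
        // Shrink the maximum fetch size per partition for the underlying consumer.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 262144); // 256 KB
        // Cap the number of records returned by a single poll().
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // ... build and start the KafkaStreams instance with these props.
    }
}
{code}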

Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which limits the number of tasks (roughly, partitions) assigned to a processId (the notion of a single KafkaStreams instance).
At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate incomplete assignment. That is, Kafka Streams should keep processing the partitions that are assigned even if some partitions are left unassigned, in order to satisfy the point above: "users may want the option to start a single first instance and check whether it works as expected with a smaller number of partitions (I want :p)".

I've implemented a rough POC for this. PTAL, and if it makes sense I will continue polishing it.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
