[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324187#comment-15324187 ]
Yuto Kawamura commented on KAFKA-3775:
--------------------------------------

Sorry for leaving this discussion for a while, and thanks for all your comments. I was busy with other work until the middle of this week, and just yesterday I finished deploying my Kafka Streams app to production. I gained some experience through that, so let me summarize it here.

The initial configuration looked like this:
{code}
poll.ms=100 (default)
buffered.records.per.partition=1000 (default)
max.poll.records=Integer.MAX_VALUE (default)
max.partition.fetch.bytes=1MiB (default)
num.stream.threads=1
{code}

First I took a heap dump of the Kafka Streams process just before it died. I found 918772 instances of {{ConsumerRecord}} (most of them should already have been garbage, as I took the heap dump with the -F switch), which together with their referenced byte arrays consumed more than 500MB of heap. There was no other significant heap usage by objects unrelated to {{ConsumerRecord}}, so I'm sure this is what caused the OOM of my app.

I then tried several configuration adjustments to avoid the OOM:
- Decrease {{buffered.records.per.partition}} from 1000 (default) to 10 => No luck. Still OOM.
- Decrease {{max.partition.fetch.bytes}} => Couldn't, as we allow messages of up to 1MiB.
- Decrease {{max.poll.records}} from Integer.MAX_VALUE (default) to 10 => Worked. No more OOM.

So by decreasing {{max.poll.records}}, my application stopped dying from OOM. Before that, each poll() invocation could return all records fetched for every partition, so memory was exhausted very easily. (I had misunderstood this point: I thought poll() would never be called as long as all tasks kept more than {{buffered.records.per.partition}} records, but in fact it is called continually regardless of that, because of {{poll.ms}} expiration.) Network traffic on that host increased by about 300Mbps, but that's not problematic at the moment, as the throughput was likely throttled by the single thread ({{num.stream.threads=1}}).

After all instances were up, I confirmed that the total throughput wasn't enough, as the consumption lag kept increasing. I increased {{num.stream.threads}} to 3 and did the same deployment again (I know I could have performed a rolling restart, but I just wanted to see what would happen with an increased number of threads). Again, the first instance survived without OOM, but this time the traffic on the NIC increased by about 600Mbps, which was almost a critical level for our network spec. As I started the rest of the instances, all partitions were distributed equally, and now they are running pretty well.

So my conclusion is:
- Decreasing {{max.poll.records}} to a proper value works in terms of OOM (see the configuration sketch after this list). Still, it's not intuitive that it controls memory pressure, as the heap usage throttling is just a side effect of this adjustment (IIUC it exists not for this purpose but to keep the interval between {{consumer.poll()}} calls short enough to avoid assignment expiration).
- I still couldn't throttle the network traffic. As I wrote above, when I started a single instance with {{num.stream.threads=3}}, the traffic on that host's NIC reached its maximum capacity (1Gbps) while it was doing catch-up reads. This could be serious in terms of packet dropping, as we deploy other service daemons on the same node.
- I'm still not certain what the best way of doing this is, but I believe it would be worthwhile to have an option to throttle the maximum number of incoming messages to a single instance (in other words, the maximum capacity of a single KafkaStreams instance), with respect to both memory pressure and traffic. So I'm basically +1 on the idea that [~jkreps] suggested (global memory allocation throttling), but I'm still wondering what option you can suggest for throttling the network traffic.
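For concreteness, here is a minimal sketch of the kind of tuning described above, written against the 0.10.0 config keys. It assumes that consumer-level settings such as {{max.poll.records}} can be passed through the same {{Properties}} object handed to KafkaStreams; the application id, bootstrap servers and the exact values are illustrative placeholders, not a recommendation:
{code}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TunedStreamsConfig {

    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
        // Bounding how many records a single poll() may return is what actually stopped the OOM.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        // Smaller per-partition buffer; on its own this was not enough to avoid the OOM.
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 10);
        // max.partition.fetch.bytes stays at the default 1MiB because we allow 1MiB messages.
        // More threads raise throughput, but also NIC traffic, as observed above.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        return props;
    }
}
{code}
The resulting {{Properties}} would then be passed to the {{KafkaStreams}} constructor as usual.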
And about PartitionGrouper: it can be used to reduce the number of target tasks, but that can't be changed without rewriting the configuration (to revert {{partition.grouper}}) and restarting an instance, right? If so, it's too cumbersome to perform such a two-step deployment: first I'd have to deploy a single instance with a custom {{partition.grouper}}, then deploy the rest of the instances, and finally revert the configuration and redeploy the first instance? No way :( (A sketch of such a grouper follows below.)
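To make that workaround concrete, here is a hypothetical grouper along those lines, assuming the 0.10.0 {{PartitionGrouper}} contract ({{partitionGroups(Map<Integer, Set<String>>, Cluster)}}) and the {{DefaultPartitionGrouper}} base class. The class name and the hard-coded cap are made up for illustration; note that it caps the total number of tasks created for the topology, and which tasks survive the cap is arbitrary:
{code}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;

// Hypothetical grouper that creates at most MAX_TASKS tasks; partitions not covered
// by the surviving task groups are simply left unconsumed.
public class CappedPartitionGrouper extends DefaultPartitionGrouper {

    private static final int MAX_TASKS = 10; // illustrative; a real version would be configurable

    @Override
    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups,
                                                            Cluster metadata) {
        Map<TaskId, Set<TopicPartition>> all = super.partitionGroups(topicGroups, metadata);
        Map<TaskId, Set<TopicPartition>> capped = new LinkedHashMap<>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : all.entrySet()) {
            if (capped.size() >= MAX_TASKS) {
                break;
            }
            capped.put(entry.getKey(), entry.getValue());
        }
        return capped;
    }
}
{code}
Switching this on (via {{partition.grouper}}) and off again is exactly the two-step configuration dance described above, which is why I don't think it is a practical answer.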
> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine consisting of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics that have a huge number of partitions and heavy message traffic, it is a problem that we don't have a way of throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic with more than 10MB/sec of traffic per partition, we saw all partitions assigned to the first instance, and soon the app died from OOM.
> I know there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a first single instance and check whether it works as expected with a smaller number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure.
>   => Maybe works, but it still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that can reasonably be processed by a single instance (or host).
> Here, I'd like to introduce a new configuration parameter, {{max.tasks.assigned}}, which limits the number of tasks (a notion of partition) assigned to the processId (the notion of a single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate incomplete assignment. That is, Kafka Streams should keep working for the assigned part of the partitions even when some partitions are left unassigned, in order to satisfy the point above: "users may want the option to start a first single instance and check whether it works as expected with a smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue polishing it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)