[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324187#comment-15324187
 ] 

Yuto Kawamura commented on KAFKA-3775:
--------------------------------------

Sorry for leaving this discussion for a while, and thanks for all your comments. 
I was busy with other work for the first half of this week, and just yesterday I 
finished deploying my Kafka Streams app to production. I gained some experience 
through that, so let me summarize it here.

So the initial configuration looked like this:
{code}
poll.ms=100(default)
buffered.records.per.partition=1000(default)
max.poll.records=Integer.MAX_VALUE(default)
max.partition.fetch.bytes=1MiB(default)
num.stream.threads=1
{code}

First I took a heap dump of the Kafka Streams process just before it died. There 
I found 918772 instances of {{ConsumerRecord}} (most of them presumably already 
garbage, since I took the heap dump with the -F switch), which, together with 
their referenced byte arrays, consumed more than 500MB of heap at that moment. 
There was no other significant heap usage by objects unrelated to 
ConsumerRecord, so I'm sure this is what caused the OOM in my app.

So I tried several configuration adjustments to avoid the OOM. Here's what I 
tried:

- Decrease {{buffered.records.per.partition}} from 1000 (default) to 10 => No 
luck. Still OOM.
- Decrease {{max.partition.fetch.bytes}} => Couldn't, as we allow messages of up 
to 1MiB.
- Decrease {{max.poll.records}} from Integer.MAX_VALUE (default) to 10 => 
Worked. No more OOM.

Therefore, by decreasing {{max.poll.records}}, my application stopped dying from 
OOM. Before that, each poll() invocation could return all records fetched for 
every partition, so memory could be exhausted very easily (I had misunderstood 
this point; I thought poll() is never called as long as every task holds more 
than {{buffered.records.per.partition}} records, but in fact it keeps being 
called regardless of that, because of {{poll.ms}} expiration).
Network traffic on that host increased by about 300Mbps, but that's still not 
problematic at the moment, as the throughput was likely throttled by the single 
thread ({{num.stream.threads=1}}).
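
For reference, here is roughly how the adjusted settings look when building the 
config in the app (a minimal sketch, not my actual code; the application id and 
bootstrap servers are placeholders, and it assumes a plain consumer config such 
as {{max.poll.records}} placed in the streams properties is forwarded to the 
embedded consumer, as in 0.10.0):
{code}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsAppConfig {
    // Builds the adjusted configuration described above.
    public static StreamsConfig build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder brokers
        props.put(StreamsConfig.POLL_MS_CONFIG, 100);                        // default
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 10);  // lowering this alone didn't help
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);               // this is what stopped the OOM
        // max.partition.fetch.bytes stays at the 1MiB default since we allow messages up to that size.
        return new StreamsConfig(props);
    }
}
{code}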

After all instances were up, I confirmed that the total throughput wasn't 
enough, as I saw the consumption lag keep increasing. I increased 
{{num.stream.threads}} to 3 and did the same deployment again (I know I could 
have performed a rolling restart, but I just wanted to see what would happen 
with an increased number of threads).
So again, the first instance survived without OOM, but this time the traffic on 
the NIC increased by about 600Mbps, which was almost a critical level for our 
network spec. Once I started the rest of the instances, all partitions were 
distributed evenly and now they are running pretty well.

So my conclusions are:

- Decreasing {{max.poll.records}} to a proper value works in terms of OOM. 
Still, it's not intuitive that it controls memory pressure, as the heap usage 
throttling is just a side effect of this adjustment (its purpose is not memory 
control but adjusting the interval between {{consumer.poll()}} calls so the 
consumer polls often enough to avoid assignment expiration, IIUC).
- I still couldn't throttle the network traffic. As I wrote above, when I 
started a single instance with {{num.stream.threads=3}}, the traffic on that 
host's NIC reached its maximum capacity (1Gbps) while it was doing catch-up 
reads. This could be serious in terms of packet dropping, as we deploy other 
service daemons on the same nodes.
- I'm still not certain what the best way of doing this is, but I believe it 
would be worthwhile to have an option to throttle the maximum number of incoming 
messages to a single instance (in other words, the maximum capacity of a single 
KafkaStreams instance), with regard to both memory pressure and traffic. So I'm 
basically +1 on the idea [~jkreps] suggested (global memory allocation 
throttling; see the rough sketch below), but I'm still wondering what option you 
can suggest for throttling the network traffic.
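
To make that idea a bit more concrete, here is a purely hypothetical sketch of 
what such a global buffer budget could look like; none of these class or config 
names exist in Kafka Streams today, it's just to illustrate the kind of throttle 
I have in mind:
{code}
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical: a process-wide budget for bytes buffered across all tasks.
// The poll loop would call consumer.pause(...) on its partitions while
// exceeded() returns true, and resume them once enough records are processed.
public class BufferBudget {
    private final long maxBufferedBytes;                 // e.g. a new "buffer.memory.bytes"-like config
    private final AtomicLong bufferedBytes = new AtomicLong();

    public BufferBudget(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    // Called when fetched records are added to a task's buffer.
    public void add(long bytes) {
        bufferedBytes.addAndGet(bytes);
    }

    // Called when buffered records have been processed and can be freed.
    public void release(long bytes) {
        bufferedBytes.addAndGet(-bytes);
    }

    public boolean exceeded() {
        return bufferedBytes.get() > maxBufferedBytes;
    }
}
{code}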

And about PartitionGrouper:
So it can be used to reduce the number of target tasks, but that can't be 
changed without rewriting the configuration (to revert {{partition.grouper}}) 
and restarting the instance, right?
If so, such a 2-step deployment is too cumbersome to perform: first I have to 
deploy a single instance with a custom {{partition.grouper}}, then deploy the 
rest of the instances, and finally revert the configuration and deploy the first 
instance again? No way :(
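
Just to make it concrete, that custom grouper would look roughly like this (a 
rough sketch assuming the 0.10.0 {{PartitionGrouper}} interface and delegating 
to {{DefaultPartitionGrouper}}; the cap of 10 tasks is an arbitrary example):
{code}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;

// Sketch: keep only the first N task groups produced by the default grouper,
// so only N tasks (and their partitions) exist to be assigned at all.
public class CappedPartitionGrouper implements PartitionGrouper {
    private static final int MAX_TASKS = 10; // arbitrary cap for illustration

    private final DefaultPartitionGrouper defaultGrouper = new DefaultPartitionGrouper();

    @Override
    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups,
                                                            Cluster metadata) {
        Map<TaskId, Set<TopicPartition>> all = defaultGrouper.partitionGroups(topicGroups, metadata);
        Map<TaskId, Set<TopicPartition>> capped = new LinkedHashMap<>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : all.entrySet()) {
            if (capped.size() >= MAX_TASKS) {
                break; // remaining tasks are simply left out
            }
            capped.put(entry.getKey(), entry.getValue());
        }
        return capped;
    }
}
{code}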


> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and 
> heavy message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic which has more 
> than 10MB/sec of traffic on each partition, we saw all partitions get assigned 
> to the first instance, and soon the app died from OOM.
> I know that there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want an option to start the first single instance and check 
> if it works as expected with a lesser number of partitions (I want that :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but there are still two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as 
> it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to specify the maximum number of partitions that a single instance (or host) 
> can be expected to process.
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion close to 
> partitions) assigned to a processId (the notion of a single KafkaStreams 
> instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue working 
> for its part of the partitions even when some partitions are left unassigned, 
> in order to satisfy this: "some users might want an option to start the first 
> single instance and check if it works as expected with a lesser number of 
> partitions (I want that :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will 
> continue refining it.



