Hello Dima,

The current consumer does not have an explicit memory control mechanism, but
you can try to bound its memory usage indirectly via the following configs:
fetch.message.max.bytes and queued.max.message.chunks. Details can be found
at http://kafka.apache.org/documentation.html#consumerconfigs
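
As a rough illustration, the two settings could be set and reasoned about
as in the sketch below. The class name, the helper names, the example values
(256 KB fetch size, 2 queued chunks) and the estimate formula are my own
assumptions for illustration, not something Kafka computes or guarantees.
Other required properties (zookeeper.connect, group.id) are omitted. Note
that fetch.message.max.bytes still has to be at least as large as your
largest message.

```java
import java.util.Properties;

final class ConsumerMemorySketch {

    // Build consumer properties containing only the two settings discussed above.
    static Properties boundedConsumerProps(int fetchMaxBytes, int queuedChunks) {
        Properties props = new Properties();
        props.setProperty("fetch.message.max.bytes", Integer.toString(fetchMaxBytes));
        props.setProperty("queued.max.message.chunks", Integer.toString(queuedChunks));
        return props;
    }

    // Back-of-the-envelope estimate (an assumption, not an exact accounting):
    // each stream can queue up to queued.max.message.chunks chunks, and one
    // in-flight fetch response can carry up to one chunk per partition.
    // Each chunk is at most fetch.message.max.bytes.
    static long roughBufferBoundBytes(Properties props, int partitions, int streams) {
        long fetch = Long.parseLong(props.getProperty("fetch.message.max.bytes"));
        long chunks = Long.parseLong(props.getProperty("queued.max.message.chunks"));
        return fetch * (chunks * streams + partitions);
    }

    public static void main(String[] args) {
        Properties props = boundedConsumerProps(256 * 1024, 2);
        // 10 partitions and 1 stream, as in the setup described in the question.
        System.out.println(roughBufferBoundBytes(props, 10, 1) + " bytes");
    }
}
```

The resulting Properties would be merged into whatever you pass to
ConsumerConfig. With several consumers in one JVM, multiply the estimate by
the number of consumers and compare it against the heap size.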

As for your problem, I'm wondering whether your messages are compressed.
There is a known issue with the current decompression logic on the consumer:
it may allocate large chunks of memory unnecessarily and cause an OOME
(KAFKA-527).
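
To see why compression matters for heap usage, here is a small standalone
sketch using plain java.util.zip (not Kafka's own decompression code, and
not a reproduction of KAFKA-527). The class name and the synthetic payload
are assumptions for illustration. It only shows that a fetch that is small
on the wire can need far more heap once decompressed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class CompressionExpansionSketch {

    // Compress a byte array with GZIP.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Decompress a GZIP byte array fully into memory.
    static byte[] gunzip(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Synthetic, highly repetitive payload (hypothetical stand-in for real messages).
    static byte[] repetitivePayload(int size) {
        byte[] pattern = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8);
        byte[] raw = new byte[size];
        for (int i = 0; i < size; i++) {
            raw[i] = pattern[i % pattern.length];
        }
        return raw;
    }

    public static void main(String[] args) {
        byte[] raw = repetitivePayload(1024 * 1024);
        byte[] compressed = gzip(raw);
        byte[] restored = gunzip(compressed);
        System.out.println("compressed bytes:   " + compressed.length);
        System.out.println("decompressed bytes: " + restored.length);
    }
}
```

If your producers use compression, the fetch size limits apply to the
compressed data, so the heap needed after decompression can be much larger
than the configured values suggest.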

Guozhang

On Tue, Mar 17, 2015 at 3:10 AM, Dima Dimas <idi...@gmail.com> wrote:

> Hi
>
> I am hitting an OOME while consuming from one topic with 10 partitions
> (100 000 messages per partition), using 5 consumers (consumer groups) with
> consumer.timeout=10ms. The OOME occurs 1-2 minutes after start.
> Java heap: Xms=1024M
> LAN: about 10 Gbit
> This is a standalone application.
>
> Kafka version 0.8.2
>
> Messages are about 5-10 kB each.
> Before the OOME, consumers received from 5 000 to 30 000 messages per request
> from one topic.
> Each consumer reads from different topics.
>
> Part of the code (offsets are handled manually):
>
> Map<String, Integer> streamCounts = Collections.singletonMap(topic, 1);
> ConsumerConnector connector =
>         consumer.createJavaConsumerConnector(consumerConf);
>
> Map<String, List<KafkaStream<byte[], byte[]>>> streams =
>         connector.createMessageStreams(streamCounts);
> // read from a single stream
> KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);
> // read from the stream in an infinite loop
> while (true) {
>     try {
>         for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
>             Message msg = new Message(messageAndMetadata);
>             messages.add(msg);
>         }
>     } catch (ConsumerTimeoutException ignore) {
>         // thrown every time the Kafka consumer timeout is reached
>     }
>     // some logic
>
>     // commit offsets
>
>     // thread sleep 1 sec
> }
>
> //close connection
>
>
>
> Stacktrace log
>
> [[03/06/2015 13:23:56
>
> [ConsumerFetcherThread-preprocessorTopicReporting_mo-host-1425648218660-2d7f632b-0-5]
> ERROR kafka.network.BoundedByteBufferReceive - OOME with size 1048612
>                    2097206
> java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:329)
>         at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> *Q1: How can I solve this issue?*
> *Q2: What could be the root cause?*
> *Q3: How can I control memory allocation for each consumer?*
>
> *Thanks*
> *Dima.*
>



-- 
-- Guozhang
