It looks like you need to give the JVM more heap memory via the
command-line option `-Xmx`.
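
To pick a value for `-Xmx`, it may help to estimate how much the suppression buffers in the quoted topology can hold on the heap. Below is a rough sketch in Java, not a definitive calculation. It rests on assumptions that are mine and not from the original mail: that the `maxBytes` limit applies to each suppression buffer separately, that Streams creates one such buffer per task, and that 10 tasks run in one JVM (`tasksInThisJvm` is a placeholder to replace with your own count). The class and method names are made up for illustration.

```java
import java.time.Duration;

class SuppressBufferEstimate {

    // Number of hopping windows a single record falls into: size / advance.
    // Exact when the window size is a multiple of the advance interval.
    static long windowsPerRecord(Duration size, Duration advance) {
        return size.toMillis() / advance.toMillis();
    }

    // Upper bound on heap held by suppression buffers in one JVM,
    // ASSUMING one buffer per task and the limit applying per buffer.
    static long worstCaseBufferBytes(long maxBytesPerBuffer, int tasksInThisJvm) {
        return Math.multiplyExact(maxBytesPerBuffer, (long) tasksInThisJvm);
    }

    public static void main(String[] args) {
        // Values taken from the quoted topology.
        long windows = windowsPerRecord(Duration.ofHours(4), Duration.ofMinutes(5));
        long maxBytesPerBuffer = 1024L * 1024L * 1024L; // 1 GiB

        // Hypothetical value: number of tasks hosted by this instance.
        int tasksInThisJvm = 10;

        long worstCase = worstCaseBufferBytes(maxBytesPerBuffer, tasksInThisJvm);
        long heapLimit = Runtime.getRuntime().maxMemory(); // effective -Xmx

        System.out.println("Windows per record:        " + windows);
        System.out.println("Worst-case buffer bytes:   " + worstCase);
        System.out.println("JVM max heap (bytes):      " + heapLimit);
        System.out.println("Buffers could exceed heap: " + (worstCase > heapLimit));
    }
}
```

With a 4-hour window that advances every 5 minutes, each record lands in 48 windows, so every key can have up to 48 pending results in the buffer until the windows close. The first stack trace fails in `InMemoryTimeOrderedKeyValueBuffer.restoreBatch`, that is, while the buffer is being rebuilt from its changelog, which is consistent with the buffered data not fitting into the heap. If the estimate is above the heap limit printed on the third line, either raise `-Xmx` or lower `maxBytes`.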


-Matthias

On 9/11/19 4:19 AM, Prakash Y wrote:
> Hi,
> 
> I am using a hopping time window in Kafka Streams with the suppress option. 
> I run into memory issues once the Kafka application has been running 
> continuously for 2 to 3 days after deployment without a restart.
> 
> We are using a hopping window with a suppress buffer size of 1 GB.
> 
> We are getting these exceptions very frequently, and stream processing 
> fails because of them. Please help us resolve this issue.
> 
> Kafka Version: 2.1.0
> 
> The POC code is below:
> 
> KTable<Windowed<String>, OurObject> cdstream = source
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofHours(4))
>         .advanceBy(Duration.ofMinutes(5))
>         .grace(Duration.ofMinutes(1)))
>     .aggregate(
>         () -> new OurObject(),  // initializer
>         (aggKey, newValue, aggValue) -> {
>             aggValue.addNew(newValue);
>             return aggValue;
>         },  // adder
>         Materialized.as("windowed_aggregated_store"))  // state store name
>     .suppress(Suppressed.untilWindowCloses(
>         BufferConfig.maxBytes(1024 * 1024 * 1024).shutDownWhenFull()));
> 
> 
> The log file contents are:
> 
> 
> Successfully started cd stream listner ....
> MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_0 is ready, Consuming the messages...
> MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_1 is ready, Consuming the messages...
> MBR-0x01:Consumer Consumer_cache-key-delete-queue-cache_key_delete_2adc3_2adc3-vjp43_0 is ready, Consuming the messages...
> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3"
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$389/0x0000000800bd2840.restoreBatch(Unknown Source)
>     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>     at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-8" java.lang.OutOfMemoryError: Java heap space
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-7" java.lang.OutOfMemoryError: Java heap space
>     at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
>     at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
>     at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
>     at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-10" java.lang.OutOfMemoryError: Java heap space
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-4" java.lang.OutOfMemoryError: Java heap space
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-9" java.lang.OutOfMemoryError: Java heap space
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-6" java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-5" java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-3" java.lang.OutOfMemoryError: Java heap space
> Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-1" java.lang.OutOfMemoryError: Java heap space
> -----
> 
> Thanks,
> Prakash.
> 
