Seems you need to increase the memory to give to the JVM via command line option `-Xmx` ?
-Matthias
On 9/11/19 4:19 AM, Prakash Y wrote:
> Hi,
>
> I am using the hopping time window in Kafka streams with suppress option. I
> am facing memory issues when the Kafka application is running continuously
> for 2 to 3 days of deployment without any restart on the machine.
>
> We are using hopping window and buffer size as 1GB.
>
> We are getting there exceptions very frequently, and stream processing is
> getting failed because of these exceptions. Please help us in resolving this
> issue.
>
> Kafka Version: 2.1.0
>
> I am adding the POC code below
>
> KTable<Windowed<String>, OurObject> cdstream = source
> .groupByKey()
> .windowedBy(TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
> .aggregate(() -> new OurObject(), //initializer
> (aggKey, newValue, aggValue) -> {
> aggValue.addNew(newValue);
> return aggValue;
> }, //adder
> Materialized.as("windowed_aggregated_store")) //state store name
> .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(1024 * 1024 *
> 1024).shutDownWhenFull()));
>
>
> The log file contents are:
>
>
> Successfully started cd stream listner ....
> MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_0 is ready,
> Consuming the messages...MBR-0x01:Consumer
> Consumer_metric_data-cd_metric_worker_2adc3_1 is ready, Consuming the
> messages...MBR-0x01:Consumer
> Consumer_cache-key-delete-queue-cache_key_delete_2adc3_2adc3-vjp43_0 is
> ready, Consuming the messages...
> Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "kafka-producer-network-thread |
> producer-3"
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-2"
> java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
> at
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$389/0x0000000800bd2840.restoreBatch(Unknown
> Source)
> at
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
> at
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-8"
> java.lang.OutOfMemoryError: Java heap space
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-7"
> java.lang.OutOfMemoryError: Java heap space
> at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
> at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
> at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-10"
> java.lang.OutOfMemoryError: Java heap space
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-4"
> java.lang.OutOfMemoryError: Java heap space
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-9"
> java.lang.OutOfMemoryError: Java heap space
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-6"
> java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-5"
> java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-3"
> java.lang.OutOfMemoryError: Java heap space
> Exception in thread
> "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-1"
> java.lang.OutOfMemoryError: Java heap space
> -----
>
> Thanks,
> Prakash.
>
signature.asc
Description: OpenPGP digital signature
