Seems you need to increase the memory to give to the JVM via command line option `-Xmx` ?
-Matthias On 9/11/19 4:19 AM, Prakash Y wrote: > Hi, > > I am using the hopping time window in Kafka streams with suppress option. I > am facing memory issues when the Kafka application is running continuously > for 2 to 3 days of deployment without any restart on the machine. > > We are using hopping window and buffer size as 1GB. > > We are getting there exceptions very frequently, and stream processing is > getting failed because of these exceptions. Please help us in resolving this > issue. > > Kafka Version: 2.1.0 > > I am adding the POC code below > > KTable<Windowed<String>, OurObject> cdstream = source > .groupByKey() > .windowedBy(TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))) > .aggregate(() -> new OurObject(), //initializer > (aggKey, newValue, aggValue) -> { > aggValue.addNew(newValue); > return aggValue; > }, //adder > Materialized.as("windowed_aggregated_store")) //state store name > .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(1024 * 1024 * > 1024).shutDownWhenFull())); > > > The log file contents are: > > > Successfully started cd stream listner .... > MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_0 is ready, > Consuming the messages...MBR-0x01:Consumer > Consumer_metric_data-cd_metric_worker_2adc3_1 is ready, Consuming the > messages...MBR-0x01:Consumer > Consumer_cache-key-delete-queue-cache_key_delete_2adc3_2adc3-vjp43_0 is > ready, Consuming the messages... > Exception: java.lang.OutOfMemoryError thrown from the > UncaughtExceptionHandler in thread "kafka-producer-network-thread | > producer-3" > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-2" > java.lang.OutOfMemoryError: Java heap space > at > org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249) > at > org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$389/0x0000000800bd2840.restoreBatch(Unknown > Source) > at > org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89) > at > org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92) > at > org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-8" > java.lang.OutOfMemoryError: Java heap space > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-7" > java.lang.OutOfMemoryError: Java heap space > at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61) > at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348) > at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) > at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381) > at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342) > at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541) > at org.apache.kafka.common.network.Selector.poll(Selector.java:467) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88) > at > org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-10" > java.lang.OutOfMemoryError: Java heap space > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-4" > java.lang.OutOfMemoryError: Java heap space > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-9" > java.lang.OutOfMemoryError: Java heap space > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-6" > java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069) > Caused by: java.lang.OutOfMemoryError: Java heap space > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-5" > java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069) > Caused by: java.lang.OutOfMemoryError: Java heap space > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-3" > java.lang.OutOfMemoryError: Java heap space > Exception in thread > "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-1" > java.lang.OutOfMemoryError: Java heap space > ----- > > Thanks, > Prakash. >
signature.asc
Description: OpenPGP digital signature