Hi, I am using the hopping time window in Kafka streams with suppress option. I am facing memory issues when the Kafka application is running continuously for 2 to 3 days of deployment without any restart on the machine.
We are using hopping window and buffer size as 1GB. We are getting there exceptions very frequently, and stream processing is getting failed because of these exceptions. Please help us in resolving this issue. Kafka Version: 2.1.0 I am adding the POC code below KTable<Windowed<String>, OurObject> cdstream = source .groupByKey() .windowedBy(TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))) .aggregate(() -> new OurObject(), //initializer (aggKey, newValue, aggValue) -> { aggValue.addNew(newValue); return aggValue; }, //adder Materialized.as("windowed_aggregated_store")) //state store name .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(1024 * 1024 * 1024).shutDownWhenFull())); The log file contents are: Successfully started cd stream listner .... MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_0 is ready, Consuming the messages...MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_1 is ready, Consuming the messages...MBR-0x01:Consumer Consumer_cache-key-delete-queue-cache_key_delete_2adc3_2adc3-vjp43_0 is ready, Consuming the messages... Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3" Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-2" java.lang.OutOfMemoryError: Java heap space at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249) at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$389/0x0000000800bd2840.restoreBatch(Unknown Source) at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89) at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-8" java.lang.OutOfMemoryError: Java heap space Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-7" java.lang.OutOfMemoryError: Java heap space at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61) at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348) at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541) at org.apache.kafka.common.network.Selector.poll(Selector.java:467) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-10" java.lang.OutOfMemoryError: Java heap space Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-4" java.lang.OutOfMemoryError: Java heap space Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-9" java.lang.OutOfMemoryError: Java heap space Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-6" java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069) Caused by: java.lang.OutOfMemoryError: Java heap space Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-5" java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069) Caused by: java.lang.OutOfMemoryError: Java heap space Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-3" java.lang.OutOfMemoryError: Java heap space Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-1" java.lang.OutOfMemoryError: Java heap space ----- Thanks, Prakash.