Hi,

I am using the hopping time window in Kafka streams with suppress option. I am 
facing memory issues when the Kafka application is running continuously for 2 
to 3 days of deployment without any restart on the machine.

We are using hopping window and buffer size as 1GB.

We are getting there exceptions very frequently, and stream processing is 
getting failed because of these exceptions. Please help us in resolving this 
issue.

Kafka Version: 2.1.0

I am adding the POC code below

KTable<Windowed<String>, OurObject> cdstream = source
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
.aggregate(() -> new OurObject(),  //initializer
(aggKey, newValue, aggValue) -> {
aggValue.addNew(newValue);
return aggValue;
},  //adder
Materialized.as("windowed_aggregated_store"))  //state store name
.suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(1024 * 1024 * 
1024).shutDownWhenFull()));


The log file contents are:


Successfully started cd stream listner ....
MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_0 is ready, 
Consuming the messages...MBR-0x01:Consumer 
Consumer_metric_data-cd_metric_worker_2adc3_1 is ready, Consuming the 
messages...MBR-0x01:Consumer 
Consumer_cache-key-delete-queue-cache_key_delete_2adc3_2adc3-vjp43_0 is ready, 
Consuming the messages...
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler 
in thread "kafka-producer-network-thread | producer-3"
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-2" 
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
at 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$389/0x0000000800bd2840.restoreBatch(Unknown
 Source)
at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
at 
org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-8" 
java.lang.OutOfMemoryError: Java heap space
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-7" 
java.lang.OutOfMemoryError: Java heap space
at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-10" 
java.lang.OutOfMemoryError: Java heap space
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-4" 
java.lang.OutOfMemoryError: Java heap space
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-9" 
java.lang.OutOfMemoryError: Java heap space
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-6" 
java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
Caused by: java.lang.OutOfMemoryError: Java heap space
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-5" 
java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
Caused by: java.lang.OutOfMemoryError: Java heap space
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-3" 
java.lang.OutOfMemoryError: Java heap space
Exception in thread 
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-1" 
java.lang.OutOfMemoryError: Java heap space
-----

Thanks,
Prakash.

Reply via email to