Hi Bill,

We are using a hopping window with a suppress buffer size of 1 GB, as in the code below:

    KTable<Windowed<String>, OurObject> windowedMetricsTimeSeriesStream = source
            .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(<1GB>).shutDownWhenFull()));

We are getting these exceptions very frequently, and stream processing is failing because of them. Please help us resolve this issue.

The requested log file contents are:

Successfully started cd stream listner ....
MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_0 is ready, Consuming the messages...
MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_1 is ready, Consuming the messages...
MBR-0x01:Consumer Consumer_cache-key-delete-queue-cache_key_delete_2adc3_2adc3-vjp43_0 is ready, Consuming the messages...

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3"
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$389/0x0000000800bd2840.restoreBatch(Unknown Source)
    at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
    at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-8" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-7" java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
    at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-10" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-4" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-9" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-6" java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
Caused by: java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-5" java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
Caused by: java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-3" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-1" java.lang.OutOfMemoryError: Java heap space

-----
Thanks,
Prakash.

On 2019/07/11 20:25:32, Bill Bejeck <b...@confluent.io> wrote:
> Thanks for reporting this Kalyani, we'll take a look.
> By chance, can you provide log files?
>
> Thanks,
> Bill
>
> On Mon, Jul 8, 2019 at 7:43 AM kalyani yarlagadda <kalyani.yarlagad...@gmail.com> wrote:
>
> > Hi,
> >
> > I need assistance with the scenario below. Please help me with this.
> >
> > I am using a hopping time window in Kafka Streams with *suppress*(), and
> > I am seeing the following memory errors.
> >
> > *1. The memory issue appears when the Kafka application has been running
> > continuously* for 2 to 3 days after deployment, without any restart of
> > the machine:
> >
> > Exception in thread "change_detection_stream-08bd427d-36fd-467a-8923-4f7bb67aa949-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
> >     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
> >     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$385/0x0000000800bc7440.restoreBatch(Unknown Source)
> >     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
> >     at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
> >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
> >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
> >     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> >
> > The machine has the following specifications:
> > RAM: 16 GB
> >
> > *2. The /tmp folder is also filling up.*
> >
> > *Kafka Version:* *2.1.0*
> >
> > *I am adding the POC code below*
> >
> >     // define the time window as a hopping time window
> >     TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
> >             .advanceBy(Duration.ofMinutes(5))
> >             .grace(Duration.ofMinutes(1));
> >
> >     // MetricsTimeSeries is the aggregator class
> >     KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream = builder
> >             .stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
> >             .groupByKey()
> >             .windowedBy(timeWindow)
> >             .aggregate(
> >                     () -> new MetricsTimeSeries(), /* initializer */
> >                     (aggKey, newValue, aggValue) -> {
> >                         aggValue.addDataPoint(newValue);
> >                         return aggValue;
> >                     }, /* adder */
> >                     Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
> >             .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(<1GB>).shutDownWhenFull()));
> >
> >     windowedMetricsTimeSeriesStream.toStream()
> >             .map((key, value) -> // mapping logic goes here
> >             )
> >             .foreach(// logic to validate and save
> >             );
> >
> > *Properties set to Kafka Streams:*
> >
> >     StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
> >     StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
> >     StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
> >     StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
> >     StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
> >     StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
> >     StreamsConfig.NUM_STREAM_THREADS_CONFIG - 2
> >     StreamsConfig.PROCESSING_GUARANTEE_CONFIG - StreamsConfig.EXACTLY_ONCE
> >     StreamsConfig.COMMIT_INTERVAL_MS_CONFIG - 1000ms
> >
> > Thanks in advance.
> >
> > Kalyani Y,
> > 9177982636
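
A rough sizing sketch for the window definition quoted above, offered as arithmetic rather than a diagnosis. With 4-hour windows advancing every 5 minutes, each record falls into 4h / 5min = 48 overlapping windows, and untilWindowCloses keeps the aggregate of every open window in the in-memory suppression buffer until the window end plus grace has passed. The Java below only does that arithmetic and does not call the Kafka Streams API. The class and method names are hypothetical, the task count of 8 is an assumed placeholder (the thread does not state the partition count), and the premise that BufferConfig.maxBytes bounds each buffer instance (one per task) rather than the whole application is an assumption to check against the documentation for your version.

```java
import java.time.Duration;

// Hypothetical helper for back-of-the-envelope sizing; not part of Kafka Streams.
final class WindowFootprint {

    // Upper bound on how many hopping windows contain a single record
    // timestamp: ceil(size / advance). Exact when size is a multiple of advance.
    static long overlappingWindows(Duration size, Duration advance) {
        long sizeMs = size.toMillis();
        long advanceMs = advance.toMillis();
        if (sizeMs <= 0 || advanceMs <= 0) {
            throw new IllegalArgumentException("size and advance must be positive");
        }
        return (sizeMs + advanceMs - 1) / advanceMs;
    }

    // Upper bound on heap held by suppression buffers if every buffer fills up,
    // ASSUMING the byte limit applies to each buffer instance separately.
    static long worstCaseBufferBytes(long maxBytesPerBuffer, int bufferInstances) {
        return Math.multiplyExact(maxBytesPerBuffer, (long) bufferInstances);
    }

    public static void main(String[] args) {
        // Window size and advance taken from the POC code in the thread.
        long windows = overlappingWindows(Duration.ofHours(4), Duration.ofMinutes(5));

        long oneGiB = 1024L * 1024 * 1024;
        int assumedTasks = 8; // placeholder: the real partition/task count is not given
        long worstCase = worstCaseBufferBytes(oneGiB, assumedTasks);

        System.out.println("windows per record: " + windows);
        System.out.println("worst-case buffer GiB: " + worstCase / oneGiB);
    }
}
```

If the per-instance assumption holds, a handful of full 1 GB buffers can already approach the heap available on a 16 GB machine, and restoring those buffers from the changelog would need comparable memory. A smaller maxBytes or a larger advance interval are the obvious knobs to experiment with.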