Thanks for reporting this, Kalyani. We'll take a look. By any chance, can you provide log files?

Thanks,
Bill

On Mon, Jul 8, 2019 at 7:43 AM kalyani yarlagadda <kalyani.yarlagad...@gmail.com> wrote:

> Hi,
>
> I need assistance in the below scenario. Please help me with this.
>
> I am using the hopping time window in Kafka streams with *suppress*(). I am
> seeing the following memory Errors.
>
> *1. Facing the memory issue when the Kafka application is running
> continuously* for 2 to 3 days of deployment without any restart on the
> machine
>
> Exception in thread "change_detection_stream-08bd427d-36fd-467a-8923-4f7bb67aa949-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$385/0x0000000800bc7440.restoreBatch(Unknown Source)
>     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>     at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
>
> we are having the following Specifications in the machine:
> RAM: 16GB
>
> *2. /tmp Folder is filled with more memory also.*
>
> *Kafka Version:* *2.1.0*
>
> *I am adding the POC code below*
>
> // define the time window as a hopping time window
> TimeWindows timeWindow =
>     TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1));
>
> KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
>     builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
>         .groupByKey()
>         .windowedBy(timeWindow)
>         .aggregate(
>             () -> new MetricsTimeSeries(), /* initializer */ // MetricsTimeSeries is the aggregator class
>             (aggKey, newValue, aggValue) -> {
>                 aggValue.addDataPoint(newValue);
>                 return aggValue;
>             }, /* adder */
>             Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
>         .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(<1GB>).shutDownWhenFull()));
>
> windowedMetricsTimeSeriesStream.toStream()
>     .map((key, value) -> /* mapping logic goes here */)
>     .foreach(/* logic to validate and save */);
>
> *Properties set to Kafka Streams:*
>
> StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
> StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
> StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
> StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
> StreamsConfig.NUM_STREAM_THREADS_CONFIG - 2
> StreamsConfig.PROCESSING_GUARANTEE_CONFIG - StreamsConfig.EXACTLY_ONCE
> StreamsConfig.COMMIT_INTERVAL_MS_CONFIG - 1000ms
>
> Thanks in Advance.
>
> Kalyani Y,
> 9177982636
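
For context on the window configuration quoted above, a rough sizing sketch may help when reading the logs. With a 4-hour window advancing every 5 minutes, each record falls into 48 overlapping windows, and a suppression that waits until the window closes has to hold a pending result for every open window of every key. The class below is only an illustration of that arithmetic. The class name, the method names, the key count, and the average aggregate size are assumptions made for the example; they are not taken from the report or from the Kafka Streams API.

```java
import java.time.Duration;

/** Back-of-the-envelope estimate for hopping-window fan-out (illustrative only). */
public class HoppingWindowEstimate {

    /**
     * Upper bound on the number of hopping windows that contain a single
     * record timestamp: ceil(size / advance).
     */
    static long windowsPerRecord(Duration size, Duration advance) {
        long sizeMs = size.toMillis();
        long advanceMs = advance.toMillis();
        if (sizeMs <= 0 || advanceMs <= 0) {
            throw new IllegalArgumentException("size and advance must be positive");
        }
        // Integer ceiling division.
        return (sizeMs + advanceMs - 1) / advanceMs;
    }

    /**
     * Rough number of bytes held for open windows, assuming every key has a
     * pending aggregate of the same average size in every open window.
     */
    static long estimatedBufferedBytes(long distinctKeys, long windowsPerRecord, long avgAggregateBytes) {
        return Math.multiplyExact(Math.multiplyExact(distinctKeys, windowsPerRecord), avgAggregateBytes);
    }

    public static void main(String[] args) {
        // Window settings from the quoted POC code.
        long fanOut = windowsPerRecord(Duration.ofHours(4), Duration.ofMinutes(5));

        // Hypothetical workload numbers, chosen only for illustration.
        long distinctKeys = 1_000L;
        long avgAggregateBytes = 10L * 1024L;

        long bytes = estimatedBufferedBytes(distinctKeys, fanOut, avgAggregateBytes);
        System.out.println("windows per record: " + fanOut);
        System.out.println("estimated buffered bytes: " + bytes);
    }
}
```

If the aggregate (here `MetricsTimeSeries`) grows with every data point, the real per-window size toward the end of a 4-hour window could be much larger than any fixed average, so the estimate should be treated as a lower-end guess rather than a measurement.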