Thanks for reporting this Kalyani, we'll take a look.
By chance can provide log files?

Thanks,
Bill

On Mon, Jul 8, 2019 at 7:43 AM kalyani yarlagadda <
kalyani.yarlagad...@gmail.com> wrote:

> Hi,
>
> I need assistance in the below scenario. Please help me with this.
>
> I am using the hopping time window in Kafka streams with *suppress*() I am
> seeing the following memory Errors.
>
> *1. Facing the memory issue when the Kafka application is running
> continuously* for 2 to 3 days of deployment without any restart on the
> machine
>
>
>
>
>
>
>
>
>
>
>
>
> *Exception in thread
>
> "change_detection_stream-08bd427d-36fd-467a-8923-4f7bb67aa949-StreamThread-2"
> java.lang.OutOfMemoryError: Java heap space        at
>
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
>       at
>
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$385/0x0000000800bc7440.restoreBatch(Unknown
> Source)        at
>
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>       at
>
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
>       at
>
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
>       at
>
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
>       at
>
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>       at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
>       at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
>       at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)*
>
>
> we are having the following Specifications in the machine:
> RAM: 16GB
>
> *2.   /tmp Folder is filled with more memory also.*
>
>
> *Kafka Version:* *2.1.0*
>
> *I am adding the POC code below*
>
>
>
>
>
>
>
>
>
> *// define the time window as a hopping time windowTimeWindows timeWindow =
>
> TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1));KTable<Windowed<String>,
> MetricsTimeSeries> windowedMetricsTimeSeriesStream =
> builder.stream("metrics_ip", Consumed.with(Serdes.String(), new
> JSONSerde<>())).groupByKey().windowedBy(timeWindow).aggregate(() -> new
> MetricsTimeSeries(), /* initializer */ * //*MetricsTimeSeries*  is the
> aggregator class
>
>
>
>
>
>
>
>
>
>
> *(aggKey, newValue, aggValue) -> {aggValue.addDataPoint(newValue);return
> aggValue;}, /* adder
> */Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /*
> state store name
>
> */.suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(<1GB>).shutDownWhenFull()));windowedMetricsTimeSeriesStream.toStream().map((key,
> value) -> //mapping logic goes here ).foreach(//logic to validate and
> save);*
>
> *Properties set to Kafka Streams:*
>
>
>
>
>
> *StreamsConfig.APPLICATION_ID_CONFIG -
>
> "streams_changedetection_poc_partitions"StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
> - "kafka:9092"StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG -
>
> Serdes.String().getClass()StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> - Serdes.String().getClass()StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG
> - JSONSerde.class StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS -
> JSONSerde.class*
>
> *StreamsConfig.NUM_STREAM_THREADS_CONFIG -  2*
>
>
> *StreamsConfig.PROCESSING_GUARANTEE_CONFIG -
> StreamsConfig.EXACTLY_ONCEStreamsConfig.COMMIT_INTERVAL_MS_CONFIG - 1000ms*
>
>
>
> Thanks in Advance.
>
> Kalyani Y,
> 9177982636
>

Reply via email to