Hi, I need assistance in the below scenario. Please help me with this.
I am using the hopping time window in Kafka streams. I am facing an issue on the restart of my Kafka application, the application is processing the data from the beginning offset. However, it is happening only when the topic has more than one partition. If the topic has only 1 partition then on the restart of the application the sliding window is working fine. *Kafka Version:* *2.1.0* *Eg:* Time Window is 4 hours advanceBy 5 minutes Application is started at time A and running for every 5minutes with the stream data of 4hours and now stopped at a timestamp say X, and restarted at timestamp Y *The behavior for a single partition*: After the restart, the streams are processed from time X to time Y for every 5minutes. *The behavior for a more than one partition: *After the restart, the streams are processed from time A to time Y for every 5minutes. *I am adding the POC code below* *// define the time window as a hopping time window TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4)) .advanceBy(Duration.ofMinutes(5)) .grace(Duration.ofMinutes(1)); KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream = builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>())) .groupByKey() .windowedBy(timeWindow) .aggregate(() -> new MetricsTimeSeries(), /* initializer */ * //*MetricsTimeSeries* is the aggregator class * (aggKey, newValue, aggValue) -> { aggValue.addDataPoint(newValue); return aggValue; }, /* adder */ Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */ .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(10*1024*1024).shutDownWhenFull())); windowedMetricsTimeSeriesStream .toStream() .map((key, value) -> //mapping logic goes here ) .to("metrics_op");* *Properties set to Kafka Streams:* *StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.classConsumerConfig.AUTO_OFFSET_RESET_CONFIG - "latest"* Thanks in Advance. Kalyani Y, 9177982636