Hi,

I need assistance in the below scenario. Please help me with this.

I am using a hopping time window in Kafka Streams. When I restart my
Kafka Streams application, it reprocesses the data from the beginning
offset.
However, this happens only when the topic has more than one partition.
If the topic has only one partition, the hopping window works fine after
the application restarts.

*Kafka Version:* *2.1.0*

*Eg:*

Time window size: 4 hours
advanceBy: 5 minutes
The application is started at time A and emits a result every 5 minutes,
each covering 4 hours of stream data.
It is then stopped at timestamp X and restarted at timestamp Y.
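
For reference, the window arithmetic behind this configuration can be sketched in plain Java, without any Kafka dependency. This is only an illustration under the assumption that hopping windows are aligned to the epoch and are start-inclusive, end-exclusive; the class and method names (HoppingWindowSketch, windowStartsFor) are hypothetical and not part of the Kafka Streams API.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Kafka Streams class: lists the start times of
// all hopping windows [start, start + size) that contain a given timestamp.
class HoppingWindowSketch {

    static List<Long> windowStartsFor(long timestampMs, Duration size, Duration advance) {
        long sizeMs = size.toMillis();
        long advanceMs = advance.toMillis();
        List<Long> starts = new ArrayList<>();

        // Earliest epoch-aligned start whose window still covers the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;

        // Every later aligned start up to the timestamp itself also covers it.
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record with timestamp 10 hours after the epoch (assumed value).
        long ts = Duration.ofHours(10).toMillis();
        List<Long> starts = windowStartsFor(ts, Duration.ofHours(4), Duration.ofMinutes(5));
        System.out.println(starts.size()); // prints 48
    }
}
```

With a 4-hour size and a 5-minute advance, each record falls into 48 overlapping windows, so each input record updates 48 window aggregates.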

*The behavior for a single partition:* After the restart, the streams are
processed from time X to time Y, in 5-minute steps.
*The behavior for more than one partition:* After the restart, the
streams are processed from time A to time Y, in 5-minute steps.

*I am adding the POC code below*

    // define the time window as a hopping time window
    TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
        .advanceBy(Duration.ofMinutes(5))
        .grace(Duration.ofMinutes(1));

    // MetricsTimeSeries is the aggregator class
    KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
        builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
            .groupByKey()
            .windowedBy(timeWindow)
            .aggregate(
                () -> new MetricsTimeSeries(), /* initializer */
                (aggKey, newValue, aggValue) -> {
                    aggValue.addDataPoint(newValue);
                    return aggValue;
                }, /* adder */
                Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
            .suppress(Suppressed.untilWindowCloses(
                BufferConfig.maxBytes(10*1024*1024).shutDownWhenFull()));

    windowedMetricsTimeSeriesStream
        .toStream()
        .map((key, value) -> /* mapping logic goes here */ )
        .to("metrics_op");

*Properties set to Kafka Streams:*

    StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
    StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
    StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
    StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
    StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
    StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG - "latest"





Thanks in Advance.

Kalyani Y,
9177982636
