Hi everyone! I use kafka-streams, and i have a problem when i use
windowedBy. Everything works well until I restart the application. After
restarting my aggregation starts from beginning.
Code bellow:
>
> StreamsBuilder builder = new StreamsBuilder()
> KStream stream = builder.stream(topic, Consumed.with(Serdes.String(),
> Serdes.String()))
>
> KTable table =
> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
> .aggregate(
> { new AggregatorModel() },
> { key, value, aggregate ->
> return aggregate.add(value)
> }
> )
> .toStream()
> .map({ k, v ->
> new KeyValue<>(k.window().end(), v)
> })
> .to('output')
>
> def config = new Properties()
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> TimeUnit.SECONDS.toMillis(60))
>
> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> kafkaStreams.start()
>
>
I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to
'latest' and 'earliest' but it didn't help.
Can you help me understand what I'm doing wrong?
Thank you.