Hi everyone! I use kafka-streams, and i have a problem when i use
windowedBy. Everything works well until I restart the application. After
restarting my aggregation starts from beginning.
Code bellow:
>
>     StreamsBuilder builder = new StreamsBuilder()
>     KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), 
> Serdes.String()))
>
>     KTable table = 
> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>             .aggregate(
>             { new AggregatorModel() },
>             { key, value, aggregate ->
>                 return aggregate.add(value)
>             }
>     )
>             .toStream()
>             .map({ k, v ->
>         new KeyValue<>(k.window().end(), v)
>     })
>             .to('output')
>
>     def config = new Properties()
>     config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
>     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
>     config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
> TimeUnit.SECONDS.toMillis(60))
>
>     KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
>     kafkaStreams.start()
>
>
I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to
'latest' and 'earliest' but it didn't help.
Can you help me understand what I'm doing wrong?
Thank you.

Reply via email to