Hi Does your applicationId change? Best regards Patrik
> Am 29.10.2018 um 13:28 schrieb Pavel Koroliov <afgmeis...@gmail.com>: > > Hi everyone! I use kafka-streams, and i have a problem when i use > windowedBy. Everything works well until I restart the application. After > restarting my aggregation starts from beginning. > Code bellow: >> >> StreamsBuilder builder = new StreamsBuilder() >> KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), >> Serdes.String())) >> >> KTable table = >> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15))) >> .aggregate( >> { new AggregatorModel() }, >> { key, value, aggregate -> >> return aggregate.add(value) >> } >> ) >> .toStream() >> .map({ k, v -> >> new KeyValue<>(k.window().end(), v) >> }) >> .to('output') >> >> def config = new Properties() >> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId) >> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092') >> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, >> TimeUnit.SECONDS.toMillis(60)) >> >> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config) >> kafkaStreams.start() >> >> > I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to > 'latest' and 'earliest' but it didn't help. > Can you help me understand what I'm doing wrong? > Thank you.