Hi
No, my application id doesn't change.

Mon, 29 Oct 2018 at 19:11, Patrik Kleindl <[email protected]> wrote:
> Hi
> Does your applicationId change?
> Best regards
> Patrik
>
> > On 29.10.2018 at 13:28, Pavel Koroliov <[email protected]> wrote:
> >
> > Hi everyone! I use kafka-streams, and I have a problem when I use
> > windowedBy. Everything works well until I restart the application. After
> > restarting, my aggregation starts from the beginning.
> > Code below:
> >>
> >> StreamsBuilder builder = new StreamsBuilder()
> >> KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
> >>
> >> KTable table = stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
> >>         .aggregate(
> >>             { new AggregatorModel() },
> >>             { key, value, aggregate ->
> >>                 return aggregate.add(value)
> >>             }
> >>         )
> >>         .toStream()
> >>         .map({ k, v ->
> >>             new KeyValue<>(k.window().end(), v)
> >>         })
> >>         .to('output')
> >>
> >> def config = new Properties()
> >> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> >> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> >> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
> >>
> >> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> >> kafkaStreams.start()
> >>
> >>
> > I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the config, set to
> > 'latest' and to 'earliest', but it didn't help.
> > Can you help me understand what I'm doing wrong?
> > Thank you.
>
