Hi,

I am using TimeWindows to aggregate data in Kafka Streams. This is the code snippet:

    view.flatMap(new MultipleKeyValueMapper(client))
        .groupByKey(Serialized.with(Serdes.String(),
            Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
        // 60000 ms = 60-second tumbling windows
        .windowedBy(TimeWindows.of(60000))
        // sum the three counters field by field
        .reduce(new Reducer<CountInfo>() {
            @Override
            public CountInfo apply(CountInfo value1, CountInfo value2) {
                return new CountInfo(value1.start + value2.start,
                                     value1.active + value2.active,
                                     value1.fresh + value2.fresh);
            }
        })
        .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
            @Override
            public String apply(Windowed<String> key, CountInfo value) {
                return key.key();
            }
        })
        .print(Printed.toSysOut());

    KafkaStreams streams = new KafkaStreams(builder.build(), KStreamReducer.getConf());
    streams.start();

I tested this with 30000 records in Kafka and printed each key and value:

    [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
    [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
    [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
    [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}

Both results for each key fall in the same window (1520601300000/1520601360000). Why are two results printed for each key within one window, rather than a single result?

________________________________
funk...@live.com