Hi:
I used TimeWindow for aggregate data in kafka.

this is code snippet ;

  view.flatMap(new 
MultipleKeyValueMapper(client)).groupByKey(Serialized.with(Serdes.String(),
                Serdes.serdeFrom(new CountInfoSerializer(), new 
CountInfoDeserializer())))
        .windowedBy(TimeWindows.of(60000)).reduce(new Reducer<CountInfo>() {
            @Override
            public CountInfo apply(CountInfo value1, CountInfo value2) {
                return new CountInfo(value1.start + value2.start, value1.active 
+ value2.active, value1.fresh + value2.fresh);
            }
        }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
            @Override
            public String apply(Windowed<String> key, CountInfo value) {
                return key.key();
            }
        }).print(Printed.toSysOut());

        KafkaStreams streams = new KafkaStreams(builder.build(), 
KStreamReducer.getConf());
        streams.start();

and I test 30000 data in kafka .
and I print key value .


[KTABLE-TOSTREAM-0000000007]: 
[99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], 
CountInfo{start=12179, active=12179, fresh=12179}
[KTABLE-TOSTREAM-0000000007]: 
[99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=12179, 
active=12179, fresh=12179}
[KTABLE-TOSTREAM-0000000007]: 
[99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], 
CountInfo{start=30000, active=30000, fresh=30000}
[KTABLE-TOSTREAM-0000000007]: 
[99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=30000, 
active=30000, fresh=30000}
why in one window duration will be print two result but not one result ?

________________________________
funk...@live.com

Reply via email to