Hi, I am trying to implement a simple scenario with the Kafka Streams library.
I insert data into a Kafka topic at 1 tuple/second. A Streams application is connected to that topic and does the following:

1. constructs 8-second windows with a 4-second slide,
2. sums the values of the tuples (the `price` field) and increments a `count` field (so I can later compute an average),
3. outputs the results.

So one would expect more or less one aggregation result every 4 seconds. However, I get 2 outputs per second. After analyzing a bit, I can conclude that for each incoming tuple the Streams library computes an aggregation result with each overlapping sub-window aggregator (as each tuple can be part of several windows) and sends it directly downstream. It keeps the aggregated object in the sub-windows so that it can be updated when a new tuple arrives, which makes the processing non-blocking. However, I see that the window result is not updated with respect to previous values: although the aggregator increments a `count` field on the aggregated object, I see `count=1` in every output. Here is my code:

```java
import com.esotericsoftware.yamlbeans.YamlReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.json.JSONObject;

import java.io.FileReader;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;

public static void main(String[] args) throws Exception {
    if (args == null || args.length != 1) {
        throw new Exception("commandline argument is needed: path to configuration file");
    }
    String confPath = args[0];

    YamlReader reader = new YamlReader(new FileReader(confPath));
    Object object = reader.read();
    Map conf = (Map) object;

    ArrayList<String> bootstrapServersArr = (ArrayList<String>) conf.get("kafka.brokers");
    Integer kafkaPort = new Integer(conf.get("kafka.port").toString());
    String bootstrapServers = conf.get("bootstrap.servers").toString();
    String zookeeperServers = conf.get("zoo.servers").toString();
    String kafkaTopic = conf.get("kafka.topic").toString();
    String dataGeneratorHost = InetAddress.getLocalHost().getHostName();
    Integer dataGeneratorPort = new Integer(conf.get("datasourcesocket.port").toString());

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-benchmark");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperServers);
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    int slideWindowLength = new Integer(conf.get("slidingwindow.length").toString()); // 8000 ms
    int slideWindowSlide = new Integer(conf.get("slidingwindow.slide").toString());   // 4000 ms

    KStreamBuilder builder = new KStreamBuilder();
    KStream<Long, String> source = builder.stream(Serdes.Long(), Serdes.String(), kafkaTopic);

    // extract (aid1, price) from the JSON payload
    KStream<Long, Double> tsAppended = source.map(new KeyValueMapper<Long, String, KeyValue<Long, Double>>() {
        @Override
        public KeyValue<Long, Double> apply(Long aLong, String s) {
            JSONObject obj = new JSONObject(s);
            Double val = obj.getJSONObject("m").getDouble("price");
            Long id = obj.getJSONObject("s").getLong("aid1");
            return new KeyValue<Long, Double>(id, val);
        }
    });

    // 8 s windows advancing by 4 s, aggregated into AvgValue (sum + count)
    KTable<Windowed<Long>, AvgValue> windowAgg = tsAppended.aggregateByKey(
        new Initializer<AvgValue>() {
            @Override
            public AvgValue apply() {
                return new AvgValue(0, 0D, 0L);
            }
        },
        new AvgAggregator<Long, Double, AvgValue>(),
        TimeWindows.of("timewindow", slideWindowLength).advanceBy(slideWindowSlide),
        Serdes.Long(),
        new AvgSerde() // custom serde for AvgValue (class not shown)
    );

    // print every downstream update for inspection
    KTable<Windowed<Long>, AvgValue> windowAggResult = windowAgg.mapValues((v) -> {
        System.out.println(v.getSum() + " - " + v.getCount());
        return v;
    });

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();
}

public static class AvgAggregator<K, V, T> implements Aggregator<Long, Double, AvgValue> {
    @Override
    public AvgValue apply(Long id, Double price, AvgValue avg) {
        avg.setSum(avg.getSum() + price);
        avg.setCount(avg.getCount() + 1);
        return avg;
    }
}
```
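For reference, `AvgValue` is just a mutable bean holding the running sum and count, and `AvgSerde` (de)serializes it. Neither class is shown above; the sketch below is a minimal version consistent with the calls in the code, not my exact class:

```java
// Minimal sketch of the AvgValue bean, reconstructed from the calls above
// (assumption: the first constructor argument is an id field).
public class AvgValue {
    private int id;
    private double sum;   // running sum of prices in the window
    private long count;   // number of tuples aggregated so far

    public AvgValue(int id, Double sum, Long count) {
        this.id = id;
        this.sum = sum;
        this.count = count;
    }

    public double getSum() { return sum; }
    public void setSum(double sum) { this.sum = sum; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
}
```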
I print each tuple where it is generated and compare it with the value that Kafka Streams outputs:

```
price - count
29.78169           // this is the generator
29.78169 - 1       // this is kafka-streams
29.78169 - 1       // this is kafka-streams
30.767307          // this is the generator
30.767307 - 1      // this is kafka-streams
30.767307 - 1      // this is kafka-streams
8.034571           // this is the generator
8.034571 - 1       // this is kafka-streams
8.034571 - 1       // this is kafka-streams
79.36154           // this is the generator
79.36154 - 1       // this is kafka-streams
79.36154 - 1       // this is kafka-streams
14.394047          // this is the generator
14.394047 - 1      // this is kafka-streams
14.394047 - 1      // this is kafka-streams
70.00672           // this is the generator
70.00672 - 1       // this is kafka-streams
70.00672 - 1       // this is kafka-streams
58.836906          // this is the generator
58.836906 - 1      // this is kafka-streams
58.836906 - 1      // this is kafka-streams
20.771767          // this is the generator
20.771767 - 1      // this is kafka-streams
20.771767 - 1      // this is kafka-streams
64.35614           // this is the generator
64.35614 - 1       // this is kafka-streams
64.35614 - 1       // this is kafka-streams
```

So the tuples never pick up the previous aggregate value as they pass through a window. Am I doing something wrong, or what could be the reason for this strange behavior?

Cheers
Adrienne
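P.S. To rule out the aggregator itself: when its own result is fed back in outside of Kafka Streams, it does accumulate, so the aggregate kept in the windowed store seems to be the problem. A quick standalone check (a sketch, using the AvgValue bean as sketched above):

```java
// Standalone sanity check of AvgAggregator, outside of Kafka Streams:
// feeding the returned aggregate back in accumulates sum and count as
// expected, which suggests the windowed store hands the aggregator a
// freshly initialized AvgValue for every record instead.
AvgAggregator<Long, Double, AvgValue> aggregator = new AvgAggregator<Long, Double, AvgValue>();
AvgValue acc = new AvgValue(0, 0D, 0L);
acc = aggregator.apply(1L, 29.78169, acc);
acc = aggregator.apply(1L, 30.767307, acc);
System.out.println(acc.getSum() + " - " + acc.getCount()); // prints ~60.548997 - 2
```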