Hello Adrienne,

Are you running the 0.10.0.0 version of the Streams library? If so, could you try again with the current trunk of Kafka?
Guozhang

On Thu, Sep 8, 2016 at 8:19 AM, Adrienne Kole <adrienneko...@gmail.com> wrote:
> Hi,
>
> I am trying to implement a simple scenario with the Kafka Streams library.
>
> I insert data into a Kafka topic at 1 tuple/second.
> The Streams library is connected to that topic, and what it does is:
>   1. construct 8-second windows with a 4-second slide,
>   2. sum the values of the tuples (price field) and set a count field
>      (to eventually compute an average),
>   3. output the results.
>
> So one would expect more or less one aggregation result per 4 seconds.
> However, I get 2 outputs per second.
>
> After analyzing a bit, I conclude that for each tuple the Streams library
> computes an aggregation result with each sub-window aggregator (as each
> tuple can be part of several windows) and sends it directly downstream.
> It keeps the aggregated object in the sub-windows so that it can be
> updated when a new object arrives, which makes the processing non-blocking.
>
> However, I see that the window result is not updated w.r.t. previous
> values. That is, if I set a count field on the aggregated object, I see
> count=1 for all of them.
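[Editorial note: the "each tuple can be part of several windows" point above can be illustrated with a plain-Java sketch, independent of any Kafka API. With a window size of 8000 ms and an advance of 4000 ms, every record timestamp falls into exactly two overlapping hopping windows, which is consistent with seeing two outputs per input. `windowStartsFor` is a hypothetical helper, not a Kafka function:]

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Returns the start times of all windows [start, start + size)
    // that contain a record with timestamp ts (hopping by `advance`).
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Smallest advance-aligned start s with s > ts - size.
        long first = Math.max(0L, ts - size + advance) / advance * advance;
        for (long start = first; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at t = 9000 ms belongs to the windows starting at 4000
        // and 8000 ms, so one input can trigger two downstream updates.
        System.out.println(windowStartsFor(9000L, 8000L, 4000L)); // prints [4000, 8000]
    }
}
```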
>
> Here is my code:
>
> public static void main(String[] args) throws Exception {
>
>     if (args == null || args.length != 1) {
>         throw new Exception("commandline argument is needed: path to configuration file");
>     }
>     String confPath = args[0];
>     YamlReader reader = new YamlReader(new FileReader(confPath));
>     Object object = reader.read();
>     Map conf = (Map) object;
>
>     ArrayList<String> bootstrapServersArr = (ArrayList<String>) conf.get("kafka.brokers");
>     Integer kafkaPort = new Integer(conf.get("kafka.port").toString());
>     String bootstrapServers = conf.get("bootstrap.servers").toString();
>     String zookeeperServers = conf.get("zoo.servers").toString();
>
>     String kafkaTopic = conf.get("kafka.topic").toString();
>     String dataGeneratorHost = InetAddress.getLocalHost().getHostName();
>     Integer dataGeneratorPort = new Integer(conf.get("datasourcesocket.port").toString());
>
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-benchmark");
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>     props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperServers);
>     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
>     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
>     int slideWindowLength = new Integer(conf.get("slidingwindow.length").toString()); // 8000 ms
>     int slideWindowSlide = new Integer(conf.get("slidingwindow.slide").toString());   // 4000 ms
>
>     KStreamBuilder builder = new KStreamBuilder();
>     KStream<Long, String> source = builder.stream(Serdes.Long(), Serdes.String(), kafkaTopic);
>
>     KStream<Long, Double> tsAppended = source.map(new KeyValueMapper<Long, String, KeyValue<Long, Double>>() {
>         @Override
>         public KeyValue<Long, Double> apply(Long aLong, String s) {
>             JSONObject obj = new JSONObject(s);
>             Double val = obj.getJSONObject("m").getDouble("price");
>             Long id = obj.getJSONObject("s").getLong("aid1");
>             return new KeyValue<Long, Double>(id, val);
>         }
>     });
>
>     KTable<Windowed<Long>, AvgValue> windowAgg = tsAppended.aggregateByKey(
>         new Initializer<AvgValue>() {
>             @Override
>             public AvgValue apply() {
>                 return new AvgValue(0, 0D, 0L);
>             }
>         },
>         new AvgAggregator<Long, Double, AvgValue>(),
>         TimeWindows.of("timewindow", slideWindowLength).advanceBy(slideWindowSlide),
>         Serdes.Long(), new AvgSerde()
>     );
>
>     KTable<Windowed<Long>, AvgValue> windowAggResult = windowAgg.mapValues((v) -> {
>         System.out.println(v.getSum() + " - " + v.getCount());
>         return v;
>     });
>
>     KafkaStreams streams = new KafkaStreams(builder, props);
>     streams.start();
> }
>
> public static class AvgAggregator<K, V, T> implements Aggregator<Long, Double, AvgValue> {
>     @Override
>     public AvgValue apply(Long id, Double price, AvgValue avg) {
>         avg.setSum(avg.getSum() + price);
>         avg.setCount(avg.getCount() + 1);
>         return avg;
>     }
> }
>
> }
>
> I output each tuple where it is generated, and compare it with the value
> output by Kafka Streams.
>
> price - count
>
> 29.78169          // this is the generator
> 29.78169 - 1      // this is kafka-streams
> 29.78169 - 1      // this is kafka-streams
> 30.767307         // this is the generator
> 30.767307 - 1     // this is kafka-streams
> 30.767307 - 1     // this is kafka-streams
> 8.034571          // this is the generator
> 8.034571 - 1      // this is kafka-streams
> 8.034571 - 1      // this is kafka-streams
> 79.36154          // this is the generator
> 79.36154 - 1      // this is kafka-streams
> 79.36154 - 1      // this is kafka-streams
> 14.394047         // this is the generator
> 14.394047 - 1     // this is kafka-streams
> 14.394047 - 1     // this is kafka-streams
> 70.00672          // this is the generator
> 70.00672 - 1      // this is kafka-streams
> 70.00672 - 1      // this is kafka-streams
> 58.836906         // this is the generator
> 58.836906 - 1     // this is kafka-streams
> 58.836906 - 1     // this is kafka-streams
> 20.771767         // this is the generator
> 20.771767 - 1     // this is kafka-streams
> 20.771767 - 1     // this is kafka-streams
> 64.35614          // this is the generator
> 64.35614 - 1      // this is kafka-streams
> 64.35614 - 1      // this is kafka-streams
>
> So the tuples do not get updated aggregate values as they pass through
> the window.
>
> Am I doing something wrong, or what could be the reason for this strange
> behavior?
>
> Cheers
> Adrienne

--
-- Guozhang
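[Editorial note: for reference, the accumulation the quoted aggregator intends — count growing with every record inside one window, rather than staying at 1 — can be sketched in plain Java, outside Kafka. The `AvgValue` class here is a minimal stand-in for the poster's class, not the original:]

```java
import java.util.Locale;

public class AvgFold {
    // Minimal stand-in for the poster's AvgValue (sum of prices plus a count).
    static final class AvgValue {
        double sum;
        long count;
        AvgValue(double sum, long count) { this.sum = sum; this.count = count; }
    }

    // Mirrors AvgAggregator.apply(): mutate the running aggregate and return it.
    static AvgValue aggregate(AvgValue avg, double price) {
        avg.sum += price;
        avg.count += 1;
        return avg;
    }

    public static void main(String[] args) {
        AvgValue avg = new AvgValue(0D, 0L); // the Initializer's starting value
        for (double price : new double[]{29.78169, 30.767307, 8.034571}) {
            avg = aggregate(avg, price);
        }
        // Expected per-window behavior: the count reflects every record seen,
        // unlike the observed output where each record reports count = 1.
        System.out.println(String.format(Locale.US, "%.6f - %d", avg.sum, avg.count));
        // prints 68.583568 - 3
    }
}
```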