Hello Adrienne,

Are you running version 0.10.0.0 of the Streams library? If so, could you
try it again with the current trunk of Kafka?

Guozhang


On Thu, Sep 8, 2016 at 8:19 AM, Adrienne Kole <adrienneko...@gmail.com>
wrote:

> Hi,
>
> I am trying to implement a simple scenario with the Kafka Streams library.
>
> I insert data into a Kafka topic at 1 tuple/second.
> The Streams library is connected to that topic, and it does the following:
>             1. construct 8-second windows with a 4-second slide,
>             2. sum the values of the tuples (price field) and update a
> count field (possibly to compute an average),
>             3. output the results
>
> So one would expect aggregation results more or less once every 4
> seconds.
> However, I get 2 outputs per second.
>
> After analyzing a bit, I can conclude that for each tuple the Streams
> library computes an aggregation result with each sub-window aggregator
> (as each tuple can be part of several windows) and sends it directly
> downstream. It keeps the aggregated object in the sub-windows, so when a
> new record arrives the aggregate is updated; this keeps the processing
> non-blocking.
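>
> Here is a small sketch (my own plain-Java illustration, not the Streams
> internals) of the hopping-window membership I mean: with size=8000 ms
> and advance=4000 ms, every timestamp falls into size/advance = 2
> overlapping windows, hence two updates per input tuple.
>
>         long size = 8000L, advance = 4000L, ts = 10500L;
>         // first window start that can still contain ts
>         long firstStart = Math.max(0, ts - size + advance) / advance * advance;
>         for (long start = firstStart; start <= ts; start += advance) {
>             // each of these windows keeps its own aggregate and emits an update
>             System.out.printf("window [%d, %d) contains ts=%d%n",
>                     start, start + size, ts);
>         }
>         // prints the two windows [4000, 12000) and [8000, 16000)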
>
> However, I see that the window result is not updated with respect to the
> previous values. That is, when I set a count field on the aggregated
> object, I see count=1 for all of them.
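>
> To make the expectation concrete, here is a tiny sketch (outside Kafka,
> applying my aggregator and initializer by hand; AvgAggregator is defined
> in the code below, AvgValue is sketched after it): each record should
> fold into the previous aggregate of its window, so the count should grow.
>
>         AvgAggregator<Long, Double, AvgValue> aggregator = new AvgAggregator<>();
>         AvgValue agg = new AvgValue(0, 0D, 0L);        // Initializer#apply()
>         for (double price : new double[]{29.78, 30.77, 8.03}) {
>             agg = aggregator.apply(42L, price, agg);   // Aggregator#apply()
>             System.out.println(agg.getSum() + " - " + agg.getCount());
>         }
>         // expected progression within one window: count = 1, 2, 3
>         // the observed output further below stays at count = 1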
>
> Here is my code:
>
>
>
>         public static void main(String[] args) throws Exception {
>
>             if (args == null || args.length != 1) {
>                 throw new Exception("commandline argument is needed: "
>                         + "path to configuration file");
>             }
>             String confPath = args[0];
>             YamlReader reader = new YamlReader(new FileReader(confPath));
>             Object object = reader.read();
>             Map conf = (Map)object;
>
>             ArrayList<String> bootstrapServersArr =
>                     (ArrayList<String>) conf.get("kafka.brokers");
>             Integer kafkaPort = new Integer(conf.get("kafka.port").toString());
>             String bootstrapServers = conf.get("bootstrap.servers").toString();
>             String zookeeperServers = conf.get("zoo.servers").toString();
>
>             String kafkaTopic = conf.get("kafka.topic").toString();
>             String dataGeneratorHost =
> InetAddress.getLocalHost().getHostName();
>             Integer dataGeneratorPort = new
> Integer(conf.get("datasourcesocket.port").toString());
>
>
>             Properties props = new Properties();
>             props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "kafka-benchmark");
>             props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>                     bootstrapServers);
>             props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> zookeeperServers);
>             props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.Long().getClass().getName());
>             props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>             props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> "earliest");
>
>
>             int slideWindowLength = new
> Integer(conf.get("slidingwindow.length").toString());        // 8000 ms
>             int slideWindowSlide = new
> Integer(conf.get("slidingwindow.slide").toString());            // 4000 ms
>
>             KStreamBuilder builder = new KStreamBuilder();
>             KStream<Long, String> source =
> builder.stream(Serdes.Long(),Serdes.String(), kafkaTopic);
>
>             KStream<Long, Double> tsAppended = source.map(new
> KeyValueMapper<Long, String, KeyValue<Long, Double>>() {
>                 @Override
>                 public KeyValue<Long, Double > apply(Long aLong, String s)
> {
>                     JSONObject obj = new JSONObject(s);
>                     Double val =  obj.getJSONObject("m").
> getDouble("price");
>                     Long id = obj.getJSONObject("s").getLong("aid1");
>
>                     return new KeyValue<Long, Double>(id, val);
>                 }
>             } );
>
>             KTable<Windowed<Long>, AvgValue> windowAgg =
> tsAppended.aggregateByKey(
>                     new Initializer<AvgValue>() {
>                         @Override
>                         public AvgValue apply() {
>                             return new AvgValue(0, 0D, 0L);
>                         }
>                     },
>                     new AvgAggregator<Long, Double, AvgValue>(),
>                     TimeWindows.of("timewindow",
> slideWindowLength).advanceBy(slideWindowSlide),
>                     Serdes.Long(), new AvgSerde()
>             );
>
>             KTable<Windowed<Long>, AvgValue> windowAggResult =
>                     windowAgg.mapValues((v) -> {
>                         System.out.println(v.getSum() + " - " + v.getCount());
>                         return v;
>                     });
>
>
>             KafkaStreams streams = new KafkaStreams(builder, props);
>             streams.start();
>         }
>
>
>     public static class AvgAggregator<K, V, T>  implements Aggregator<Long,
> Double, AvgValue> {
>         @Override
>         public AvgValue apply(Long id, Double price, AvgValue avg) {
>             avg.setSum(avg.getSum() + price);
>             avg.setCount(avg.getCount() + 1);
>             return avg;
>         }
>     }
>
> }
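>
> (AvgValue and AvgSerde are not shown above; here is a minimal sketch
> consistent with how they are used, where the field names and the meaning
> of the constructor arguments are my assumptions:
>
>     public static class AvgValue {
>         private int id;       // assumed meaning of the first constructor argument
>         private double sum;
>         private long count;
>
>         public AvgValue(int id, Double sum, Long count) {
>             this.id = id;
>             this.sum = sum;
>             this.count = count;
>         }
>         public double getSum() { return sum; }
>         public void setSum(double sum) { this.sum = sum; }
>         public long getCount() { return count; }
>         public void setCount(long count) { this.count = count; }
>     }
>
> AvgSerde must implement org.apache.kafka.common.serialization.Serde<AvgValue>
> and round-trip these three fields, e.g. via JSON; I omit it here.)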
>
>
>
> I print each tuple where it is generated and compare it with the value
> output by kafka-streams.
>
> price - count
>
> 29.78169          //  this is generator
> 29.78169 - 1      //  this is kafka-streams
> 29.78169 - 1      //  this is kafka-streams
> 30.767307         //  this is generator
> 30.767307 - 1     //  this is kafka-streams
> 30.767307 - 1     //  this is kafka-streams
> 8.034571          //  this is generator
> 8.034571 - 1      //  this is kafka-streams
> 8.034571 - 1      //  this is kafka-streams
> 79.36154          //  this is generator
> 79.36154 - 1      //  this is kafka-streams
> 79.36154 - 1      //  this is kafka-streams
> 14.394047         //  this is generator
> 14.394047 - 1     //  this is kafka-streams
> 14.394047 - 1     //  this is kafka-streams
> 70.00672          //  this is generator
> 70.00672 - 1      //  this is kafka-streams
> 70.00672 - 1      //  this is kafka-streams
> 58.836906         //  this is generator
> 58.836906 - 1     //  this is kafka-streams
> 58.836906 - 1     //  this is kafka-streams
> 20.771767         //  this is generator
> 20.771767 - 1     //  this is kafka-streams
> 20.771767 - 1     //  this is kafka-streams
> 64.35614          //  this is generator
> 64.35614 - 1      //  this is kafka-streams
> 64.35614 - 1      //  this is kafka-streams
>
>
>
> So the tuples do not pick up the updated aggregate values as they pass
> through the windows.
>
> Am I doing something wrong, or what could be the reason for this strange
> behavior?
>
>
> Cheers
> Adrienne
>



-- 
-- Guozhang
