I could successfully get the total (not the average). As far as I can see, there is no need to create the topic manually before running the app: the topic is created automatically as soon as data is written to it and no topic with that name exists yet. Here is my code:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> longs = builder.stream(Serdes.String(), Serdes.String(), "mytopic");
KTable<Windowed<String>, Long> longCounts =
    longs.countByKey(TimeWindows.of("output-topic", 3600 * 1000), Serdes.String());

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

On Wed, Oct 19, 2016 at 1:58 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> You should create input/intermediate and output topics manually before
> you start your Kafka Streams application.
>
> -Matthias
>
> On 10/18/16 3:34 PM, Furkan KAMACI wrote:
> > Sorry about the concurrent questions. I tried the code below and didn't
> > get any error, but the output topic was not created:
> >
> > Properties props = new Properties();
> > props.put("bootstrap.servers", "localhost:9092");
> > props.put("acks", "all");
> > props.put("retries", 0);
> > props.put("batch.size", 16384);
> > props.put("linger.ms", 1);
> > props.put("buffer.memory", 33554432);
> > props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> > props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> >
> > Producer<String, String> producer = new KafkaProducer<>(props);
> >
> > for (int i = 0; i < 1000; i++) {
> >     producer.send(new ProducerRecord<>("input-topic",
> >         String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
> >             System.nanoTime() * 1e-9, i)));
> > }
> >
> > final KStreamBuilder builder = new KStreamBuilder();
> > final KStream<String, Long> qps = builder.stream(Serdes.String(), Serdes.Long(), "input-topic");
> > qps.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
> >    .mapValues(Object::toString)
> >    .to("output-topic");
> >
> > final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
> > streams.start();
> >
> > Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >
> > On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > Two things:
> > >
> > > 1) You should not apply the window to the first count, but to the
> > > base stream, to get correct results.
> > >
> > > 2) Your windowed aggregation does not just return String type, but
> > > Windowed<K> type. Thus, you need to either insert a .map() to
> > > transform your data into String type, or provide a custom
> > > serializer when writing data to the output topic (the method
> > > .to(...) has multiple overloads).
> > >
> > > By default, each topic read/write operation uses the Serdes from
> > > the streams config. If your data has a different type, you need to
> > > provide appropriate Serdes for those operators.
> > >
> > > -Matthias
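For reference, the .map() approach from point 2 would look roughly like this. It is only a sketch against the 0.10.0.1 API (imports omitted); the topic names, window name, and "key@windowStart" format are illustrative:

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> input = builder.stream(Serdes.String(), Serdes.String(), "input-topic");

    // The windowed count changes the key type from String to Windowed<String>.
    KTable<Windowed<String>, Long> counts =
        input.countByKey(TimeWindows.of("hourly-window", 3600 * 1000L), Serdes.String());

    // Map the Windowed<String> key back to a plain String so the default
    // String serde can serialize it when writing to the output topic.
    counts.toStream()
          .map((windowedKey, count) -> new KeyValue<>(
              windowedKey.key() + "@" + windowedKey.window().start(),
              count.toString()))
          .to(Serdes.String(), Serdes.String(), "output-topic");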
> > > On 10/18/16 2:01 PM, Furkan KAMACI wrote:
> > > > Hi Matthias,
> > > >
> > > > I've tried this code:
> > > >
> > > > final Properties streamsConfiguration = new Properties();
> > > > streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
> > > > streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > > > streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
> > > > streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> > > > streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> > > >
> > > > final KStreamBuilder builder = new KStreamBuilder();
> > > > final KStream input = builder.stream("myapp-test");
> > > >
> > > > final KStream<String, Long> searchCounts = input.countByKey("SearchRequests").toStream();
> > > > searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).to("outputTopicHourlyCounts");
> > > >
> > > > final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
> > > > streams.start();
> > > >
> > > > Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> > > >
> > > > However, I get an error:
> > > >
> > > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > > > org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
> > > >
> > > > On the other hand, when I try this code:
> > > >
> > > > https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
> > > >
> > > > I get an error too, which indicates that:
> > > >
> > > > Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException:
> > > > Size of data received by LongDeserializer is not 8
> > > >
> > > > Here is the generated topic:
> > > >
> > > > kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
> > > > 28952314828122
> > > > 28988681653726
> > > > 29080089383233
> > > >
> > > > I know that I miss something but couldn't find it.
> > > >
> > > > Kind Regards,
> > > > Furkan KAMACI
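The "Size of data received by LongDeserializer is not 8" error typically means the topic's values were written as strings while the application reads them with Serdes.Long(). A minimal producer sketch that writes real 8-byte longs instead; the topic name follows the thread, everything else is illustrative:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // Write values as 8-byte longs; using StringSerializer here is exactly
    // what makes LongDeserializer fail on the consuming side.
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.LongSerializer");

    Producer<String, Long> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>("myapp-test", "query", System.nanoTime()));
    producer.close();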
> > > > On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > > I see. KGroupedStream will be part of 0.10.1.0 (should be
> > > > > released in the next weeks).
> > > > >
> > > > > So, instead of
> > > > >
> > > > >     .groupByKey().count()
> > > > >
> > > > > you need to do
> > > > >
> > > > >     .countByKey()
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> > > > > > Hi Matthias,
> > > > > >
> > > > > > Thanks for your detailed answer. By the way, I couldn't find
> > > > > > "KGroupedStream" in version 0.10.0.1?
> > > > > >
> > > > > > Kind Regards,
> > > > > > Furkan KAMACI
> > > > > >
> > > > > > On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > You just need to read your stream and apply a (windowed)
> > > > > > > aggregation on it.
> > > > > > >
> > > > > > > If you use a non-windowed aggregation you will get counts
> > > > > > > "since the beginning". If you use a windowed aggregation you
> > > > > > > can specify the window size as 1 hour and get those results.
> > > > > > >
> > > > > > > One comment: it seems that you want to count *all* queries.
> > > > > > > To make this work, you need to make sure all records use the
> > > > > > > same key (because Kafka Streams only supports aggregation
> > > > > > > over keyed streams). Keep in mind that this prohibits
> > > > > > > parallelization of your aggregation!
> > > > > > >
> > > > > > > As a workaround, you could also do two consecutive
> > > > > > > aggregations: parallelize the first one, and do not
> > > > > > > parallelize the second one (i.e., use the first one as a
> > > > > > > pre-aggregation, similar to a combine step).
> > > > > > >
> > > > > > > Without pre-aggregation, and assuming all records use the
> > > > > > > same key, it looks something like this (for current trunk):
> > > > > > >
> > > > > > >     KStreamBuilder builder = new KStreamBuilder();
> > > > > > >     KStream input = builder.stream("yourTopic");
> > > > > > >
> > > > > > >     KGroupedStream groupedInput = input.groupByKey();
> > > > > > >
> > > > > > >     groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> > > > > > >     groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore").to("outputTopicHourlyCounts");
> > > > > > >
> > > > > > > For more details, please see the docs and examples:
> > > > > > >
> > > > > > > - http://docs.confluent.io/current/streams/index.html
> > > > > > > - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I could successfully run Kafka in my environment. I want
> > > > > > > > to monitor Queries per Second in my search application
> > > > > > > > with Kafka. Whenever a search request is made, I create a
> > > > > > > > ProducerRecord which holds the current nano time of the
> > > > > > > > system.
> > > > > > > >
> > > > > > > > I know that I have to use a streaming API for the
> > > > > > > > calculation, i.e. Kafka Streams or Spark Streams. My
> > > > > > > > choice is to use Kafka Streams.
> > > > > > > >
> > > > > > > > For the last 1 hour, or since the beginning, I have to
> > > > > > > > calculate the queries per second. How can I make such an
> > > > > > > > aggregation in Kafka Streams?
> > > > > > > >
> > > > > > > > Kind Regards,
> > > > > > > > Furkan KAMACI
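As an aside, the two-step pre-aggregation workaround described above could look roughly like this against the 0.10.1.0 API. It is only a sketch: the "ALL" key, store names, and output topic are illustrative, and String keys with appropriate default Serdes are assumed:

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> input = builder.stream("yourTopic");

    // Step 1 (parallel): count per original key across all partitions.
    KTable<String, Long> perKeyCounts = input.groupByKey().count("perKeyStore");

    // Step 2 (single key, no parallelism): map every per-key count onto one
    // constant key and combine the counts into a global total. The reduce
    // takes an adder and a subtractor so that an updated per-key count
    // replaces its old value instead of being double-counted.
    KTable<String, Long> totalCount = perKeyCounts
        .groupBy((key, count) -> KeyValue.pair("ALL", count),
                 Serdes.String(), Serdes.Long())
        .reduce(
            (aggregate, newValue) -> aggregate + newValue,  // adder
            (aggregate, oldValue) -> aggregate - oldValue,  // subtractor
            "totalCountStore");

    totalCount.to(Serdes.String(), Serdes.Long(), "outputTopicTotalCount");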