You should create the input/intermediate and output topics manually before you start your Kafka Streams application.
-Matthias

On 10/18/16 3:34 PM, Furkan KAMACI wrote:
> Sorry about the concurrent questions. I tried the code below; I didn't
> get any error, but the output topic was not created:
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("acks", "all");
>     props.put("retries", 0);
>     props.put("batch.size", 16384);
>     props.put("linger.ms", 1);
>     props.put("buffer.memory", 33554432);
>     props.put("key.serializer",
>         "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer",
>         "org.apache.kafka.common.serialization.StringSerializer");
>
>     Producer<String, String> producer = new KafkaProducer<>(props);
>
>     for (int i = 0; i < 1000; i++) {
>         producer.send(new ProducerRecord<>("input-topic",
>             String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
>                 System.nanoTime() * 1e-9, i)));
>     }
>
>     final KStreamBuilder builder = new KStreamBuilder();
>     final KStream<String, Long> qps =
>         builder.stream(Serdes.String(), Serdes.Long(), "input-topic");
>     qps.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
>        .mapValues(Object::toString)
>        .to("output-topic");
>
>     final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>     streams.start();
>
>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>
> On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>> Two things:
>>
>> 1) You should not apply the window to the first count, but to the
>> base stream, to get correct results.
>>
>> 2) Your windowed aggregation does not just return String type, but
>> Windowed<K> type for the key. Thus, you either need to insert a
>> .map() to transform your data into String type, or you provide a
>> custom serializer when writing data to the output topic (the method
>> .to(...) has multiple overloads).
>>
>> By default, each topic read/write operation uses the Serdes from the
>> streams config. If your data has a different type, you need to
>> provide appropriate Serdes for those operators.
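[Editor's note: Matthias' second point, that a windowed count changes the key type to Windowed<K>, is the cause of the ClassCastException reported further down in the thread. Below is a minimal plain-JDK sketch of what a .map() step could do: flatten the windowed key into a plain String before writing to the output topic. The "key@windowStart" format and all names here are illustrative assumptions, not Kafka Streams defaults.]

```java
// Sketch: flattening a windowed key (record key + window start time) into a
// plain String, the way a .map() step would before writing to "output-topic".
// The "key@windowStart" format is an assumption for illustration only.
public class WindowedKeyFormat {

    // Tumbling-window assignment: a record's window start is its timestamp
    // rounded down to a multiple of the window size.
    static long windowStartMs(long timestampMs, long windowSizeMs) {
        return (timestampMs / windowSizeMs) * windowSizeMs;
    }

    static String flatten(String key, long timestampMs, long windowSizeMs) {
        return key + "@" + windowStartMs(timestampMs, windowSizeMs);
    }

    public static void main(String[] args) {
        long hourMs = 3600 * 1000L;
        // Two timestamps inside the same hour map to the same window start,
        // so they flatten to the same String key.
        System.out.println(flatten("search", 5_400_000L, hourMs)); // search@3600000
        System.out.println(flatten("search", 6_000_000L, hourMs)); // search@3600000
    }
}
```

With the key flattened to a String, the default String Serde from the streams config suffices and no custom serializer is needed in `.to(...)`.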
>>
>> -Matthias
>>
>> On 10/18/16 2:01 PM, Furkan KAMACI wrote:
>>> Hi Matthias,
>>>
>>> I've tried this code:
>>>
>>>     final Properties streamsConfiguration = new Properties();
>>>     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
>>>     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>     streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>>>     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>         Serdes.String().getClass().getName());
>>>     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>         Serdes.String().getClass().getName());
>>>
>>>     final KStreamBuilder builder = new KStreamBuilder();
>>>     final KStream input = builder.stream("myapp-test");
>>>
>>>     final KStream<String, Long> searchCounts =
>>>         input.countByKey("SearchRequests").toStream();
>>>     searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
>>>         .to("outputTopicHourlyCounts");
>>>
>>>     final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>>>     streams.start();
>>>
>>>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>
>>> However I get an error:
>>>
>>>     Exception in thread "StreamThread-1" java.lang.ClassCastException:
>>>     org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
>>>
>>> On the other hand, when I try this code:
>>>
>>> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
>>>
>>> I get an error too, which indicates that:
>>>
>>>     Exception in thread "StreamThread-1"
>>>     org.apache.kafka.common.errors.SerializationException:
>>>     Size of data received by LongDeserializer is not 8
>>>
>>> Here is the generated topic:
>>>
>>>     kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
>>>     28952314828122
>>>     28988681653726
>>>     29080089383233
>>>
>>> I know that I'm missing something but couldn't find it.
>>>
>>> Kind Regards,
>>> Furkan KAMACI
>>>
>>> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>> I see. KGroupedStream will be part of 0.10.1.0 (should be released
>>>> in the next weeks).
>>>>
>>>> So, instead of
>>>>
>>>>     .groupByKey().count()
>>>>
>>>> you need to do
>>>>
>>>>     .countByKey()
>>>>
>>>> -Matthias
>>>>
>>>> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
>>>>> Hi Matthias,
>>>>>
>>>>> Thanks for your detailed answer. By the way, I couldn't find
>>>>> "KGroupedStream" in version 0.10.0.1?
>>>>>
>>>>> Kind Regards,
>>>>> Furkan KAMACI
>>>>>
>>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>> Hi,
>>>>>>
>>>>>> You just need to read your stream and apply a (windowed)
>>>>>> aggregation on it.
>>>>>>
>>>>>> If you use a non-windowed aggregation you will get counts "since
>>>>>> the beginning". If you use a windowed aggregation you can specify
>>>>>> the window size as 1 hour and get those results.
>>>>>>
>>>>>> One comment: it seems that you want to count *all* queries. To
>>>>>> make this work, you need to make sure all records use the same
>>>>>> key (because Kafka Streams only supports aggregation over keyed
>>>>>> streams). Keep in mind that this prohibits parallelization of
>>>>>> your aggregation!
>>>>>>
>>>>>> As a workaround, you could also do two consecutive aggregations:
>>>>>> parallelize the first one, and do not parallelize the second one
>>>>>> (i.e., use the first one as a pre-aggregation, similar to a
>>>>>> combine step).
>>>>>>
>>>>>> Without pre-aggregation, and assuming all records use the same
>>>>>> key, something like this (for current trunk):
>>>>>>
>>>>>>     KStreamBuilder builder = new KStreamBuilder();
>>>>>>     KStream input = builder.stream("yourTopic");
>>>>>>
>>>>>>     KGroupedStream groupedInput = input.groupByKey();
>>>>>>
>>>>>>     groupedInput.count("countStore")
>>>>>>                 .to("outputTopicCountFromBeginning");
>>>>>>     groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore")
>>>>>>                 .to("outputTopicHourlyCounts");
>>>>>>
>>>>>> For more details, please see the docs and examples:
>>>>>>
>>>>>> - http://docs.confluent.io/current/streams/index.html
>>>>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I could successfully run Kafka in my environment. I want to
>>>>>>> monitor Queries Per Second at my search application with Kafka.
>>>>>>> Whenever a search request is done I create a ProducerRecord
>>>>>>> which holds the current nano time of the system.
>>>>>>>
>>>>>>> I know that I have to use a streaming API for the calculation,
>>>>>>> i.e. Kafka Streams or Spark Streaming. My choice is to use
>>>>>>> Kafka Streams.
>>>>>>>
>>>>>>> For the last 1 hour, or since the beginning, I have to
>>>>>>> calculate the queries per second. How can I make such an
>>>>>>> aggregation in Kafka Streams?
>>>>>>>
>>>>>>> Kind Regards,
>>>>>>> Furkan KAMACI
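[Editor's note: stepping back to the original question, the hourly-QPS aggregation the thread converges on amounts to bucketing record timestamps into one-hour tumbling windows, counting per bucket, and dividing by the window length in seconds. Below is a plain-JDK sketch of those semantics; all names are hypothetical, and this mirrors the arithmetic rather than the Kafka Streams API.]

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the aggregation discussed in the thread: bucket record timestamps
// into one-hour tumbling windows, count records per window, then derive
// queries-per-second from each window's count. Plain JDK only.
public class HourlyQps {
    static final long WINDOW_MS = 3600 * 1000L;

    // Tumbling-window assignment: round the timestamp down to the window size.
    static long windowStart(long tsMs) {
        return (tsMs / WINDOW_MS) * WINDOW_MS;
    }

    // What a windowed countByKey computes per window (for a single shared key).
    static Map<Long, Long> countPerWindow(List<Long> timestampsMs) {
        Map<Long, Long> counts = new HashMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts), 1L, Long::sum);
        }
        return counts;
    }

    // QPS for a window is its count divided by the window length in seconds.
    static double qps(long countInWindow) {
        return countInWindow / (WINDOW_MS / 1000.0);
    }

    public static void main(String[] args) {
        // Two records fall in the first hour, one in the second.
        List<Long> ts = java.util.Arrays.asList(10L, 20L, 3_600_001L);
        Map<Long, Long> counts = countPerWindow(ts);
        System.out.println(counts.get(0L)); // 2
        System.out.println(qps(counts.get(0L))); // queries/sec for the first hour
    }
}
```

A non-windowed count is the degenerate case: one bucket covering everything "since the beginning", which matches Matthias' distinction between the two aggregation styles.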