I could successfully get the total (not the average). As far as I can see,
there is no need to create a topic manually before I run the app: a topic
is created automatically when data is sent to it and a topic with that name
does not already exist (assuming the broker default
auto.create.topics.enable=true). Here is my code:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> longs =
        builder.stream(Serdes.String(), Serdes.String(), "mytopic");

// note: the String passed to TimeWindows.of() names the window and its
// backing state store; it is not an output topic
KTable<Windowed<String>, Long> longCounts =
        longs.countByKey(TimeWindows.of("output-topic", 3600 * 1000),
                Serdes.String());

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
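
To actually publish those windowed counts to a topic, an explicit .to()
call is still needed. A minimal sketch (against the 0.10.0.x API; the
flattened String key format is just illustrative):

longCounts.toStream()
        // turn the Windowed<String> key into a plain String key
        .map((wk, count) -> new KeyValue<>(
                wk.key() + "@" + wk.window().start(), count))
        // keys serialized as Strings, counts as Longs
        .to(Serdes.String(), Serdes.Long(), "output-topic");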

On Wed, Oct 19, 2016 at 1:58 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

>
> You should create input/intermediate and output topics manually before
> you start your Kafka Streams application.
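>
> For example (a sketch assuming a local single-broker setup; adjust
> partitions and replication as needed):
>
>   kafka-topics --create --zookeeper localhost:2181 \
>       --replication-factor 1 --partitions 1 --topic output-topic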
>
>
> -Matthias
>
> On 10/18/16 3:34 PM, Furkan KAMACI wrote:
> > Sorry about the concurrent questions. I tried the code below; I didn't
> > get any error, but the output topic was not created:
> >
> > Properties props = new Properties();
> > props.put("bootstrap.servers", "localhost:9092");
> > props.put("acks", "all");
> > props.put("retries", 0);
> > props.put("batch.size", 16384);
> > props.put("linger.ms", 1);
> > props.put("buffer.memory", 33554432);
> > props.put("key.serializer",
> >         "org.apache.kafka.common.serialization.StringSerializer");
> > props.put("value.serializer",
> >         "org.apache.kafka.common.serialization.StringSerializer");
> >
> > Producer<String, String> producer = new KafkaProducer<>(props);
> >
> > for (int i = 0; i < 1000; i++) {
> >     producer.send(new ProducerRecord<>("input-topic",
> >             String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
> >                     System.nanoTime() * 1e-9, i)));
> > }
> > // close the producer so all buffered records are flushed to the broker
> > producer.close();
> >
> >
> > final KStreamBuilder builder = new KStreamBuilder();
> > final KStream<String, Long> qps =
> >         builder.stream(Serdes.String(), Serdes.Long(), "input-topic");
> > qps.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
> >         .mapValues(Object::toString)
> >         .to("output-topic");
> >
> > final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
> > streams.start();
> >
> > Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >
> > On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > Two things:
> >
> > 1) You should not apply the window to the first count, but to the
> > base stream, to get correct results.
> >
> > 2) Your windowed aggregation does not just return String type, but
> > Windowed<K> type. Thus, you need to either insert a .map() to
> > transform your data into String type, or provide a custom
> > serializer when writing data to the output topic (method .to(...)
> > has multiple overloads).
> >
> > By default, each topic read/write operation uses the Serdes from the
> > streams config. If your data has a different type, you need to
> > provide appropriate Serdes for those operators.
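> >
> > A minimal sketch of the .map() route (against the 0.10.0.x API; the
> > flattened key format is just illustrative):
> >
> > qps.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
> >         .toStream()
> >         // flatten the Windowed<String> key into a plain String key
> >         .map((wk, count) -> new KeyValue<>(
> >                 wk.key() + "@" + wk.window().start(), count.toString()))
> >         // now the default String Serdes fit both key and value
> >         .to("output-topic");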
> >
> >
> > -Matthias
> >
> > On 10/18/16 2:01 PM, Furkan KAMACI wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> I've tried this code:
> >>>>
> >>>> final Properties streamsConfiguration = new Properties();
> >>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>>         "myapp");
> >>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>         "localhost:9092");
> >>>> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> >>>>         "localhost:2181");
> >>>> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>         Serdes.String().getClass().getName());
> >>>> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>         Serdes.String().getClass().getName());
> >>>>
> >>>> final KStreamBuilder builder = new KStreamBuilder();
> >>>> final KStream input = builder.stream("myapp-test");
> >>>>
> >>>> final KStream<String, Long> searchCounts =
> >>>>         input.countByKey("SearchRequests").toStream();
> >>>> searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
> >>>>         .to("outputTopicHourlyCounts");
> >>>>
> >>>> final KafkaStreams streams =
> >>>>         new KafkaStreams(builder, streamsConfiguration);
> >>>> streams.start();
> >>>>
> >>>> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >>>>
> >>>> However, I get an error:
> >>>>
> >>>> Exception in thread "StreamThread-1" java.lang.ClassCastException:
> >>>> org.apache.kafka.streams.kstream.Windowed cannot be cast to
> >>>> java.lang.String
> >>>>
> >>>> On the other hand, when I try this code:
> >>>>
> >>>> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
> >>>>
> >>>> I get an error too, which indicates:
> >>>>
> >>>> Exception in thread "StreamThread-1"
> >>>> org.apache.kafka.common.errors.SerializationException: Size of data
> >>>> received by LongDeserializer is not 8
> >>>>
> >>>> Here is the generated topic:
> >>>>
> >>>> kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
> >>>> 28952314828122
> >>>> 28988681653726
> >>>> 29080089383233
> >>>>
> >>>> I know that I am missing something but couldn't find it.
> >>>>
> >>>> Kind Regards, Furkan KAMACI
> >>>>
> >>>> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax
> >>>> <matth...@confluent.io> wrote:
> >>>>
> >>>> I see. KGroupedStream will be part of 0.10.1.0 (it should be
> >>>> released in the next few weeks).
> >>>>
> >>>> So, instead of
> >>>>
> >>>>>>> .groupByKey().count()
> >>>>
> >>>> you need to do
> >>>>
> >>>>>>> .countByKey()
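> >>>>
> >>>> The same renaming applies to the windowed variants (a sketch; the
> >>>> store name in the 0.10.1 form is illustrative):
> >>>>
> >>>>>>> // 0.10.0.x:
> >>>>>>> stream.countByKey(TimeWindows.of("Hourly", 3600 * 1000));
> >>>>>>> // 0.10.1.0+:
> >>>>>>> stream.groupByKey().count(TimeWindows.of(3600 * 1000), "hourlyStore");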
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Thanks for your detailed answer. By the way, I couldn't
> >>>>>>> find "KGroupedStream" in version 0.10.0.1?
> >>>>>>>
> >>>>>>> Kind Regards, Furkan KAMACI
> >>>>>>>
> >>>>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> >>>>>>> <matth...@confluent.io> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> You just need to read your stream and apply a
> >>>>>>> (windowed) aggregation to it.
> >>>>>>>
> >>>>>>> If you use non-windowed aggregation you will get "since
> >>>>>>> the beginning". If you use windowed aggregation you can
> >>>>>>> specify the window size as 1 hour and get those
> >>>>>>> results.
> >>>>>>>
> >>>>>>> One comment: it seems that you want to count *all*
> >>>>>>> queries. To make this work, you need to make sure all
> >>>>>>> records are using the same key (because Kafka Streams
> >>>>>>> only supports aggregation over keyed streams). Keep in
> >>>>>>> mind that this prohibits parallelization of your
> >>>>>>> aggregation!
> >>>>>>>
> >>>>>>> As a workaround, you could also do two consecutive
> >>>>>>> aggregations: parallelize the first one, but not the
> >>>>>>> second (i.e., use the first one as a pre-aggregation,
> >>>>>>> similar to a combine step); see the sketch below.
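> >>>>>>>
> >>>>>>> A sketch of that combine-style variant (for current trunk; store
> >>>>>>> and topic names are illustrative):
> >>>>>>>
> >>>>>>>>>> // step 1 (parallelized): windowed count per original key
> >>>>>>>>>> KTable<Windowed<String>, Long> preCounts =
> >>>>>>>>>>     input.groupByKey()
> >>>>>>>>>>         .count(TimeWindows.of(3600 * 1000), "preCountStore");
> >>>>>>>>>>
> >>>>>>>>>> // step 2 (single key per window): sum the per-key counts;
> >>>>>>>>>> // the adder/subtractor pair keeps the total correct whenever
> >>>>>>>>>> // a per-key count gets updated
> >>>>>>>>>> preCounts
> >>>>>>>>>>     .groupBy((wk, count) ->
> >>>>>>>>>>             KeyValue.pair(wk.window().start(), count),
> >>>>>>>>>>         Serdes.Long(), Serdes.Long())
> >>>>>>>>>>     .aggregate(() -> 0L,
> >>>>>>>>>>         (windowStart, count, total) -> total + count,
> >>>>>>>>>>         (windowStart, count, total) -> total - count,
> >>>>>>>>>>         Serdes.Long(), "hourlyTotalStore")
> >>>>>>>>>>     .to(Serdes.Long(), Serdes.Long(), "outputTopicHourlyCounts");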
> >>>>>>>
> >>>>>>> Without pre-aggregation, and assuming all records use the
> >>>>>>> same key, it would look something like this (for current
> >>>>>>> trunk):
> >>>>>>>
> >>>>>>>
> >>>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
> >>>>>>>>>> KStream input = builder.stream("yourTopic");
> >>>>>>>>>>
> >>>>>>>>>> KGroupedStream groupedInput = input.groupByKey();
> >>>>>>>>>>
> >>>>>>>>>> groupedInput.count("countStore")
> >>>>>>>>>>     .to("outputTopicCountFromBeginning");
> >>>>>>>>>>
> >>>>>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore")
> >>>>>>>>>>     .to("outputTopicHourlyCounts");
> >>>>>>>
> >>>>>>>
> >>>>>>> For more details, please see the docs and examples:
> >>>>>>>
> >>>>>>> - http://docs.confluent.io/current/streams/index.html
> >>>>>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I could successfully run Kafka in my environment.
> >>>>>>>>>> I want to monitor Queries Per Second in my
> >>>>>>>>>> search application with Kafka. Whenever a search
> >>>>>>>>>> request is made, I create a ProducerRecord which
> >>>>>>>>>> holds the current nano time of the system.
> >>>>>>>>>>
> >>>>>>>>>> I know that I have to use a streaming API for the
> >>>>>>>>>> calculation, i.e., Kafka Streams or Spark Streaming.
> >>>>>>>>>> My choice is to use Kafka Streams.
> >>>>>>>>>>
> >>>>>>>>>> For the last 1 hour, or since the beginning, I have
> >>>>>>>>>> to calculate the queries per second. How can I do
> >>>>>>>>>> such an aggregation in Kafka Streams?
> >>>>>>>>>>
> >>>>>>>>>> Kind Regards, Furkan KAMACI
> >>>>>>>>>>
