-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

You should create the input, intermediate, and output topics manually
before you start your Kafka Streams application.


- -Matthias

On 10/18/16 3:34 PM, Furkan KAMACI wrote:
> Sorry about the concurrent questions. I tried the code below and
> didn't get any error, but the output topic was not created:
> 
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("acks", "all");
> props.put("retries", 0);
> props.put("batch.size", 16384);
> props.put("linger.ms", 1);
> props.put("buffer.memory", 33554432);
> props.put("key.serializer",
>     "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer",
>     "org.apache.kafka.common.serialization.StringSerializer");
> 
> Producer<String, String> producer = new KafkaProducer<>(props);
> 
> for (int i = 0; i < 1000; i++) {
>     producer.send(new ProducerRecord<>("input-topic",
>         String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
>             System.nanoTime() * 1e-9, i)));
> }
> 
> final KStreamBuilder builder = new KStreamBuilder();
> final KStream<String, Long> qps =
>     builder.stream(Serdes.String(), Serdes.Long(), "input-topic");
> qps.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
>     .mapValues(Object::toString)
>     .to("output-topic");
> 
> final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
> streams.start();
> 
> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> 
> On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
> 
> Two things:
> 
> 1) you should not apply the window to the first count, but to the
> base stream to get correct results.
> 
> 2) your windowed aggregation does not just return String type,
> but Windowed<K> type. Thus, you need to either insert a .map() to
> transform your data into String type, or provide a custom
> serializer when writing data to the output topic (the method
> .to(...) has multiple overloads).
> 
> By default, each topic read/write operation uses the Serdes from the
> streams config. If your data has a different type, you need to
> provide appropriate Serdes for those operators.
> 
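To illustrate why a Serde mismatch fails at runtime, here is a minimal sketch in plain Java (no Kafka dependency). The class and method names are hypothetical stand-ins, not Kafka APIs; the sketch only assumes that a String serializer writes UTF-8 text and that a Long serializer writes a fixed 8-byte value, which is consistent with the "is not 8" error quoted further down in this thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for String/Long serializers; not the Kafka classes.
final class SerdeMismatchSketch {

    // Assumption: a String serializer emits the UTF-8 bytes of the text.
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Assumption: a Long serializer emits a fixed 8-byte big-endian value.
    static byte[] serializeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // A Long deserializer can only accept exactly 8 bytes.
    static long deserializeLong(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException(
                "Size of data received is not 8, got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        // The same number written as text and as a binary long.
        byte[] asText = serializeString("28952314828122");
        byte[] asLong = serializeLong(28952314828122L);
        System.out.println("string bytes: " + asText.length);
        System.out.println("long bytes:   " + asLong.length);
        System.out.println("round trip:   " + deserializeLong(asLong));
        try {
            // Reading text bytes with the Long deserializer fails.
            deserializeLong(asText);
        } catch (IllegalArgumentException e) {
            System.out.println("mismatch:     " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that the bytes in a topic carry no type information, so the reader must be configured with the Serde that matches whatever the writer used.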
> 
> -Matthias
> 
> On 10/18/16 2:01 PM, Furkan KAMACI wrote:
>>>> Hi Matthias,
>>>> 
>>>> I've tried this code:
>>>> 
>>>> final Properties streamsConfiguration = new Properties();
>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>     "localhost:9092");
>>>> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
>>>>     "localhost:2181");
>>>> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>     Serdes.String().getClass().getName());
>>>> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>     Serdes.String().getClass().getName());
>>>> 
>>>> final KStreamBuilder builder = new KStreamBuilder();
>>>> final KStream input = builder.stream("myapp-test");
>>>> 
>>>> final KStream<String, Long> searchCounts =
>>>>     input.countByKey("SearchRequests").toStream();
>>>> searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
>>>>     .to("outputTopicHourlyCounts");
>>>> 
>>>> final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>>>> streams.start();
>>>> 
>>>> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>> 
>>>> However I get an error:
>>>> 
>>>> 
>>>> Exception in thread "StreamThread-1"
>>>> java.lang.ClassCastException:
>>>> org.apache.kafka.streams.kstream.Windowed cannot be cast to
>>>> java.lang.String
>>>> 
>>>> On the other hand when I try this code:
>>>> 
>>>> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
>>>> 
>>>> I get an error too, which indicates that:
>>>> 
>>>> Exception in thread "StreamThread-1"
>>>> org.apache.kafka.common.errors.SerializationException: Size
>>>> of data received by LongDeserializer is not 8
>>>> 
>>>> Here is generated topic:
>>>> 
>>>> kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
>>>> 28952314828122
>>>> 28988681653726
>>>> 29080089383233
>>>> 
>>>> I know that I'm missing something, but I couldn't find it.
>>>> 
>>>> Kind Regards, Furkan KAMACI
>>>> 
>>>> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax 
>>>> <matth...@confluent.io> wrote:
>>>> 
>>>> I see. KGroupedStream will be part of 0.10.1.0 (should be
>>>> released in the next few weeks).
>>>> 
>>>> So, instead of
>>>> 
>>>>>>> .groupByKey().count()
>>>> 
>>>> you need to do
>>>> 
>>>>>>> .countByKey()
>>>> 
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
>>>>>>> Hi Matthias,
>>>>>>> 
>>>>>>> Thanks for your detailed answer. By the way, I couldn't
>>>>>>> find "KGroupedStream" in version 0.10.0.1?
>>>>>>> 
>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>> 
>>>>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax 
>>>>>>> <matth...@confluent.io> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> You just need to read your stream and apply a
>>>>>>> (windowed) aggregation on it.
>>>>>>> 
>>>>>>> If you use non-windowed aggregation you will get "since
>>>>>>> the beginning". If you use windowed aggregation you can
>>>>>>> specify the window size as 1 hour and get those
>>>>>>> results.
>>>>>>> 
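As a rough illustration of the difference between the two kinds of aggregation, here is a minimal sketch in plain Java (no Kafka dependency). The class and method names are hypothetical; it assumes non-negative millisecond timestamps and fixed, non-overlapping one-hour windows aligned to the epoch, and it is not how Kafka Streams implements windowing internally.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of "since the beginning" vs. hourly windowed counts.
final class HourlyWindowSketch {
    static final long WINDOW_SIZE_MS = 3600 * 1000L;

    // Start of the one-hour window that contains the given timestamp.
    // Assumes timestampMs >= 0.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Windowed aggregation: one count per window start.
    static Map<Long, Long> countPerWindow(long[] timestampsMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts), 1L, Long::sum);
        }
        return counts;
    }

    // Non-windowed aggregation: a single count over everything seen so far.
    static long countSinceBeginning(long[] timestampsMs) {
        return timestampsMs.length;
    }

    // Average queries per second over one full window.
    static double queriesPerSecond(long countInWindow) {
        return countInWindow / (WINDOW_SIZE_MS / 1000.0);
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 1000L, 3_599_999L, 3_600_000L, 7_200_500L};
        System.out.println("since beginning: " + countSinceBeginning(timestamps));
        for (Map.Entry<Long, Long> e : countPerWindow(timestamps).entrySet()) {
            System.out.println("window " + e.getKey() + " -> " + e.getValue()
                + " requests, " + queriesPerSecond(e.getValue()) + " qps");
        }
    }
}
```

Dividing a window's count by the window length in seconds gives the average queries per second for that hour.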
>>>>>>> One comment: it seems that you want to count *all*
>>>>>>> queries. To make this work, you need to make sure all
>>>>>>> records use the same key (because Kafka Streams
>>>>>>> only supports aggregation over keyed streams). Keep in
>>>>>>> mind that this prohibits parallelization of your
>>>>>>> aggregation!
>>>>>>> 
>>>>>>> As a workaround, you could also do two consecutive
>>>>>>> aggregations: parallelize the first one, and do
>>>>>>> not parallelize the second one (i.e., use the first one
>>>>>>> as a pre-aggregation, similar to a combine step).
>>>>>>> 
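The two-step idea can be sketched in plain Java (no Kafka dependency). The names below are hypothetical, and the sketch works on a finished batch of keys; a real Kafka Streams topology would additionally have to handle the stream of count updates emitted by the first aggregation, which this deliberately ignores.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of pre-aggregation followed by a final combine step.
final class PreAggregationSketch {

    // Stage 1: count per original key. Keys are independent of each other,
    // so this stage could be spread across partitions and threads.
    static Map<String, Long> partialCounts(List<String> recordKeys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : recordKeys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Stage 2: combine all partial counts under one logical key.
    // This stage is not parallel, but it sees one value per key
    // instead of one value per record.
    static long totalCount(Map<String, Long> partials) {
        long total = 0L;
        for (long partial : partials.values()) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "c", "b", "a");
        Map<String, Long> partials = partialCounts(keys);
        System.out.println("partial counts: " + partials);
        System.out.println("total: " + totalCount(partials));
    }
}
```

The benefit is that the non-parallel second stage processes far fewer inputs than the raw record stream.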
>>>>>>> Without pre-aggregation, and assuming all records use
>>>>>>> the same key, something like this should work (for
>>>>>>> current trunk):
>>>>>>> 
>>>>>>> 
>>>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>>> KStream input = builder.stream("yourTopic");
>>>>>>>>>> 
>>>>>>>>>> KGroupedStream groupedInput = input.groupByKey();
>>>>>>>>>> 
>>>>>>>>>> groupedInput.count("countStore")
>>>>>>>>>>     .to("outputTopicCountFromBeginning");
>>>>>>>>>> 
>>>>>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore")
>>>>>>>>>>     .to("outputTopicHourlyCounts");
>>>>>>> 
>>>>>>> 
>>>>>>> For more details, please see the docs and examples:
>>>>>>> 
>>>>>>> - http://docs.confluent.io/current/streams/index.html
>>>>>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
>>>>>>> 
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> I could successfully run Kafka in my environment.
>>>>>>>>>> I want to monitor queries per second in my
>>>>>>>>>> search application with Kafka. Whenever a search
>>>>>>>>>> request is made, I create a ProducerRecord which
>>>>>>>>>> holds the current nano time of the system.
>>>>>>>>>> 
>>>>>>>>>> I know that I have to use a streaming API for 
>>>>>>>>>> calculation i.e. Kafka Streams or Spark Streams.
>>>>>>>>>> My choice is to use Kafka Streams.
>>>>>>>>>> 
>>>>>>>>>> For the last hour, or since the beginning, I have
>>>>>>>>>> to calculate the queries per second. How can I
>>>>>>>>>> do such an aggregation in Kafka Streams?
>>>>>>>>>> 
>>>>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYBqkRAAoJECnhiMLycopPLTUP/0fZeo1QP4xT0egmMfqvMw66
//NkZZSFacv+6EdXRmsewvJOTzYhePsDoBQX8SSAFuSc+PiMTz/A3opisorZj+zU
u5OngfvmIWpxMd/V960AZZIynPsvpqTR+7NVjGfvCJ3LNKVJNEMp3wyM5bGmGMsf
Nh69P4g6WNo6lG6weZDLYME+E8+gDQ0HgdoheK+/5ZBhxZFI6no/ab1hg9uIxjmG
EI1gMrnL2WP3ttTZVpOqiJnxZoKsu2wPLhpea86BRGtIaOdDQPXx/Mkt4+f36EjK
9h3+er2RUsKmXjFPgZe9zefigVPcYVIEjtLy2HjWF3Qwb3XhAWV9cY9izscPmsJy
spb7Gub8gQFI3SJJlYeS9Rlb7HaxQ2jUjsg0r3WUaQwcIZBwpk/1FI77Pj/Y60WH
ebhGPg6WJrg+5msw6N1wR4cxRaql2VPxHehlGT+D/7zyNuT7TiwZA6VU66Iao98o
rBodwnQAQ5DDZyFpGAdm1PucqzT5Psq10Nkxn4BMBvpg7+CzjkMpn2lcRfU+j3HL
4QBBDA2iH+glEzzjN3ag17SRJd3wwPbpxkbHQsQtCyDR9Ki4jpEyIO6xrLRb+9hg
G6u37217Zxgbh0jfUzWhNAOtu8wCtx6EVZ7QjJ5RUsgQhoQSFTgYBVVVU7G0kQT2
FF4TM+KTfpWwXKPTEwyV
=+X93
-----END PGP SIGNATURE-----
