Re: facing serialization issues with Kafka Streams application ..

2017-06-20 Thread Guozhang Wang
Damian, This type of question seems to be common; could you add an entry to the FAQ? https://cwiki.apache.org/confluence/display/KAFKA/FAQ Guozhang On Tue, Jun 20, 2017 at 8:23 AM, Debasish Ghosh wrote: > On Tue, Jun 20, 2017 at 7:03 PM, Damian Guy wrote: > > > m.groupBy(mapper, Serdes.String(

Re: facing serialization issues with Kafka Streams application ..

2017-06-20 Thread Debasish Ghosh
On Tue, Jun 20, 2017 at 7:03 PM, Damian Guy wrote: > m.groupBy(mapper, Serdes.String(), Serdes.String()).count("HostAggregateCount") Thanks! It works .. -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http:

Re: facing serialization issues with Kafka Streams application ..

2017-06-20 Thread Damian Guy
Hi, You should provide the serdes in the `groupByKey()` operation. Because the `map` changes the key, it triggers a re-partition in the `groupByKey`, and the records written to the repartition topic must be serialized with serdes that match the new key and value types. In fact you could replace the `map` and `groupByKey` with: m.groupBy(mapper, Serdes.String(), Serdes.String()).count("HostAggregateCount") Tha
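
A minimal sketch of the suggestion above, assuming the Kafka Streams 0.10.x/0.11 API that was current at the time of this thread (`KStreamBuilder`, `groupBy` with explicit serdes, `count(storeName)`). The topic name `host-records`, the object name `HostCountSketch`, and the identity-style `mapper` are hypothetical stand-ins, since the original snippet is truncated; only the final `groupBy(...).count("HostAggregateCount")` call comes from the thread.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable, KeyValueMapper}

object HostCountSketch {

  // Builds a topology that counts records per host name.
  // Assumption: "host-records" is a hypothetical topic whose values are host names.
  def topology(builder: KStreamBuilder): KTable[String, java.lang.Long] = {
    val hosts: KStream[Array[Byte], String] =
      builder.stream(Serdes.ByteArray(), Serdes.String(), "host-records")

    // Hypothetical mapper: use the value (host name) as the new key.
    val mapper = new KeyValueMapper[Array[Byte], String, String] {
      override def apply(key: Array[Byte], value: String): String = value
    }

    // groupBy changes the key, so the data goes through a repartition topic.
    // Passing String serdes here tells Kafka Streams how to serialize the
    // new key and the value, instead of falling back to the default serdes.
    hosts
      .groupBy(mapper, Serdes.String(), Serdes.String())
      .count("HostAggregateCount")
  }
}
```

Supplying the serdes at the grouping step keeps the fix local to the operation that changes the key type, so the application-wide default serdes do not need to change.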

facing serialization issues with Kafka Streams application ..

2017-06-20 Thread Debasish Ghosh
Hi - I have the following Scala snippet in a Kafka Streams application .. val builder = new KStreamBuilder() val logRecords: KStream[Array[Byte], LogRecord] = builder.stream(Serdes.ByteArray(), logRecordSerde, config.toTopic) val m: KStream[Array[Byte], String] = logRecords.mapValues(hostExtract