Here's the solution (props to Damian G):

    JsonSerializer<AggKey> keySerializer = new JsonSerializer<>();
    JsonDeserializer<AggKey> keyDeserializer = new JsonDeserializer<>(AggKey.class);
    Serde<AggKey> keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer);

Then, for the aggregator, call 'groupByKey(keySerde, prtRecordSerde)'.

The documentation says the no-param groupByKey will use the default
serializers, but this doesn't seem to be true.

On Tue, Dec 6, 2016 at 12:28 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> Hmm. That's odd as the aggregation works ok if I use a String value for
> the key (and the corresponding String serde).
>
> This error only started occurring when I tried to substitute my 'custom'
> key for the original String.
>
> On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski <ra...@gruchalski.com>
> wrote:
>
>> Yeah, I knew that already, this part of the error:
>>
>> > > >>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
>>
>> points to this line:
>> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L73
>>
>> which means that your error happens on the value, not the key.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> 0.10.1.0
>>
>> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski <ra...@gruchalski.com>
>> wrote:
>>
>> > Jon,
>> >
>> > Are you using 0.10.1 or 0.10.0.1?
>> >
>> > –
>> > Best regards,
>> > Radek Gruchalski
>> > ra...@gruchalski.com
>> >
>> >
>> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian....@gmail.com)
>> > wrote:
>> >
>> > Hi Jon,
>> >
>> > At a glance the code looks ok, i.e., I believe the aggregate() should have
>> > picked up the default Serde set in your StreamsConfig. However, you could
>> > try adding the Serdes to the groupBy(..)
>> >
>> > i.e.,
>> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> >
>> > > It's just a bunch of public 'int' and 'String' values. There's an empty
>> > > constructor and a copy constructor.
>> > >
>> > > For functions I override 'equals' and the requirements for 'serde' (close,
>> > > configure, serializer and deserializer).
>> > >
>> > > @Override
>> > > public Serializer serializer() {
>> > >     JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>();
>> > >     return jsonSerializer;
>> > > }
>> > >
>> > > @Override
>> > > public Deserializer deserializer() {
>> > >     JsonDeserializer<AggKey> jsonDeserializer = new JsonDeserializer<>();
>> > >     return jsonDeserializer;
>> > > }
>> > >
>> > > Which relates to:
>> > >
>> > > public class JsonSerializer<T> implements Serializer<T> {
>> > >
>> > >     private Gson gson = new Gson();
>> > >
>> > >     @Override
>> > >     public void configure(Map<String, ?> map, boolean b) {
>> > >
>> > >     }
>> > >
>> > >     @Override
>> > >     public byte[] serialize(String topic, T t) {
>> > >         return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
>> > >     }
>> > >
>> > >     @Override
>> > >     public void close() {
>> > >
>> > >     }
>> > > }
>> > >
>> > > public class JsonDeserializer<T> implements Deserializer<T> {
>> > >
>> > >     private Gson gson = new Gson();
>> > >     private Class<T> deserializedClass;
>> > >
>> > >     public JsonDeserializer(Class<T> deserializedClass) {
>> > >         this.deserializedClass = deserializedClass;
>> > >     }
>> > >
>> > >     public JsonDeserializer() {
>> > >     }
>> > >
>> > >     @Override
>> > >     @SuppressWarnings("unchecked")
>> > >     public void configure(Map<String, ?> map, boolean b) {
>> > >         if(deserializedClass == null) {
>> > >             deserializedClass = (Class<T>) map.get("serializedClass");
>> > >         }
>> > >     }
>> > >
>> > >     @Override
>> > >     public T deserialize(String s, byte[] bytes) {
>> > >         if(bytes == null){
>> > >             return null;
>> > >         }
>> > >
>> > >         return gson.fromJson(new String(bytes),deserializedClass);
>> > >
>> > >     }
>> > >
>> > >     @Override
>> > >     public void close() {
>> > >
>> > >     }
>> > > }
>> > >
>> > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <ra...@gruchalski.com>
>> > > wrote:
>> > >
>> > > > Do you mind sharing the code of AggKey class?
>> > > >
>> > > > –
>> > > > Best regards,
>> > > > Radek Gruchalski
>> > > > ra...@gruchalski.com
>> > > >
>> > > >
>> > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> > > > wrote:
>> > > >
>> > > > The 2nd.
>> > > >
>> > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com>
>> > > > wrote:
>> > > >
>> > > >> Is the error happening at this stage?
>> > > >>
>> > > >> KStream<AggKey, RtDetailLogLine> rtRekey =
>> > > >>     rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value));
>> > > >>
>> > > >> or here:
>> > > >>
>> > > >> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>> > > >>     rtRekey.groupByKey().aggregate(
>> > > >>         BqRtDetailLogLine_aggregate::new,
>> > > >>         new PRTAggregate(),
>> > > >>         TimeWindows.of(60 * 60 * 1000L),
>> > > >>         collectorSerde, "prt_minute_agg_stream");
>> > > >>
>> > > >> –
>> > > >>
>> > > >> Best regards,
>> > > >> Radek Gruchalski
>> > > >> ra...@gruchalski.com
>> > > >>
>> > > >>
>> > > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> > > >> wrote:
>> > > >>
>> > > >> If I comment out the aggregation step and just .print the .map step I
>> > > >> don't hit the error. It's coming from aggregating the non-String key.
>> > > >>
>> > > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com>
>> > > >> wrote:
>> > > >>
>> > > >>> Jon,
>> > > >>>
>> > > >>> Looking at your code:
>> > > >>>
>> > > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > > >>>     Serdes.String().getClass().getName());
>> > > >>>
>> > > >>> and later:
>> > > >>>
>> > > >>> KStream<String, RtDetailLogLine> rtDetailLines =
>> > > >>>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>> > > >>>
>> > > >>> Is RtDetailLogLine inheriting from String? It is not, as the error
>> > > >>> suggests.
>> > > >>> You may have to write your own Serializer / Deserializer for
>> > > >>> RtDetailLogLine.
>> > > >>>
>> > > >>> –
>> > > >>> Best regards,
>> > > >>> Radek Gruchalski
>> > > >>> ra...@gruchalski.com
>> > > >>>
>> > > >>>
>> > > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> > > >>> wrote:
>> > > >>>
>> > > >>> Using 0.10.1.0
>> > > >>>
>> > > >>> This is my topology:
>> > > >>>
>> > > >>> Properties config = new Properties();
>> > > >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> > > >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> > > >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
>> > > >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> > > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > > >>>     Serdes.String().getClass().getName());
>> > > >>>
>> > > >>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer =
>> > > >>>     new JsonSerializer<>();
>> > > >>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
>> > > >>>     new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>> > > >>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
>> > > >>>     Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>> > > >>>
>> > > >>> StringSerializer stringSerializer = new StringSerializer();
>> > > >>> StringDeserializer stringDeserializer = new StringDeserializer();
>> > > >>> Serde<String> stringSerde =
>> > > >>>     Serdes.serdeFrom(stringSerializer,stringDeserializer);
>> > > >>>
>> > > >>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer =
>> > > >>>     new JsonDeserializer<>(RtDetailLogLine.class);
>> > > >>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer =
>> > > >>>     new JsonSerializer<>();
>> > > >>> Serde<RtDetailLogLine> prtRecordSerde =
>> > > >>>     Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>> > > >>>
>> > > >>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>> > > >>>
>> > > >>> KStream<String, RtDetailLogLine> rtDetailLines =
>> > > >>>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>> > > >>>
>> > > >>> // change the keying
>> > > >>> KStream<AggKey, RtDetailLogLine> rtRekey =
>> > > >>>     rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value));
>> > > >>>
>> > > >>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>> > > >>>     rtRekey.groupByKey().aggregate(
>> > > >>>         BqRtDetailLogLine_aggregate::new,
>> > > >>>         new PRTAggregate(),
>> > > >>>         TimeWindows.of(60 * 60 * 1000L),
>> > > >>>         collectorSerde, "prt_minute_agg_stream");
>> > > >>>
>> > > >>> ktRtDetail.toStream().print();
>> > > >>>
>> > > >>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>> > > >>>
>> > > >>> kafkaStreams.start();
>> > > >>>
>> > > >>>
>> > > >>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > >>>
>> > > >>> > Hi Jon,
>> > > >>> >
>> > > >>> > A couple of things: Which version are you using?
>> > > >>> > Can you share the code you are using to build the topology?
>> > > >>> >
>> > > >>> > Thanks,
>> > > >>> > Damian
>> > > >>> >
>> > > >>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> > > >>> >
>> > > >>> > > I'm using .map to convert my (k/v) string/Object to Object/Object but
>> > > >>> > > when I chain this to an aggregation step I'm getting this exception:
>> > > >>> > >
>> > > >>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
>> > > >>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
>> > > >>> > > java.lang.String
>> > > >>> > >   at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > >>> > >   at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > >>> > >   at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>> > > >>> > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>> > > >>> > >
>> > > >>> > > My key object implements Serde and returns a JsonSerializer for the
>> > > >>> > > 'Serializer()' override.
>> > > >>> > >
>> > > >>> > > In the config for the topology I'm setting:
>> > > >>> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> > > >>> > >
>> > > >>> > > Where else do I need to specify the (de)serializer for my key class?
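
A note for anyone landing on this thread later. The trace in the original
report is consistent with a sink falling back to the configured default value
serializer (String) for a value that is not a String, which matches Radek's
reading that the failure is on the value rather than the key. The sketch
below reproduces that mechanism in plain Java. It is only an illustration
under stated assumptions: Serializer, StringSerializer, LogLine,
LogLineSerializer and send() are hypothetical stand-ins written for this
example, not Kafka's classes, and Kafka's actual fallback logic is not shown.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: every type here is a hypothetical stand-in, not a Kafka class.
final class DefaultSerdeSketch {

    /** Miniature serializer interface, loosely shaped like the one in the thread. */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Plays the role of the configured default value serializer (String). */
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical value type standing in for RtDetailLogLine. */
    static final class LogLine {
        final String host;

        LogLine(String host) {
            this.host = host;
        }
    }

    /** Hand-rolled JSON so the sketch needs no external library. */
    static final class LogLineSerializer implements Serializer<LogLine> {
        @Override
        public byte[] serialize(String topic, LogLine data) {
            return ("{\"host\":\"" + data.host + "\"}").getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Mimics a sink that uses the explicit serializer when one is given and
     * otherwise falls back to the configured default. The unchecked cast
     * compiles because of type erasure and only fails once serialize() runs.
     */
    @SuppressWarnings("unchecked")
    static <V> byte[] send(String topic, V value,
                           Serializer<V> explicit, Serializer<?> configuredDefault) {
        Serializer<V> chosen = explicit != null ? explicit : (Serializer<V>) configuredDefault;
        return chosen.serialize(topic, value);
    }

    /** Returns true if falling back to the String default throws ClassCastException. */
    static boolean failsWithDefault() {
        try {
            send("repartition", new LogLine("a"), null, new StringSerializer());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("default serializer fails: " + failsWithDefault());
        byte[] ok = send("repartition", new LogLine("a"),
                new LogLineSerializer(), new StringSerializer());
        System.out.println("explicit serializer wrote: " + new String(ok, StandardCharsets.UTF_8));
    }
}
```

Passing the serializer explicitly, as in the groupByKey(keySerde,
prtRecordSerde) call at the top of this thread, avoids the fallback
altogether. Setting the default value serde to one that matches the value
type would presumably have the same effect, though nobody in the thread
tested that.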