When I wrote: "If you haven't changed to default key and value serdes, then `to()` will fail because [...]"
it should have read: "If you haven't changed the default key and value serdes, then `to()` will fail because [...]" On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll <mich...@confluent.io> wrote: > Ratha, > > if you based your problematic code on the PipeDemo example, then you > should have these two lines in your code (which most probably you haven't > changed): > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > > This configures your application to interpret (= encode/decode), by > default, the keys and values of any messages it reads from Kafka as > strings. This works for the PipeDemo example because the keys and values > are actually strings. > > In your application, however, you do: > > KStream<String, KafkaPayload> kafkaPayloadStream = > builder.stream(sourceTopics); > > This won't work, because `builder.stream()`, when calling it without > explicit serdes, will use the default serdes configured for your > application. So `builder.stream(sourceTopics)` will give you > `KStream<String, String>`, not `KStream<String, KafkaPayload>`. Also, you > can't just cast a String to KafkaPayload to "fix" the problem; if you > attempt to do so you run into the ClassCastException that you reported > below. > > What you need to do fix your problem is: > > 1. Provide a proper serde for `KafkaPayload`. See > http://docs.confluent.io/current/streams/developer- > guide.html#implementing-custom-serializers-deserializers-serdes. There > are also example implementations of such custom serdes at [1] and [2]. > > Once you have that, you can e.g. write: > > final Serde<String> stringSerde = Serdes.String(); // provided by Kafka > final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided > by you! > > 2. Call `builder.stream()` with explicit serdes to overrides the default > serdes. stringSerde is for the keys, kafkaPayloadSerde is for the values. > > KStream<String, KafkaPayload> kafkaPayloadStream = > builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics); > > That should do it. > > Lastly, you must think about serialization also when calling `to()` or > `through()`: > > kafkaPayloadStream.to(targetTopic); > > If you haven't changed to default key and value serdes, then `to()` will > fail because it will by default (in your app configuration) interpret > message values still as strings rather than KafkaPayload. To fix this you > should call: > > kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic); > > You need to override the default serdes whenever the data must be written > with, well, non-default serdes. > > I'd recommend reading http://docs.confluent.io/current/streams/developer- > guide.html#data-types-and-serialization to better understand how this > works. > > > Hope this helps, > Michael > > > > [1] http://docs.confluent.io/current/streams/developer- > guide.html#available-serializers-deserializers-serdes > [2] https://github.com/confluentinc/examples/tree/ > kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/ > confluent/examples/streams/utils > > > > > On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <vijayara...@gmail.com> wrote: > >> I checked my target topic and I see few messages than the source topic. >> (If >> source topic have 5 messages, I see 2 messages in my target topic) What >> settings I need to do ? >> >> And, when I try to consume message from the target topic, I get ClassCast >> Exception. >> >> java.lang.ClassCastException: java.lang.String cannot be cast to >> xx.yy.core.kafkamodels.KafkaPayload; >> >> * receivedPayload = (KafkaPayload) consumerRecord.value();* >> >> >> I Merge two topics like; >> >> * KStreamBuilder builder = new KStreamBuilder();* >> >> * KStream<String, KafkaPayload> kafkaPayloadStream = >> builder.stream(sourceTopics);* >> >> * kafkaPayloadStream.to(targetTopic);* >> >> * streams = new KafkaStreams(builder, properties);* >> >> * streams.start();* >> >> >> Why do I see classcast exception when consuming the message? >> >> >> On 11 October 2016 at 15:19, Ratha v <vijayara...@gmail.com> wrote: >> >> > Hi all; >> > I have custom datatype defined (a pojo class). >> > I copy messages from one topic to another topic. >> > I do not see any messages in my target topic. >> > This works fro string messages, but not for my custom message. >> > Waht might be the cause? >> > I followed this sample [1] >> > [1] >> > https://github.com/apache/kafka/blob/trunk/streams/ >> > examples/src/main/java/org/apache/kafka/streams/examples/ >> > pipe/PipeDemo.java >> > >> > >> > -- >> > -Ratha >> > http://vvratha.blogspot.com/ >> > >> >> >> >> -- >> -Ratha >> http://vvratha.blogspot.com/ >> > > >