Happy to hear it works now for you, Ratha. -Michael
On Wed, Oct 12, 2016 at 6:06 AM, Ratha v <vijayara...@gmail.com> wrote: > Sorry my fault, In the kafkaConsumer I messed with 'value.deserializer' > property.. > Now things are working fine.. > Thanks a lot. > > On 12 October 2016 at 14:10, Ratha v <vijayara...@gmail.com> wrote: > > > HI Michael; > > Sorry , after setting "auto.offset.reset" to 'earliest' , I see messages > > in my 'targetTopic'. > > But still I get my class cast exception issue, when I consume message > from > > the 'targetTopic'. (To consume message I use KafkaConsumer highlevel API) > > > > *ConsumerRecords<?, ?> records = consumer.poll(Long.MAX_VALUE);* > > > > > > > > *Exception* > > > > *java.lang.ClassCastException: java.lang.String cannot be cast to > > xxx.core.kafkamodels.KafkaPayload at > > xx.core.listener.KafkaMessageListener.receiveData( > KafkaMessageListener.java:108) > > ~[classes/:?]* > > > > at xx.core.listener.KafkaMessageListenerThread.process( > > KafkaMessageListenerThread.java:68) ~[classes/:?] > > > > at xx.core.listener.KafkaMessageListenerThread.lambda$run$1( > > KafkaMessageListenerThread.java:50) ~[classes/:?] > > > > at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66] > > > > at com.leightonobrien.core.listener.KafkaMessageListenerThread.run( > > KafkaMessageListenerThread.java:50) [classes/:?] > > > > at java.util.concurrent.Executors$RunnableAdapter. > call(Executors.java:511) > > [?:1.8.0_66] > > > > at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66] > > > > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > > [?:1.8.0_66] > > > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > > [?:1.8.0_66] > > > > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66] > > > > > > > > On 12 October 2016 at 13:19, Ratha v <vijayara...@gmail.com> wrote: > > > >> HI Michael; > >> > >> Really appreciate for the clear explanation.. > >> I modified my code as you mentioned. I have written custom, Serde, > >> serializer,deserializer. > >> But now the problem i see is, both topics are not merged. Means, > Messages > >> in the 'sourcetopic' not to passed to 'targetTopic' . ('targetTopic has > '0' > >> messages) > >> I do not see any exceptions. > >> > >> Here is my custom serde, serializer/deserializer and the logic; Also I > >> have properties file where i defined following parameters; > >> > >> *bootstrap.servers=xx.com <http://xx.com>\:9092,xx.com > >> <http://xx.com>\:9092,xx.com <http://xx.com>\:9092* > >> > >> *key.serde=org.apache.kafka.com > >> <http://org.apache.kafka.com>mon.serialization.Serdes$StringSerde* > >> > >> *value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde* > >> > >> *application.id <http://application.id>=stream-pipe* > >> > >> > >> Do you see any issue here? Why messages are not written to ' > targetTopic'? > >> > >> > >> > >> *LOGIC* > >> > >> /** > >> > >> * create stream from source topics and write it to the target topic > >> > >> * @param sourceTopics > >> > >> * @param targetTopic > >> > >> */ > >> > >> public void write(String[] sourceTopics, String targetTopic) { > >> > >> KafkaStreams streams = null; > >> > >> KStreamBuilder builder = new KStreamBuilder(); > >> > >> try { > >> > >> KStream<String, KafkaPayload> kafkaPayloadStream = builder > >> .stream(stringSerde, kafkaPayloadSerde, sourceTopics); > >> > >> kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, > >> targetTopic); > >> > >> streams = new KafkaStreams(builder, properties); > >> > >> streams.start(); > >> > >> Thread.sleep(5000L); > >> > >> } catch (InterruptedException e) { > >> > >> log.warn(e); > >> > >> } catch (Exception e) { > >> > >> log.error("Topic merge failed. ",e); > >> > >> } finally { > >> > >> if (streams != null) { > >> > >> streams.close(); > >> > >> } > >> > >> } > >> > >> } > >> > >> > >> > >> > >> *SERDE* > >> > >> > >> public class KafkaPayloadSerdes { > >> > >> static private class WrapperSerde<KafkaPayload> implements > >> Serde<KafkaPayload> { > >> final private Serializer<KafkaPayload> serializer; > >> final private Deserializer<KafkaPayload> deserializer; > >> > >> public WrapperSerde(Serializer<KafkaPayload> serializer, > >> Deserializer<KafkaPayload> deserializer) { > >> this.serializer = serializer; > >> this.deserializer = deserializer; > >> } > >> > >> @Override > >> public void configure(Map<String, ?> configs, boolean isKey) { > >> serializer.configure(configs, isKey); > >> deserializer.configure(configs, isKey); > >> } > >> > >> @Override > >> public void close() { > >> serializer.close(); > >> deserializer.close(); > >> } > >> > >> @Override > >> public Serializer<KafkaPayload> serializer() { > >> return serializer; > >> } > >> > >> @Override > >> public Deserializer<KafkaPayload> deserializer() { > >> return deserializer; > >> } > >> } > >> > >> static public final class KafkaPayloadSerde extends > >> WrapperSerde<KafkaPayload> { > >> public KafkaPayloadSerde() { > >> super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer()); > >> } > >> } > >> > >> /** > >> * A serde for nullable < KafkaPayload> type. > >> */ > >> static public Serde<KafkaPayload> KafkaPayload() { > >> return new KafkaPayloadSerde(); > >> } > >> > >> } > >> > >> > >> *Serilizer/Deserializer* > >> > >> > >> > >> public class KafkaPayloadSerializer implements Serializer<KafkaPayload>, > >> Deserializer<KafkaPayload> { > >> > >> private static final Logger log = org.apache.logging.log4j.LogManager > >> .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName()); > >> > >> @Override > >> public KafkaPayload deserialize(String topic, byte[] arg1) { > >> ByteArrayInputStream bis = new ByteArrayInputStream(arg1); > >> ObjectInput in = null; > >> Object obj = null; > >> try { > >> in = new ObjectInputStream(bis); > >> obj = in.readObject(); > >> } catch (IOException e) { > >> log.error(e); > >> } catch (ClassNotFoundException e) { > >> log.error(e); > >> } finally { > >> try { > >> bis.close(); > >> if (in != null) { > >> in.close(); > >> } > >> } catch (IOException ex) { > >> log.error(ex); > >> } > >> } > >> return (KafkaPayload) obj; > >> } > >> > >> @Override > >> public void close() { > >> // TODO Auto-generated method stub > >> > >> } > >> > >> @Override > >> public byte[] serialize(String topic, KafkaPayload kpayload) { > >> ByteArrayOutputStream bos = new ByteArrayOutputStream(); > >> ObjectOutput out = null; > >> byte[] payload = null; > >> try { > >> out = new ObjectOutputStream(bos); > >> out.writeObject(kpayload); > >> payload = bos.toByteArray(); > >> > >> } catch (IOException e) { > >> e.printStackTrace(); > >> } finally { > >> try { > >> if (out != null) { > >> out.close(); > >> bos.close(); > >> } > >> } catch (Exception ex) { > >> log.error(ex); > >> } > >> } > >> return payload; > >> } > >> > >> @Override > >> public void configure(Map configs, boolean isKey) { > >> // TODO Auto-generated method stub > >> > >> } > >> > >> } > >> > >> > >> > >> On 11 October 2016 at 20:13, Michael Noll <mich...@confluent.io> wrote: > >> > >>> When I wrote: > >>> > >>> "If you haven't changed to default key and value serdes, then > `to()` > >>> will fail because [...]" > >>> > >>> it should have read: > >>> > >>> "If you haven't changed the default key and value serdes, then > `to()` > >>> will fail because [...]" > >>> > >>> > >>> > >>> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll <mich...@confluent.io> > >>> wrote: > >>> > >>> > Ratha, > >>> > > >>> > if you based your problematic code on the PipeDemo example, then you > >>> > should have these two lines in your code (which most probably you > >>> haven't > >>> > changed): > >>> > > >>> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > >>> > Serdes.String().getClass()); > >>> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > >>> > Serdes.String().getClass()); > >>> > > >>> > This configures your application to interpret (= encode/decode), by > >>> > default, the keys and values of any messages it reads from Kafka as > >>> > strings. This works for the PipeDemo example because the keys and > >>> values > >>> > are actually strings. > >>> > > >>> > In your application, however, you do: > >>> > > >>> > KStream<String, KafkaPayload> kafkaPayloadStream = > >>> > builder.stream(sourceTopics); > >>> > > >>> > This won't work, because `builder.stream()`, when calling it without > >>> > explicit serdes, will use the default serdes configured for your > >>> > application. So `builder.stream(sourceTopics)` will give you > >>> > `KStream<String, String>`, not `KStream<String, KafkaPayload>`. > Also, > >>> you > >>> > can't just cast a String to KafkaPayload to "fix" the problem; if > you > >>> > attempt to do so you run into the ClassCastException that you > reported > >>> > below. > >>> > > >>> > What you need to do fix your problem is: > >>> > > >>> > 1. Provide a proper serde for `KafkaPayload`. See > >>> > http://docs.confluent.io/current/streams/developer- > >>> > guide.html#implementing-custom-serializers-deserializers-serdes. > >>> There > >>> > are also example implementations of such custom serdes at [1] and > [2]. > >>> > > >>> > Once you have that, you can e.g. write: > >>> > > >>> > final Serde<String> stringSerde = Serdes.String(); // provided by > >>> Kafka > >>> > final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be > >>> provided > >>> > by you! > >>> > > >>> > 2. Call `builder.stream()` with explicit serdes to overrides the > >>> default > >>> > serdes. stringSerde is for the keys, kafkaPayloadSerde is for the > >>> values. > >>> > > >>> > KStream<String, KafkaPayload> kafkaPayloadStream = > >>> > builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics); > >>> > > >>> > That should do it. > >>> > > >>> > Lastly, you must think about serialization also when calling `to()` > or > >>> > `through()`: > >>> > > >>> > kafkaPayloadStream.to(targetTopic); > >>> > > >>> > If you haven't changed to default key and value serdes, then `to()` > >>> will > >>> > fail because it will by default (in your app configuration) interpret > >>> > message values still as strings rather than KafkaPayload. To fix > this > >>> you > >>> > should call: > >>> > > >>> > kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, > >>> targetTopic); > >>> > > >>> > You need to override the default serdes whenever the data must be > >>> written > >>> > with, well, non-default serdes. > >>> > > >>> > I'd recommend reading http://docs.confluent.io/curre > >>> nt/streams/developer- > >>> > guide.html#data-types-and-serialization to better understand how > this > >>> > works. > >>> > > >>> > > >>> > Hope this helps, > >>> > Michael > >>> > > >>> > > >>> > > >>> > [1] http://docs.confluent.io/current/streams/developer- > >>> > guide.html#available-serializers-deserializers-serdes > >>> > [2] https://github.com/confluentinc/examples/tree/ > >>> > kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/ > >>> > confluent/examples/streams/utils > >>> > > >>> > > >>> > > >>> > > >>> > On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <vijayara...@gmail.com> > >>> wrote: > >>> > > >>> >> I checked my target topic and I see few messages than the source > >>> topic. > >>> >> (If > >>> >> source topic have 5 messages, I see 2 messages in my target topic) > >>> What > >>> >> settings I need to do ? > >>> >> > >>> >> And, when I try to consume message from the target topic, I get > >>> ClassCast > >>> >> Exception. > >>> >> > >>> >> java.lang.ClassCastException: java.lang.String cannot be cast to > >>> >> xx.yy.core.kafkamodels.KafkaPayload; > >>> >> > >>> >> * receivedPayload = (KafkaPayload) consumerRecord.value();* > >>> >> > >>> >> > >>> >> I Merge two topics like; > >>> >> > >>> >> * KStreamBuilder builder = new KStreamBuilder();* > >>> >> > >>> >> * KStream<String, KafkaPayload> kafkaPayloadStream = > >>> >> builder.stream(sourceTopics);* > >>> >> > >>> >> * kafkaPayloadStream.to(targetTopic);* > >>> >> > >>> >> * streams = new KafkaStreams(builder, properties);* > >>> >> > >>> >> * streams.start();* > >>> >> > >>> >> > >>> >> Why do I see classcast exception when consuming the message? > >>> >> > >>> >> > >>> >> On 11 October 2016 at 15:19, Ratha v <vijayara...@gmail.com> wrote: > >>> >> > >>> >> > Hi all; > >>> >> > I have custom datatype defined (a pojo class). > >>> >> > I copy messages from one topic to another topic. > >>> >> > I do not see any messages in my target topic. > >>> >> > This works fro string messages, but not for my custom message. > >>> >> > Waht might be the cause? > >>> >> > I followed this sample [1] > >>> >> > [1] > >>> >> > https://github.com/apache/kafka/blob/trunk/streams/ > >>> >> > examples/src/main/java/org/apache/kafka/streams/examples/ > >>> >> > pipe/PipeDemo.java > >>> >> > > >>> >> > > >>> >> > -- > >>> >> > -Ratha > >>> >> > http://vvratha.blogspot.com/ > >>> >> > > >>> >> > >>> >> > >>> >> > >>> >> -- > >>> >> -Ratha > >>> >> http://vvratha.blogspot.com/ > >>> >> > >>> > > >>> > > >>> > > >>> > >> > >> > >> > >> -- > >> -Ratha > >> http://vvratha.blogspot.com/ > >> > > > > > > > > -- > > -Ratha > > http://vvratha.blogspot.com/ > > > > > > -- > -Ratha > http://vvratha.blogspot.com/ >