Well I try to read that offset via kafka-console-consumer.sh too and it fails with same error.
So was wondering if I can apply any of the suggestion as per http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages If there is any other was just to get the contents of that message it would be helpful. Thanks Sachin On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian....@gmail.com> wrote: > Hi Sachin, > > Have you tried firing up a consumer (non-streams), seeking to that offset > on the topic and seeing what the message is? Might be easier to debug? Like > you say, it is failing in the consumer. > Thanks, > Damian > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmit...@gmail.com> wrote: > > > I think I am following the third option. > > > > My pipeline is: > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ()); > > > > builder.stream(Serdes.String(), serde, "advice-stream") > > .filter(new Predicate<String, V>() { ...}) > > .groupByKey() > > .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V, V1>() > > {...}, windows, supplier) > > .mapValues(new ValueMapper<V1, V2>() { ... }) > > .foreach(new ForeachAction<Windowed<String>, V2>() {... }); > > > > > > and In VDeserializer (implements Deserializer<V>) I am doing something > like > > this: > > > > public V deserialize(String paramString, byte[] paramArrayOfByte) { > > if (paramArrayOfByte == null) { return null;} > > V data = null; > > try { > > data = objectMapper.readValue(paramArrayOfByte, new > > TypeReference<V>() {}); > > } catch (Exception e) { > > e.printStackTrace(); > > } > > return data; > > } > > > > So I am catching any exception that may happen when deserializing the > data. > > > > This is what third option suggest (if I am not mistaken). > > > > Please let me know given the pipeline we which option would be best and > how > > can we incorporate that in our pipeline. > > > > Also not exception is happening when reading from source topic which is > > "advice-stream", so looks like flow is not going to pipeline at all for > us > > to handle. It is terminating right at consumer poll. > > > > Thanks > > Sachin > > > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mich...@confluent.io> > > wrote: > > > > > Could this be a corrupted message ("poison pill") in your topic? > > > > > > If so, take a look at > > > http://docs.confluent.io/current/streams/faq.html# > > > > > handling-corrupted-records-and-deserialization-errors- > poison-pill-messages > > > > > > FYI: We're currently investigating a more elegant way to address such > > > poison pill problems. If you have feedback on that front, feel free to > > > share it with us. :-) > > > > > > -Michael > > > > > > > > > > > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com> > > > wrote: > > > > > > > Hi, > > > > This is for first time we are getting a weird exception. > > > > After this the streams caches. > > > > > > > > Only work around is to manually seek and commit offset to a greater > > > number > > > > and we are needing this manual intervention again and again. > > > > > > > > Any idea what is causing it and how can we circumvent this. > > > > > > > > Note this error happens in both cases when 10.2 client or 10.1.1 > client > > > > connect to kafka server 10.1.1 > > > > > > > > So this does not looks like version issue. > > > > > > > > Also we have following setting > > > > message.max.bytes=5000013 > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576" > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576" > > > > > > > > Rest is all default and also increasing the value for > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help. > > > > > > > > Stack trace below. > > > > > > > > Thanks > > > > Sachin > > > > > > > > > > > > org.apache.kafka.common.errors.SerializationException: Error > > > deserializing > > > > key/value for partition advice-stream-6 at offset 45153795 > > > > java.lang.IllegalArgumentException: null > > > > at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea] > > > > at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils. > java:791) > > > > ~[kafka-clients-0.10.2.0.jar:na] > > > > at org.apache.kafka.common.record.Record.value(Record.java:268) > > > > ~[kafka-clients-0.10.2.0.jar:na] > > > > at org.apache.kafka.clients.consumer.internals.Fetcher. > > > > parseRecord(Fetcher.java:867) > > > > ~[kafka-clients-0.10.2.0.jar:na] > > > > at org.apache.kafka.clients.consumer.internals.Fetcher. > > > > parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar: > na] > > > > at org.apache.kafka.clients.consumer.internals.Fetcher. > > > > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na] > > > > at org.apache.kafka.clients.consumer.KafkaConsumer. > > > > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na] > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > > KafkaConsumer.java:995) > > > > ~[kafka-clients-0.10.2.0.jar:na] > > > > at org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > StreamThread.java:592) > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] > > > > at org.apache.kafka.streams.processor.internals. > > > > StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1- > > > > SNAPSHOT.jar:na] > > > > > > > > > >