Folks, is there any update on https://issues.apache.org/jira/browse/KAFKA-4740? This is currently the only pressing issue for our streams application.
It happens from time to time, and so far our only solution is to increase the committed offset of the group for that partition beyond the offset that caused the issue. Another thing I have noticed is that such (poison pill) messages tend to bunch together, so usually we need to increase the offset by, say, 10000 to get past the issue, which means we lose the processing of that many messages. If we increase the offset by just one, we soon hit another such message a few offsets further on, and so on. So, to avoid multiple manual restarts of the streams application, we simply increase the offset by 10000 (a sketch of the small consumer we use for this seek-and-commit is at the end of this mail). The streams application then runs uninterrupted for a few months, until one fine day we see such messages again.

As the bug is critical, shouldn't we get some fix or workaround in the next release?

Thanks
Sachin

On Thu, Mar 30, 2017 at 6:59 PM, Michael Noll <mich...@confluent.io> wrote:

> Sachin,
>
> there's a JIRA that seems related to what you're seeing:
> https://issues.apache.org/jira/browse/KAFKA-4740
>
> Perhaps you could check the above and report back?
>
> -Michael
>
> On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mich...@confluent.io> wrote:
>
> > Hmm, I re-read the stacktrace again. It does look like the value side is
> > the culprit (as Sachin suggested earlier).
> >
> > -Michael
> >
> > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mich...@confluent.io> wrote:
> >
> >> Sachin,
> >>
> >> you have this line:
> >>
> >> > builder.stream(Serdes.String(), serde, "advice-stream")
> >>
> >> Could the problem be that it is not the record values causing the problem
> >> -- because your value deserializer does try-catch any such errors -- but
> >> that the record *keys* are malformed? The built-in `Serdes.String()` does
> >> not try-catch deserialization errors, and from a quick look at the source
> >> it seems that the `Fetcher` class
> >> (clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
> >> is throwing your error above ("Error deserializing key/value for
> >> partition..."), and the Fetcher is swallowing the more specific
> >> SerializationException of `Serdes.String()` (but it will include the
> >> original exception/Throwable in its own SerializationException).
> >>
> >> -Michael
> >>
> >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>
> >>> My streams application does run in debug mode only.
> >>> Also I have checked the code around these lines:
> >>>
> >>> at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> >>> at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> >>> at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> >>>
> >>> I don't see any log statement which would give me more information.
> >>>
> >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> >>>
> >>> The issue is happening at this line, and perhaps handling the exception
> >>> and setting the value to null may be a better option.
> >>> Yes, at the client side nothing can be done, because the exception happens
> >>> before this.valueDeserializer.deserialize can be called.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <damian....@gmail.com> wrote:
> >>>
> >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> >>> > message has already been received into Streams.
> >>> > You could create a simple app that uses the Consumer, seeks to the
> >>> > offset, and tries to read the message. If you did this in debug mode
> >>> > you might find out some more information.
> >>> >
> >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sjmit...@gmail.com> wrote:
> >>> >
> >>> > > Well, I tried to read that offset via kafka-console-consumer.sh too,
> >>> > > and it fails with the same error.
> >>> > >
> >>> > > So I was wondering if I can apply any of the suggestions as per
> >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > >
> >>> > > If there is any other way just to get the contents of that message,
> >>> > > it would be helpful.
> >>> > >
> >>> > > Thanks
> >>> > > Sachin
> >>> > >
> >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian....@gmail.com> wrote:
> >>> > >
> >>> > > > Hi Sachin,
> >>> > > >
> >>> > > > Have you tried firing up a consumer (non-streams), seeking to that
> >>> > > > offset on the topic and seeing what the message is? Might be easier
> >>> > > > to debug? Like you say, it is failing in the consumer.
> >>> > > > Thanks,
> >>> > > > Damian
> >>> > > >
> >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmit...@gmail.com> wrote:
> >>> > > >
> >>> > > > > I think I am following the third option.
> >>> > > > >
> >>> > > > > My pipeline is:
> >>> > > > >
> >>> > > > > serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
> >>> > > > >
> >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> >>> > > > >     .filter(new Predicate<String, V>() { ... })
> >>> > > > >     .groupByKey()
> >>> > > > >     .aggregate(new Initializer<V1>() { ... },
> >>> > > > >         new Aggregator<String, V, V1>() { ... }, windows, supplier)
> >>> > > > >     .mapValues(new ValueMapper<V1, V2>() { ... })
> >>> > > > >     .foreach(new ForeachAction<Windowed<String>, V2>() { ... });
> >>> > > > >
> >>> > > > > and in VDeserializer (which implements Deserializer<V>) I am doing
> >>> > > > > something like this:
> >>> > > > >
> >>> > > > > public V deserialize(String paramString, byte[] paramArrayOfByte) {
> >>> > > > >     if (paramArrayOfByte == null) { return null; }
> >>> > > > >     V data = null;
> >>> > > > >     try {
> >>> > > > >         data = objectMapper.readValue(paramArrayOfByte, new TypeReference<V>() {});
> >>> > > > >     } catch (Exception e) {
> >>> > > > >         e.printStackTrace();
> >>> > > > >     }
> >>> > > > >     return data;
> >>> > > > > }
> >>> > > > >
> >>> > > > > So I am catching any exception that may happen when deserializing
> >>> > > > > the data.
> >>> > > > >
> >>> > > > > This is what the third option suggests (if I am not mistaken).
> >>> > > > >
> >>> > > > > Please let me know, given the pipeline, which option would be best
> >>> > > > > and how we can incorporate it in our pipeline.
> >>> > > > >
> >>> > > > > Also note the exception is happening when reading from the source
> >>> > > > > topic, which is "advice-stream", so it looks like the flow is not
> >>> > > > > reaching our pipeline at all for us to handle it. It is terminating
> >>> > > > > right at the consumer poll.
> >>> > > > >
> >>> > > > > Thanks
> >>> > > > > Sachin
> >>> > > > >
> >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mich...@confluent.io> wrote:
> >>> > > > >
> >>> > > > > > Could this be a corrupted message ("poison pill") in your topic?
> >>> > > > > >
> >>> > > > > > If so, take a look at
> >>> > > > > > http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > > > > >
> >>> > > > > > FYI: We're currently investigating a more elegant way to address
> >>> > > > > > such poison pill problems. If you have feedback on that front,
> >>> > > > > > feel free to share it with us. :-)
> >>> > > > > >
> >>> > > > > > -Michael
> >>> > > > > >
> >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>> > > > > >
> >>> > > > > > > Hi,
> >>> > > > > > > This is the first time we are getting a weird exception.
> >>> > > > > > > After this the streams application crashes.
> >>> > > > > > >
> >>> > > > > > > The only workaround is to manually seek and commit the offset to a
> >>> > > > > > > greater number, and we are needing this manual intervention again
> >>> > > > > > > and again.
> >>> > > > > > >
> >>> > > > > > > Any idea what is causing it and how we can circumvent this?
> >>> > > > > > >
> >>> > > > > > > Note this error happens in both cases, when a 10.2 client or a
> >>> > > > > > > 10.1.1 client connects to a 10.1.1 Kafka server, so this does not
> >>> > > > > > > look like a version issue.
> >>> > > > > > >
> >>> > > > > > > Also we have the following settings:
> >>> > > > > > > message.max.bytes=5000013
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> >>> > > > > > >
> >>> > > > > > > The rest is all default, and increasing the value of
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> >>> > > > > > >
> >>> > > > > > > Stack trace below.
> >>> > > > > > >
> >>> > > > > > > Thanks
> >>> > > > > > > Sachin
> >>> > > > > > >
> >>> > > > > > > org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
> >>> > > > > > > java.lang.IllegalArgumentException: null
> >>> > > > > > >     at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> >>> > > > > > >     at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >     at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >     at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >     at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>> > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
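P.S. For anyone else who hits this: below is a minimal sketch of the standalone (non-streams) consumer Damian suggested above for poking at the bad offset. The topic, partition and offset are the ones from the stack trace; the bootstrap server and group id are placeholders for illustration, and ByteArrayDeserializer keeps our own serdes completely out of the picture. Note that because the IllegalArgumentException is thrown inside Fetcher.parseRecord itself, this consumer can fail with the same SerializationException; in that case running it under a debugger with a breakpoint around Fetcher.java:867 is still the easiest way we have found to look at the raw record.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class PoisonPillInspector {
        public static void main(String[] args) {
            Properties props = new Properties();
            // placeholder broker list -- adjust to your cluster
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // throwaway group id so the streams app's committed offsets are untouched
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "poison-pill-inspector");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            // same fetch limit as the streams app
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576");
            // fetch raw bytes only -- no application-level deserialization
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            // partition and offset taken from the stack trace (advice-stream-6 at offset 45153795)
            TopicPartition tp = new TopicPartition("advice-stream", 6);

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, 45153795L);

                ConsumerRecords<byte[], byte[]> records = consumer.poll(10000L);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    int keySize = record.key() == null ? -1 : record.key().length;
                    int valueSize = record.value() == null ? -1 : record.value().length;
                    System.out.printf("offset=%d keySize=%d valueSize=%d%n",
                            record.offset(), keySize, valueSize);
                }
            }
        }
    }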
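And this is a sketch of the seek-and-commit tool mentioned at the top of this mail, which we use to push the group's committed offset past the poison pills. The broker address and the group id "my-streams-app" are again placeholders (the group id must be the streams application.id), and +10000 is just the jump we happen to use. It has to be run while the streams application is stopped, otherwise the commit will be rejected or simply overwritten once the group rebalances.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SkipPoisonPills {
        public static void main(String[] args) {
            Properties props = new Properties();
            // placeholder broker list -- adjust to your cluster
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // must match the streams application.id ("my-streams-app" is a placeholder)
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            // the failing partition, and a new offset safely past the cluster of bad records
            TopicPartition tp = new TopicPartition("advice-stream", 6);
            long newOffset = 45153795L + 10000L;

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                // overwrite the group's committed offset; the streams app resumes from here on restart
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(newOffset)));
            }
        }
    }

Obviously this skips those 10000 messages entirely, which is exactly why a proper fix for KAFKA-4740 would be much nicer than this workaround.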