Hi Eno,

Sorry for the late reply. It's not a deserialization exception; it's a pattern matching exception in my own logic:

val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(userTable,
  (eventId: String, datatup: (DataLog, Option[CrawlData])) => {
    datatup._1.rawData.userId
  },
  (tuple, fbData: FacebookData) => {
    val (dmpData, Some(crawData)) = tuple // exception here
    // something here
  })

Thanks

On Thu, Aug 17, 2017 at 11:11 PM, Duy Truong <ducduytruong2...@gmail.com> wrote:

> Hi everyone,
>
> My kafka stream app has an exception (my business exception), and then it
> doesn't consume messages anymore. Is there any way to make my app continues
> consume messages when the exception occurs?
>
> Thanks
>
> --
> *Duy Truong*
>

--
*Duy Truong*
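
For reference, the pattern `val (dmpData, Some(crawData)) = tuple` is refutable: when the second element is None, Scala throws scala.MatchError. Below is a minimal sketch of a total alternative in plain Scala, with no Kafka dependency. The case classes, their fields, and the names `SafeJoin`, `combine` and `guarded` are hypothetical stand-ins, since the real types are not shown in this thread.

```scala
import scala.util.Try

// Hypothetical stand-ins for the types in the snippet above;
// the real DataLog and CrawlData fields are not shown in the thread.
final case class DataLog(userId: String)
final case class CrawlData(url: String)

object SafeJoin {
  // Handles both cases of the Option explicitly, so no MatchError
  // can be thrown when the crawl data is missing.
  def combine(tuple: (DataLog, Option[CrawlData])): Option[String] = tuple match {
    case (dmpData, Some(crawlData)) => Some(s"${dmpData.userId} -> ${crawlData.url}")
    case (_, None)                  => None
  }

  // Generic guard for business logic that may still throw:
  // any non-fatal exception is turned into None instead of propagating.
  def guarded[A, B](f: A => B)(input: A): Option[B] = Try(f(input)).toOption
}
```

With a function shaped like this, records that cannot be processed yield None and could be dropped or logged in a later step rather than throwing inside the join. Whether that fits depends on what the joiner is expected to return, so treat it as one possible approach rather than the recommended one.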