Hi Eno,

Sorry for the late reply. It's not a deserialization exception; it's a
pattern-matching exception in my own logic:

val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(userTable,
      (eventId: String, datatup: (DataLog, Option[CrawlData])) => {
        datatup._1.rawData.userId
      },
      (tuple, fbData: FacebookData) => {
        val (dmpData, Some(crawData)) = tuple // exception here: scala.MatchError when the Option is None

        // something here

      })
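
For reference, here is a minimal sketch of one way the joiner could be made
total, so that a missing CrawlData no longer throws. The case classes and
their fields (userId, url, name) and the SafeJoiner object are hypothetical
stand-ins, since the real definitions are not shown here, and the sketch
leaves out the Kafka Streams wiring on purpose. It also assumes the
table-side value may be null when a left join finds no match.

```scala
// Hypothetical stand-ins for the real types; the actual fields are unknown.
final case class DataLog(userId: String)
final case class CrawlData(url: String)
final case class FacebookData(name: String)
final case class JVNModel(userId: String, url: String, fbName: Option[String])

object SafeJoiner {
  // Total version of the joiner: both shapes of the Option are matched,
  // so a None no longer raises scala.MatchError.
  def join(tuple: (DataLog, Option[CrawlData]), fbData: FacebookData): Option[JVNModel] =
    tuple match {
      case (dmpData, Some(crawlData)) =>
        // Assumption: fbData can be null when the left join finds no match,
        // so it is wrapped in Option before use.
        Some(JVNModel(dmpData.userId, crawlData.url, Option(fbData).map(_.name)))
      case (_, None) =>
        // No crawl data for this event: return None instead of throwing.
        None
    }
}
```

The joiner then returns an Option, and records that come back as None could
be filtered out downstream, so a single bad record would not stop the app
from consuming.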

Thanks


On Thu, Aug 17, 2017 at 11:11 PM, Duy Truong <ducduytruong2...@gmail.com>
wrote:

> Hi everyone,
>
> My Kafka Streams app throws an exception (from my business logic), and
> then it doesn't consume messages anymore. Is there any way to make my app
> continue consuming messages when the exception occurs?
>
> Thanks
>
> --
> *Duy Truong*
>



-- 
*Duy Truong*
