After a lot of struggle with the pure Jackson library which doesn't have a strict mode within it due to which I wasn't able to validate the JSON schema. I finally found one way of doing it but now I am not able to map the correct *Success* and *Failure* messages in order to call the Process Function.
Below is my code: case class Premium(id: String, premium: Long, eventTime: String) class Splitter extends ProcessFunction[String,Premium] { val outputTag = new OutputTag[String]("failed") def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = { Try { val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString) // You can read a JSON object from String, a file, URL, etc. val parsedJson = new ObjectMapper().readTree(sampleJsonString) val validationMessages = schema.validate(parsedJson).asScala validationMessages.foreach(msg => println(msg.getMessage)) } match { case Success(x) => { println(" Good: " + x) Right(x) } case Failure(err) => { println("Bad: " + json) Left(json) } } } override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = { fromJson(i) match { case Right(data) => { collector.collect(data) println("Good Records: " + data) } case Left(json) => { context.output(outputTag, json) println("Bad Records: " + json) } } } } Error: type mismatch; found : x.type (with underlying type Unit) required: T Right(x)