After a lot of struggle with the pure Jackson library which doesn't have a
strict mode within it due to which I wasn't able to validate the JSON
schema. I finally found one way of doing it but now I am not able to map
the correct *Success* and *Failure* messages in order to call the Process
Function.

Below is my code:

case class Premium(id: String, premium: Long, eventTime: String)

class Splitter extends ProcessFunction[String,Premium] {
  val outputTag = new OutputTag[String]("failed")

  def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
    Try {
      val schema =
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
      // You can read a JSON object from String, a file, URL, etc.
      val parsedJson = new ObjectMapper().readTree(sampleJsonString)
      val validationMessages = schema.validate(parsedJson).asScala
      validationMessages.foreach(msg => println(msg.getMessage))
    } match {
      case Success(x) => {
        println(" Good: " + x)
        Right(x)
      }
      case Failure(err) => {
        println("Bad:  " + json)
        Left(json)
      }
    }
  }

  override def processElement(i: String, context:
ProcessFunction[String, Premium]#Context, collector:
Collector[Premium]): Unit = {
    fromJson(i) match {
      case Right(data) => {
        collector.collect(data)
        println("Good Records: " + data)
      }
      case Left(json) => {
        context.output(outputTag, json)
        println("Bad Records: " + json)
      }
    }
  }
}

Error:

type mismatch;
 found   : x.type (with underlying type Unit)
 required: T
        Right(x)

Reply via email to