I have written a process function where I am parsing the JSON and if it is
not according to the expected format it passes as Failure to the process
function and I print the records which are working fine. Now, I was trying
to print the message and the record in case of Success and Failure. I
implemented the below code and it gave me the error. What exactly I am
missing?

package KafkaAsSource

import com.fasterxml.jackson.databind.ObjectMapper
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.util.Collector
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

class JSONParsingProcessFunction extends ProcessFunction[String,String] {
  val outputTag = new OutputTag[String]("failed")

  def parseJson(json: String): Either[String, String] = {
    val schemaJsonString =
      """
{
    "$schema": "http://json-schema.org/draft-04/schema#";,
    "title": "Product",
    "description": "A product from the catalog",
    "type": "object",
    "properties": {
        "id": {
            "description": "The unique identifier for a product",
            "type": "integer"
        },
        "premium": {
            "description": "Annual Premium",
            "type": "integer"
        },
        "eventTime": {
            "description": "Timestamp at which record has arrived at
source / generated",
            "type": "string"
        }
    },
    "required": ["id", "premium","eventTime"]
}
"""
    Try {
      val schema =
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
      // You can read a JSON object from String, a file, URL, etc.
      val parsedJson = new ObjectMapper().readTree(json)
      val validationMessages = schema.validate(parsedJson).asScala
      //validationMessages.foreach(msg => println(msg.getMessage))
      require(validationMessages.isEmpty)
      //parsedJson.toString()
      if(validationMessages.isEmpty)
        {
          
(parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
        }
      else
        {
          (parsedJson.toString(),"Format is correct...")
        }

    }
    match {
      case Success(x) => {
        println("Good: " + x)
        Right(x)
      }
      case Failure(err) => {
        println("Bad:  " + json)
        Left(json)
      }
    }
  }
  override def processElement(i: String, context:
ProcessFunction[String, String]#Context, collector:
Collector[String]): Unit = {
    parseJson(i) match {
      case Right(data) => {
        collector.collect(data)
        println("Good Records: " + data)
      }
      case Left(json) => {
        context.output(outputTag, json)
        println("Bad Records: " + json)
      }
    }
  }
}


Error:

type mismatch;
 found   : (String, Any)
 required: String
        Right(x)

Reply via email to