I have written a process function where I am parsing the JSON and if it is not according to the expected format it passes as Failure to the process function and I print the records which are working fine. Now, I was trying to print the message and the record in case of Success and Failure. I implemented the below code and it gave me the error. What exactly I am missing?
package KafkaAsSource import com.fasterxml.jackson.databind.ObjectMapper import com.networknt.schema.{JsonSchemaFactory, SpecVersion} import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala.OutputTag import org.apache.flink.util.Collector import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} class JSONParsingProcessFunction extends ProcessFunction[String,String] { val outputTag = new OutputTag[String]("failed") def parseJson(json: String): Either[String, String] = { val schemaJsonString = """ { "$schema": "http://json-schema.org/draft-04/schema#", "title": "Product", "description": "A product from the catalog", "type": "object", "properties": { "id": { "description": "The unique identifier for a product", "type": "integer" }, "premium": { "description": "Annual Premium", "type": "integer" }, "eventTime": { "description": "Timestamp at which record has arrived at source / generated", "type": "string" } }, "required": ["id", "premium","eventTime"] } """ Try { val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString) // You can read a JSON object from String, a file, URL, etc. val parsedJson = new ObjectMapper().readTree(json) val validationMessages = schema.validate(parsedJson).asScala //validationMessages.foreach(msg => println(msg.getMessage)) require(validationMessages.isEmpty) //parsedJson.toString() if(validationMessages.isEmpty) { (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString))) } else { (parsedJson.toString(),"Format is correct...") } } match { case Success(x) => { println("Good: " + x) Right(x) } case Failure(err) => { println("Bad: " + json) Left(json) } } } override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = { parseJson(i) match { case Right(data) => { collector.collect(data) println("Good Records: " + data) } case Left(json) => { context.output(outputTag, json) println("Bad Records: " + json) } } } } Error: type mismatch; found : (String, Any) required: String Right(x)