Below is the code I have written. I am getting a NotSerializableException when the streaming job runs. How can I handle this scenario?
kafkaStream.foreachRDD(rdd => {
  println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
  rdd.foreachPartition(partitionOfRecords => {
    partitionOfRecords.foreach(record => {
      // Write for CSV.
      if (true == true) {
        val structType = table.schema
        val csvFile = ssc.sparkContext.textFile(record.toString())
        val rowRDD = csvFile.map(x => getMappedRowFromCsvRecord(structType, x))
      }
    })
  })
})
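For reference, below is a restructuring I am considering, but I am not sure it is the right approach. It is only a rough sketch, and it assumes that each Kafka record carries the path of a CSV file (which is why I pass record.toString() to textFile) and that each micro-batch is small enough to collect() back to the driver. The idea is to keep every use of ssc.sparkContext on the driver inside foreachRDD, so the StreamingContext is never captured by a closure that Spark has to serialize and ship to the executors:

kafkaStream.foreachRDD(rdd => {
  // foreachRDD runs on the driver, so the StreamingContext/SparkContext
  // never ends up inside a closure sent to the executors.
  val structType = table.schema
  // Assumption: each record is a CSV file path and the batch is small enough to collect.
  rdd.collect().foreach(record => {
    val csvFile = ssc.sparkContext.textFile(record.toString())
    val rowRDD = csvFile.map(x => getMappedRowFromCsvRecord(structType, x))
    // ... act on rowRDD here, e.g. convert it to a DataFrame and save it
  })
})

Would this avoid the NotSerializableException, or is there a better pattern for this? I understand getMappedRowFromCsvRecord would still need to be serializable, since it is used inside csvFile.map.

-- Regards, Nishant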