I am trying to write an integration test using Embedded Kafka, but I keep getting a NullPointerException. My test case is very simple. It has the following steps:
1. Read a JSON file & write messages to an inputTopic.
2. Perform a 'readStream' operation.
3. Do a 'select' on the Stream.

This throws a NullPointerException. What am I doing wrong? Code is given below:

```scala
"My Test which runs with Embedded Kafka" should "Generate correct Result" in {

  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(
      kafkaPort = 9066,
      zooKeeperPort = 2066,
      Map("log.dir" -> "./src/test/resources/")
    )

  withRunningKafka {
    createCustomTopic(inputTopic)
    val source = Source.fromFile("src/test/resources/test1.json")
    source.getLines.toList.filterNot(_.isEmpty).foreach(
      line => publishStringMessageToKafka(inputTopic, line)
    )
    source.close()
    implicit val deserializer: StringDeserializer = new StringDeserializer

    createCustomTopic(outputTopic)
    import spark2.implicits._

    val schema = spark.read.json("my.json").schema
    val myStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9066")
      .option("subscribe", inputTopic)
      .load()

    // Schema looks good
    myStream.printSchema()

    // Following line throws NULLPointerException! Why?
    val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))

    // There's more code... but let's not worry about that for now.
  }
}
```
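One way to narrow this down is to run the same `from_json` select against a static DataFrame with no Kafka involved. The test above imports `spark2.implicits._` but builds the stream from `spark`. The sketch below therefore uses a single local SparkSession for both the implicits and the data, and uses `col("value")` instead of the `$` interpolator. It is a hypothetical standalone check, not a fix: the object name `FromJsonCheck`, the method `parse`, and the two-field schema (standing in for `spark.read.json("my.json").schema`) are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical standalone check, not part of the original test.
object FromJsonCheck {
  // Hypothetical schema standing in for spark.read.json("my.json").schema
  val schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)
  ))

  // Parses a string column named "value" the same way as the select in the test,
  // using the implicits of the same session that creates the DataFrame.
  def parse(spark: SparkSession, lines: Seq[String]): DataFrame = {
    import spark.implicits._
    lines.toDF("value")
      .select(from_json(col("value").cast("string"), schema).alias("value"))
      .select("value.*") // flatten the parsed struct into top-level columns
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from-json-check")
      .getOrCreate()
    try parse(spark, Seq("""{"id":1,"name":"a"}""", """{"id":2,"name":"b"}""")).show()
    finally spark.stop()
  }
}
```

If this runs cleanly, the `select` itself is fine, and the problem is more likely in how the session or implicits are set up in the test.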