Hello, I'm trying to run the following code, var newContextCreated = false // Flag to detect whether new context was created or not val kafkaBrokers = "localhost:9092" // comma separated list of broker:host
private val batchDuration: Duration = Seconds(3) private val master: String = "local[2]" private val appName: String = this.getClass().getSimpleName() private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests" // Create a Spark configuration val sparkConf = new SparkConf().setMaster(master).setAppName(appName) sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true") val ssc = new StreamingContext(sparkConf, batchDuration) ssc.checkpoint(checkpointDir) ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively val spark = SparkSession .builder .config(sparkConf) .getOrCreate() val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "evil_queue") .load() lines.printSchema() import spark.implicits._ val noAggDF = lines.select("key") noAggDF .writeStream .format("console") .start() But I'm having the error: http://paste.scsys.co.uk/565658 How do I get my messages using kafka as format from Structured Streaming ? Thank you -- -- Daniel de Oliveira Mantovani Perl Evangelist/Data Hacker +1 786 459 1341