Hello, I'm trying to run the following code,

var newContextCreated = false // Flag to detect whether new context
was created or not
val kafkaBrokers = "localhost:9092" // comma separated list of broker:host

private val batchDuration: Duration = Seconds(3)
private val master: String = "local[2]"
private val appName: String = this.getClass().getSimpleName()
private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests"

// Create a Spark configuration

val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, batchDuration)
ssc.checkpoint(checkpointDir)
ssc.remember(Minutes(1)) // To make sure data is not deleted by the
time we query it interactively

val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()

val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "evil_queue")
  .load()

lines.printSchema()

import spark.implicits._
val noAggDF = lines.select("key")

noAggDF
  .writeStream
  .format("console")
  .start()


But I'm having the error:

http://paste.scsys.co.uk/565658


How do I get my messages using kafka as format from Structured Streaming ?


Thank you


-- 

--
Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341

Reply via email to