Hi all,
I have a Spark Streaming job that continuously receives messages from Kafka.
It ran for a month on Spark 1.0.2, but after upgrading to Spark 1.1.0 the job
no longer receives any messages from Kafka, even though I have not changed
the code. Below is the code snippet for the Kafka consumer:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// conf, batch, and hadoopOutput are defined earlier in the job
val Array(zkQuorum, topics, mysqlTable) = args
// Use a fresh consumer group for each run so we always read new messages
val currentTime: Long = System.currentTimeMillis
val group = "my-group-" + currentTime.toString
println(topics)
println(zkQuorum)
// One consumer thread per topic
val numThreads = 1
val topicMap = topics.split(",").map((_, numThreads)).toMap
val ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
// Receiver-based Kafka stream; keep only the message payload (value)
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// Count the records in each batch and write the counts to HDFS
val lineCounts = lines.count
lineCounts.saveAsTextFiles(hadoopOutput + "counts/result")
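
For debugging, one simple way to check whether the receiver sees anything at
all is to print a few records of each batch instead of writing counts. A
minimal sketch, using the same ssc and lines as above:

// Print the first records of every batch to the driver log
lines.print()

// Start the streaming computation and block until it terminates
ssc.start()
ssc.awaitTermination()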
I checked the values of topics and zkQuorum, and they are correct. When I use
the same settings with kafka-console-consumer, it consumes messages correctly.
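For reference, the console-consumer command I run looks roughly like this
(host and topic names are placeholders):

bin/kafka-console-consumer.sh --zookeeper zk-host:2181 --topic my-topic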
Does anyone know the reason? Thanks!
Bill