Hello,
I'm finding problems to run a spark streaming job for more than a few hours
(3 or 4). It begins working OK, but it degrades until failure. Some of the
symptoms:
- Consumed memory and CPU keeps getting higher ang higher, and finally some
error is being thrown (java.lang.Exception: Could not compute split, block
input-0-1429168311800 not found) and data stops being calculated.
- The delay showed in web UI keeps also increasing.
- After some hours disk space is being consumed. There are a lot of
directories with name like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"
The job is basically reading information from kafka topic, and calculate
several topN tables for some key and value camps related with netflow data,
some of the parameters are this:
- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes
The execution is standalone on one machine (16GB RAM , 12 cores), and the
options to run it is as follows:
/opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
--jars $JARS --class $APPCLASS --master local[2] $APPJAR
someone has some clues about the problem? I don't know if it is a
configuration problem or some error in the code that is causing memory
leaks..
Thank you in advance!
Miquel
PD: the code is basically this:--------------------------------------
object NetflowTopn {
var appPath = "."
var zkQuorum = ""
var group = ""
var topics = ""
var numThreads = 1
var batch_interval = 10
var n_window = 1
var n_slide = 1
var topnsize = 10
var hm = Map[String,Int]()
hm += ( "unix_secs" -> 0 )
hm += ( "unix_nsecs" -> 1 )
hm += ( "sysuptime" -> 2 )
hm += ( "exaddr" -> 3 )
hm += ( "dpkts" -> 4 )
hm += ( "doctets" -> 5 )
hm += ( "first" -> 6 )
hm += ( "last" -> 7 )
hm += ( "engine_type" -> 8 )
hm += ( "engine_id" -> 9 )
hm += ( "srcaddr" -> 10 )
hm += ( "dstaddr" -> 11 )
hm += ( "nexthop" -> 12 )
hm += ( "input" -> 13 )
hm += ( "output" -> 14 )
hm += ( "srcport" -> 15 )
hm += ( "dstport" -> 16 )
hm += ( "prot" -> 17 )
hm += ( "tos" -> 18 )
hm += ( "tcp_flags" -> 19 )
hm += ( "src_mask" -> 20 )
hm += ( "dst_mask" -> 21 )
hm += ( "src_as" -> 22 )
hm += ( "dst_as" -> 23 )
def getKey (lcamps: Array[String], camp: String): String = {
if (camp == "total") return "total"
else return lcamps(hm(camp))
}
def getVal (lcamps: Array[String], camp: String): Long = {
if (camp == "flows") return 1L
else return lcamps(hm(camp)).toLong
}
def getKeyVal (line: String, keycamps: List[String], valcamp: String ) = {
val arr = line.split(",")
(keycamps.map(getKey(arr, _)).mkString(",") , getVal(arr,valcamp) )
}
def writeOutput (data: Array[(Long, String)], keycamps_str: String,
csvheader: String, valcamp: String, prefix: String) = {
val ts = System.currentTimeMillis
val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" +
valcamp + ".csv"
val f1f = new File(f1);
val ftmpf = new File(f1 + ts);
val pw = new PrintWriter(ftmpf)
pw.println(csvheader)
data.foreach{
t => pw.println (t._2 + "," + t._1)
}
pw.close
ftmpf.renameTo(f1f);
}
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: NetflowTopn <apppath>")
System.exit(1)
}
appPath = args(0)
try {
val prop = new Properties()
prop.load(new FileInputStream(appPath + "/conf/app.properties"))
zkQuorum = prop.getProperty("KAFKA_HOST")
group = prop.getProperty("KAFKA_GROUP")
topics = prop.getProperty("KAFKA_TOPIC")
numThreads = prop.getProperty("THREADS").toInt
} catch { case e: Exception =>
e.printStackTrace()
sys.exit(1)
}
val sparkConf = new SparkConf().setAppName("netflow-topn")
.set("spark.default.parallelism", "2")
.set("spark.rdd.compress", "true")
.set("spark.streaming.unpersist", "true")
.set("spark.cleaner.ttl", "300")
val ssc = new StreamingContext(sparkConf, Seconds(batch_interval))
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val kafpar = Map[String, String](
"zookeeper.connect" -> zkQuorum,
"group.id" -> group,
"zookeeper.connection.timeout.ms" -> "5000",
"auto.commit.interval.ms" -> "60000",
"auto.offset.reset" -> "largest"
)
val lines = KafkaUtils.createStream[String, String, StringDecoder,
StringDecoder] (ssc, kafpar, topicMap,
StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()
val ll_keycamps = List ( List("srcaddr", "dstaddr")
,List("dstaddr")
,List("srcaddr")
,List("srcport")
,List("dstport")
,List("total")
)
val l_valcamps = List ("doctets"
,"dpkts"
,"flows"
)
for (keycamps <- ll_keycamps) {
val keycamps_str = keycamps.mkString("-")
val csvheader = keycamps.mkString(",") + ",amount"
for (valcamp <- l_valcamps) {
val lines2 = lines.map( getKeyVal (_, keycamps, valcamp )
).cache()
lines2.reduceByKeyAndWindow((a:Long,b:Long)=>a+b, Seconds(60),
Seconds(10))
.map(_.swap)
.transform(_.sortByKey(false))
.foreachRDD(rdd => {
val data = rdd.take(20)
writeOutput (data, keycamps_str, csvheader,
valcamp, "DATAWINDOW")
})
lines2.reduceByKey((a:Long,b:Long)=>a+b)
.map(_.swap)
.transform(_.sortByKey(false))
.foreachRDD(rdd => {
val data = rdd.take(20)
writeOutput (data, keycamps_str, csvheader,
valcamp, "DATA")
})
}
}
ssc.start()
ssc.awaitTermination()
}
}
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]