The processing speed displayed in the UI doesn’t seem to take everything into account. I also had a low processing time, but I had to increase the batch duration from 30 seconds to 1 minute because the number of waiting batches kept increasing. Now it runs fine.
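For what it's worth, the batch duration is fixed when the StreamingContext is created, so changing it means recreating the context. A minimal sketch, assuming an existing SparkConf and using the 60-second value described above (the app name is just a placeholder):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("streaming-app")   // placeholder name
    // 60-second batches instead of 30, giving each batch time to finish before
    // the next one is queued, so the number of waiting batches stays flat.
    val ssc = new StreamingContext(conf, Seconds(60))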
> On 17.04.2015, at 13:30, González Salgado, Miquel <miquel.gonza...@tecsidel.es> wrote:
>
> Hi,
>
> Thank you for your response.
> I don't think it is because of the processing speed; in fact the delay is under 1 second, while the batch interval is 10 seconds. The data volume is low (10 lines / second).
>
> Changing to local[8] made the problem worse (CPU increased more quickly).
>
> By the way, I have seen some improvement after switching to this KafkaUtils call:
>
> KafkaUtils.createDirectStream
>
> CPU usage is low and stable, but memory is slowly increasing. At least the process lasts longer.
>
> Best regards,
> Miquel
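For reference, the direct-stream call mentioned above looks roughly like this in the Spark 1.3-era Kafka API. This is only a sketch: the broker list and topic name are placeholders rather than values from this thread, and ssc is assumed to be an existing StreamingContext. The direct stream talks to the Kafka brokers instead of ZooKeeper, so metadata.broker.list replaces zookeeper.connect:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Placeholder connection settings; adjust to the real cluster.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092,broker2:9092",
      "auto.offset.reset"    -> "largest"
    )
    val topicSet = Set("netflow")   // placeholder topic name

    // Returns an InputDStream of (key, message) pairs; no long-running receiver
    // is started, so it does not pin one of the local[2] cores the way the
    // receiver-based createStream does.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet).map(_._2)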
>
> From: bit1...@163.com [mailto:bit1...@163.com]
> Sent: Thursday, 16 April 2015, 10:58
> To: González Salgado, Miquel; user
> Subject: Re: Streaming problems running 24x7
>
> From your description, it looks like the data processing speed is far behind the data receiving speed.
>
> Could you try increasing the number of cores when you submit the application, for example local[8]?
>
> bit1...@163.com
>
> From: Miquel <miquel.gonza...@tecsidel.es>
> Date: 2015-04-16 16:39
> To: user <user@spark.apache.org>
> Subject: Streaming problems running 24x7
>
> Hello,
> I'm having problems running a Spark Streaming job for more than a few hours (3 or 4). It starts out working OK, but it degrades until failure. Some of the symptoms:
>
> - Consumed memory and CPU keep getting higher and higher, and finally an error is thrown (java.lang.Exception: Could not compute split, block input-0-1429168311800 not found) and data stops being processed.
>
> - The delay shown in the web UI also keeps increasing.
>
> - After some hours, disk space starts filling up. There are a lot of directories with names like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c".
>
> The job basically reads from a Kafka topic and calculates several top-N tables for some key and value fields related to NetFlow data. Some of the parameters are:
> - batch interval: 10 seconds
> - window calculation: 1 minute
> - spark.cleaner.ttl: 5 minutes
>
> The execution is standalone on one machine (16 GB RAM, 12 cores), and it is launched as follows:
>
> /opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops" --jars $JARS --class $APPCLASS --master local[2] $APPJAR
>
> Does anyone have any clues about the problem? I don't know if it is a configuration problem or an error in the code that is causing memory leaks.
>
> Thank you in advance!
> Miquel
>
> PS: the code is basically this:
> --------------------------------------
>
> import java.io.{File, FileInputStream, PrintWriter}
> import java.util.Properties
>
> import kafka.serializer.StringDecoder
>
> import org.apache.spark.SparkConf
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> object NetflowTopn {
>
>   var appPath = "."
>   var zkQuorum = ""
>   var group = ""
>   var topics = ""
>   var numThreads = 1
>
>   var batch_interval = 10
>   var n_window = 1
>   var n_slide = 1
>   var topnsize = 10
>
>   var hm = Map[String, Int]()
>   hm += ( "unix_secs" -> 0 )
>   hm += ( "unix_nsecs" -> 1 )
>   hm += ( "sysuptime" -> 2 )
>   hm += ( "exaddr" -> 3 )
>   hm += ( "dpkts" -> 4 )
>   hm += ( "doctets" -> 5 )
>   hm += ( "first" -> 6 )
>   hm += ( "last" -> 7 )
>   hm += ( "engine_type" -> 8 )
>   hm += ( "engine_id" -> 9 )
>   hm += ( "srcaddr" -> 10 )
>   hm += ( "dstaddr" -> 11 )
>   hm += ( "nexthop" -> 12 )
>   hm += ( "input" -> 13 )
>   hm += ( "output" -> 14 )
>   hm += ( "srcport" -> 15 )
>   hm += ( "dstport" -> 16 )
>   hm += ( "prot" -> 17 )
>   hm += ( "tos" -> 18 )
>   hm += ( "tcp_flags" -> 19 )
>   hm += ( "src_mask" -> 20 )
>   hm += ( "dst_mask" -> 21 )
>   hm += ( "src_as" -> 22 )
>   hm += ( "dst_as" -> 23 )
>
>   def getKey (lcamps: Array[String], camp: String): String = {
>     if (camp == "total") return "total"
>     else return lcamps(hm(camp))
>   }
>
>   def getVal (lcamps: Array[String], camp: String): Long = {
>     if (camp == "flows") return 1L
>     else return lcamps(hm(camp)).toLong
>   }
>
>   def getKeyVal (line: String, keycamps: List[String], valcamp: String) = {
>     val arr = line.split(",")
>     (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
>   }
>
>   def writeOutput (data: Array[(Long, String)], keycamps_str: String,
>                    csvheader: String, valcamp: String, prefix: String) = {
>     val ts = System.currentTimeMillis
>     val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
>     val f1f = new File(f1)
>     val ftmpf = new File(f1 + ts)
>     val pw = new PrintWriter(ftmpf)
>     pw.println(csvheader)
>     data.foreach { t =>
>       pw.println(t._2 + "," + t._1)
>     }
>     pw.close
>     ftmpf.renameTo(f1f)
>   }
>
>   def main(args: Array[String]) {
>
>     if (args.length < 1) {
>       System.err.println("Usage: NetflowTopn <apppath>")
>       System.exit(1)
>     }
>
>     appPath = args(0)
>
>     try {
>       val prop = new Properties()
>       prop.load(new FileInputStream(appPath + "/conf/app.properties"))
>
>       zkQuorum = prop.getProperty("KAFKA_HOST")
>       group = prop.getProperty("KAFKA_GROUP")
>       topics = prop.getProperty("KAFKA_TOPIC")
>       numThreads = prop.getProperty("THREADS").toInt
>     } catch { case e: Exception =>
>       e.printStackTrace()
>       sys.exit(1)
>     }
>
>     val sparkConf = new SparkConf().setAppName("netflow-topn")
>       .set("spark.default.parallelism", "2")
>       .set("spark.rdd.compress", "true")
>       .set("spark.streaming.unpersist", "true")
>       .set("spark.cleaner.ttl", "300")
>
>     val ssc = new StreamingContext(sparkConf, Seconds(batch_interval))
>
>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>
>     val kafpar = Map[String, String](
>       "zookeeper.connect" -> zkQuorum,
>       "group.id" -> group,
>       "zookeeper.connection.timeout.ms" -> "5000",
>       "auto.commit.interval.ms" -> "60000",
>       "auto.offset.reset" -> "largest"
>     )
>
>     val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafpar, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()
>
>     val ll_keycamps = List( List("srcaddr", "dstaddr")
>                           , List("dstaddr")
>                           , List("srcaddr")
>                           , List("srcport")
>                           , List("dstport")
>                           , List("total")
>                           )
>
>     val l_valcamps = List( "doctets"
>                          , "dpkts"
>                          , "flows"
>                          )
>
>     for (keycamps <- ll_keycamps) {
>
>       val keycamps_str = keycamps.mkString("-")
>       val csvheader = keycamps.mkString(",") + ",amount"
>
>       for (valcamp <- l_valcamps) {
>
>         val lines2 = lines.map( getKeyVal(_, keycamps, valcamp) ).cache()
>
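>         // Two aggregations per (keycamps, valcamp) combination follow: a
>         // 60-second sliding window with a 10-second slide, and a plain
>         // per-batch reduce. Each is sorted by value and its top 20 rows
>         // are written to a CSV file.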
>         lines2.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(60), Seconds(10))
>               .map(_.swap)
>               .transform(_.sortByKey(false))
>               .foreachRDD(rdd => {
>                 val data = rdd.take(20)
>                 writeOutput(data, keycamps_str, csvheader, valcamp, "DATAWINDOW")
>               })
>
>         lines2.reduceByKey((a: Long, b: Long) => a + b)
>               .map(_.swap)
>               .transform(_.sortByKey(false))
>               .foreachRDD(rdd => {
>                 val data = rdd.take(20)
>                 writeOutput(data, keycamps_str, csvheader, valcamp, "DATA")
>               })
>       }
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.