When I put 200 PNG files into HDFS, I found that Spark Streaming could detect all 200 files, but the sum of rdd.count() is less than 200 — always between 130 and 170. I don't know why... Is this a bug? PS: When I put the 200 files into HDFS before the streaming job runs, it gets the correct count and the right result.
Here is the code:

/**
 * Entry point of the streaming job.
 *
 * Builds a Kryo-enabled SparkConf, starts a StreamingContext with a 2-second
 * batch interval, loads the background images found under `bg_path`, and then
 * watches `input_path` for new image files. For every non-empty batch the
 * background is looked up from the id of the batch's first key (falling back
 * to an empty `Qimage`), each image is run through `QimageProc.koutu` with
 * that background, and the result is written to `output_path`.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setMaster(SparkURL)
    .setAppName("QimageStreaming-broadcast")
    .setSparkHome(System.getenv("SPARK_HOME"))
    .setJars(SparkContext.jarOfClass(this.getClass()))
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
    .set("spark.kryoserializer.buffer.mb", "10")

  val ssc = new StreamingContext(conf, Seconds(2))
  val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]

  val input_path = HdfsURL + "/Qimage/input"
  val output_path = HdfsURL + "/Qimage/output/"
  val bg_path = HdfsURL + "/Qimage/bg/"

  // Collect the background images on the driver FIRST and broadcast the
  // resulting map. An RDD itself must not be broadcast: that only ships the
  // RDD handle, and newer Spark versions reject it outright.
  val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage, QimageInputFormat[Text, Qimage]](bg_path)
  val bgById = bg.map { case (id, image) => (id.toString, image) }.collectAsMap()
  val broadcastbg = ssc.sparkContext.broadcast(bgById)

  // NOTE(review): Spark's file-stream documentation requires files to appear
  // in the monitored directory by an atomic move/rename. Files that are
  // written in place (e.g. uploaded straight into input_path) may be picked up
  // while still incomplete -- confirm how the 200 files are being uploaded.
  val file = ssc.fileStream[Text, Qimage, QimageInputFormat[Text, Qimage]](input_path)

  val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
    val rddnum = rdd.count()
    println(s"\n\nrddnum is $rddnum\n\n")
    if (rddnum > 0) {
      println("here is foreachFunc")
      // The whole batch uses the background selected by its first key.
      val firstKey = rdd.keys.first()
      val cbg = broadcastbg.value.getOrElse(getbgID(firstKey), new Qimage)
      // NOTE(review): every non-empty batch writes to the same output_path;
      // verify that QimageOutputFormat tolerates an already existing directory.
      rdd
        .map { case (id, image) => (id, new QimageProc(id, image).koutu(cbg)) }
        .saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage], outputFormatClass)
    }
  }

  file.foreachRDD(foreachFunc)
  ssc.start()
  ssc.awaitTermination()
}

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-on-hdfs-can-detected-all-new-file-but-the-sum-of-all-the-rdd-count-not-equals-which-had-ded-tp5572.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.