A crucial thing to remember when using fileStream is that the files must be
written to the monitored directory "atomically". That is, once the file
system shows the file in its directory listing, the file should not be
appended to or updated any further. That is often the cause of this kind of
issue: Spark Streaming may pick up the file (soon after it becomes visible
in the listing) and try to process it before all of the data has been
written.

So the best way to feed data into Spark Streaming is to write the files to a
temp directory and then "move" / "rename" them into the monitored directory.
That makes the operation "atomic". This is mentioned in the API docs of
fileStream<http://spark.apache.org/docs/0.9.1/api/streaming/index.html#org.apache.spark.streaming.StreamingContext>
.
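
For example, something along these lines (a rough sketch using the Hadoop
FileSystem API; the paths and file names are just placeholders):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Write the file to a temp directory on the same HDFS filesystem first.
  val fs = FileSystem.get(new Configuration())
  val tmp = new Path("/Qimage/tmp/image-0001.png")
  val dst = new Path("/Qimage/input/image-0001.png")

  // ... write all the data to `tmp` and close the output stream ...

  // rename() within a single HDFS filesystem is atomic, so the streaming
  // job only ever sees the fully written file once it shows up in the
  // monitored directory's listing.
  fs.rename(tmp, dst)

The key point is that the temp directory and the monitored directory should
be on the same filesystem; otherwise the rename degenerates into a copy and
is no longer atomic.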

TD



On Sun, May 11, 2014 at 7:30 PM, zzzzzqf12345 <zzzzzqf12...@gmail.com> wrote:

> When I put 200 png files into HDFS, I found that Spark Streaming could
> detect all 200 files, but the sum of rdd.count() is less than 200, always
> between 130 and 170. I don't know why... Is this a bug?
> PS: When I put the 200 files into HDFS before the streaming job starts, it
> gets the correct count and the right result.
>
> Here is the code:
>
> def main(args: Array[String]) {
>   val conf = new SparkConf().setMaster(SparkURL)
>     .setAppName("QimageStreaming-broadcast")
>     .setSparkHome(System.getenv("SPARK_HOME"))
>     .setJars(SparkContext.jarOfClass(this.getClass()))
>   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
>   conf.set("spark.kryoserializer.buffer.mb", "10")
>   val ssc = new StreamingContext(conf, Seconds(2))
>   val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
>   val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
>   val input_path = HdfsURL + "/Qimage/input"
>   val output_path = HdfsURL + "/Qimage/output/"
>   val bg_path = HdfsURL + "/Qimage/bg/"
>   val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage,
>     QimageInputFormat[Text, Qimage]](bg_path)
>   val bbg = bg.map(data => (data._1.toString(), data._2))
>   val broadcastbg = ssc.sparkContext.broadcast(bbg)
>   val file = ssc.fileStream[Text, Qimage,
>     QimageInputFormat[Text, Qimage]](input_path)
>   val qingbg = broadcastbg.value.collectAsMap
>   val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
>     val rddnum = rdd.count
>     System.out.println("\n\n" + "rddnum is " + rddnum + "\n\n")
>     if (rddnum > 0) {
>       System.out.println("here is foreachFunc")
>       val a = rdd.keys
>       val b = a.first
>       val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
>       rdd.map(data => (data._1, (new QimageProc(data._1, data._2)).koutu(cbg)))
>         .saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage],
>           outputFormatClass)
>     }
>   }
>   file.foreachRDD(foreachFunc)
>   ssc.start()
>   ssc.awaitTermination()
> }
>
>
>
