Thanks Sean! I got that working last night, in much the same way you suggested.

Any ideas on how to monitor that same folder from another script by creating a stream? I can use sc.sequenceFile() to read a file into an RDD, but since there is no sequenceFileStream() method, how do I get the name of each file as it is added?

Thanks again for your help.
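For what it's worth, one possible direction is the generic `fileStream` on `StreamingContext` with Hadoop's `SequenceFileInputFormat`. The following is only a sketch under assumptions: the object name, the `isDataFile` helper, the 10-second batch interval, and the monitored path are all made up for illustration, and the key/value types are assumed to be `Text`/`IntWritable`. It reads the contents of newly added files but does not expose their names. Also note that `fileStream` watches files directly inside one directory, while `saveAsSequenceFile` writes a directory of part files, so the output layout may need adjusting before this picks anything up.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountReader {
  // Hypothetical filter: skip hidden files and markers such as _SUCCESS.
  def isDataFile(name: String): Boolean =
    !name.startsWith(".") && !name.startsWith("_")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountReader")
    val ssc = new StreamingContext(conf, Seconds(10)) // assumed batch interval

    // Assumed location; fileStream only sees files placed directly in it.
    val dir = "tachyon://localhost:19998/files"

    val counts = ssc
      .fileStream[Text, IntWritable, SequenceFileInputFormat[Text, IntWritable]](
        dir, (p: Path) => isDataFile(p.getName), newFilesOnly = true)
      // Hadoop reuses Writable instances, so copy into plain values right away.
      .map { case (k, v) => (k.toString, v.get) }

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The `map` step is there because `Text` and `IntWritable` are not serializable and are reused by the record reader, so they are converted to `String` and `Int` before anything else touches them.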
> On Jul 22, 2014, at 1:57, "Sean Owen" <[email protected]> wrote:
>
> What about simply:
>
> dstream.foreachRDD(_.saveAsSequenceFile(...))
>
> ?
>
>> On Tue, Jul 22, 2014 at 2:06 AM, Barnaby <[email protected]> wrote:
>> First of all, I do not know Scala, but learning.
>>
>> I'm doing a proof of concept by streaming content from a socket, counting
>> the words and write it to a Tachyon disk. A different script will read the
>> file stream and print out the results.
>>
>> val lines = ssc.socketTextStream(args(0), args(1).toInt,
>> StorageLevel.MEMORY_AND_DISK_SER)
>> val words = lines.flatMap(_.split(" "))
>> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>> wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
>> ssc.start()
>> ssc.awaitTermination()
>>
>> I already did a proof of concept to write and read sequence files but there
>> doesn't seem to be a saveAsSequenceFiles() method in DStream. What is the
>> best way to write out an RDD to a stream so that the timestamps are in the
>> filenames and so there is minimal overhead in reading the data back in as
>> "objects", see below.
>>
>> My simple successful proof was the following:
>> val rdd = sc.parallelize(Array(("a",2), ("b",3), ("c",1)))
>> rdd.saveAsSequenceFile("tachyon://.../123.sf2")
>> val rdd2 = sc.sequenceFile[String,Int]("tachyon://.../123.sf2")
>>
>> How can I do something similar with streaming?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
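
For reference, the `foreachRDD` suggestion quoted above can be expanded into a sketch that also puts the batch timestamp in each output name. This uses the two-argument `foreachRDD` overload, which passes the batch `Time` alongside the RDD. The object name, the `batchPath` helper, the 10-second interval, and the `.sf2` suffix are assumptions for illustration, and the implicit imports reflect Spark 1.x.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._ // implicits for saveAsSequenceFile (Spark 1.x)
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.StreamingContext._ // implicits for reduceByKey on DStreams (Spark 1.x)

object WordCountWriter {
  // Hypothetical naming scheme: <prefix>-<batch time in ms>.sf2
  def batchPath(prefix: String, millis: Long): String = s"$prefix-$millis.sf2"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountWriter")
    val ssc = new StreamingContext(conf, Seconds(10)) // assumed batch interval

    val lines = ssc.socketTextStream(args(0), args(1).toInt,
      StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    val prefix = "tachyon://localhost:19998/files/WordCounts"
    // Each batch is written to its own timestamped output path.
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      rdd.saveAsSequenceFile(batchPath(prefix, time.milliseconds))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Each path produced this way is a directory containing part files, which is the same layout `saveAsSequenceFile` produces for a plain RDD, so `sc.sequenceFile[String,Int](...)` can read a batch back given its path.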
