Try setting the serializer to org.apache.spark.serializer.KryoSerializer (see 
http://spark.apache.org/docs/0.9.1/tuning.html); it should be considerably 
faster.
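
For example, something like this (an untested sketch; in 0.9.x a SparkConf 
picks up spark.* system properties, so setting the property before the 
SparkContext is constructed should be enough, matching the constructor style 
in your code):

    // Must run before the SparkContext is created.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val spark = new SparkContext("yarn-standalone", "word filter",
        System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

Registering the classes you serialize most with Kryo can help further, but 
for the Strings and primitive pairs here, just switching the serializer 
should already make a difference.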

Matei

On Apr 24, 2014, at 8:01 PM, Earthson Lu <earthson...@gmail.com> wrote:

> spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
> 
> This line is too slow. There are about 2 million elements in word_mapping.
> 
> Is there a good style for writing a large collection to HDFS?
> 
> import org.apache.spark._
> import SparkContext._
> import scala.io.Source
> 
> object WFilter {
>     def main(args: Array[String]) {
>         val spark = new SparkContext("yarn-standalone", "word filter",
>             System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
>         // Stop words, loaded from a resource on the classpath.
>         val stopset = Source.fromURL(this.getClass.getResource("stoplist.txt"))
>             .getLines.map(_.trim).toSet
>         val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
>         // Term frequency: how often each word occurs across the corpus.
>         val tf_map = spark.broadcast(file.flatMap(_.split("\t")).map((_, 1)).countByKey)
>         // Document frequency: how many lines (documents) contain each word.
>         val df_map = spark.broadcast(file.map(x => Set(x.split("\t"): _*)).flatMap(_.map(_ -> 1)).countByKey)
>         // Assign each word a numeric id.
>         val word_mapping = spark.broadcast(Map(df_map.value.keys.zipWithIndex.toBuffer: _*))
>         // Keep words with tf >= 8, df >= 4, and not in the stop list.
>         def w_filter(w: String) = tf_map.value(w) >= 8 && df_map.value(w) >= 4 && !(stopset contains w)
>         // Re-emit each document as the ids of its surviving words.
>         val mapped = file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))
>         spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>         mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
>         spark.stop()
>     }
> }
> 
> many thx:) 
> 
> -- 
> 
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> Perfection is achieved 
> not when there is nothing more to add
>  but when there is nothing left to take away
