My job fails with the Kryo exception below:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:21)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:19)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:124)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)
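
From the trace, Kryo's Output.require is running out of room in its serialization buffer while writing a String inside a Map, i.e. the object being serialized (presumably one of the large broadcast maps in the job below) does not fit in the buffer Kryo was given. One thing to try, as a minimal sketch and assuming this Spark version still reads the spark.kryoserializer.buffer.mb setting (the 256 MB value is an arbitrary illustration, not a confirmed fix):

// Hypothetical tweak to initspark below: give Kryo a larger
// serialization buffer so big broadcast values fit. The key name
// and the size are assumptions, not a verified fix.
val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
    .set("spark.kryoserializer.buffer.mb", "256")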

~~~~~~~~~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Map[String,Int]])
    }
}
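// Note (my assumption, not verified): registering classOf[Map[String, Int]]
// registers only the Map trait itself; the concrete classes Kryo meets at
// runtime (e.g. scala.collection.immutable.HashMap) go through Spark's
// chill defaults (see com.twitter.chill in the trace), so this registration
// is probably unrelated to the overflow.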

object WFilter2 {
    def initspark(name: String) = {
        val conf = new SparkConf()
                    .setMaster("yarn-standalone")
                    .setAppName(name)
                    .setSparkHome(System.getenv("SPARK_HOME"))
                    .setJars(SparkContext.jarOfClass(this.getClass))
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
        new SparkContext(conf)
    }

    def main(args: Array[String]) {
        val spark = initspark("word filter mapping")

        // Stop words, one per line, loaded from the classpath.
        val stopset = Source.fromURL(this.getClass.getResource("/stoplist.txt"))
                            .getLines.map(_.trim).toSet

        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")

        // Term frequency: total occurrences of each word in the corpus.
        val tf_map = spark.broadcast(
            file.flatMap(_.split("\t")).map((_, 1)).countByKey)

        // Document frequency: the number of lines each word appears in.
        val df_map = spark.broadcast(
            file.map(x => Set(x.split("\t"): _*)).flatMap(_.map(_ -> 1)).countByKey)

        // Dense integer id for every word.
        val word_mapping = spark.broadcast(
            Map(df_map.value.keys.zipWithIndex.toBuffer: _*))

        // Keep a word only if it is frequent enough and not a stop word.
        def w_filter(w: String) =
            tf_map.value(w) >= 8 && df_map.value(w) >= 4 && !(stopset contains w)

        // Rewrite each document as the ids of its surviving words.
        val mapped = file.map(
            _.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))

        spark.parallelize(word_mapping.value.toSeq)
             .saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}
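
One more observation: countByKey pulls every (word, count) pair back to the driver, and spark.parallelize(word_mapping.value.toSeq) then re-serializes that whole Seq to ship it back out, which also seems to be why the parallelize step is so slow. A rough sketch of keeping the document-frequency counts on the cluster instead (reduceByKey is plain Spark; the df_counts output path is made up, and deriving the integer id mapping would still need a separate step):

// Sketch only: aggregate document frequency as an RDD and save it
// directly, instead of collecting via countByKey and re-parallelizing.
// `file` is the same RDD[String] as in main above.
val df_rdd = file.map(line => Set(line.split("\t"): _*))
                 .flatMap(_.map(word => (word, 1L)))
                 .reduceByKey(_ + _)
df_rdd.map { case (w, c) => w + "\t" + c }
      .saveAsTextFile("hdfs://ns1/nlp/df_counts")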


