Kryo fails with the exception below:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
        com.esotericsoftware.kryo.io.Output.require(Output.java:138)
        com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
        com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
        com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
        com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
        com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:21)
        com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:19)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:124)
        org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
        org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        java.lang.Thread.run(Thread.java:722)
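The failing frames (Output.require -> writeAscii_slow) show Kryo running out of room in its fixed-size output buffer while writing the broadcast map's strings. As far as I can tell, in Spark releases of this vintage that buffer defaults to just a few megabytes and is controlled by spark.kryoserializer.buffer.mb, so one common first step is to raise it in the same SparkConf. A minimal sketch, reusing the initspark setup from the code below; the 64 MB figure is an illustrative guess, not a tuned value:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
  // Kryo serializes into a fixed-size buffer; "Buffer overflow. Available: 0"
  // means the object being written no longer fits. The value here is in MB.
  .set("spark.kryoserializer.buffer.mb", "64")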
~~~~~~~~~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Map[String, Int]])
  }
}

object WFilter2 {
  def initspark(name: String) = {
    val conf = new SparkConf()
      .setMaster("yarn-standalone")
      .setAppName(name)
      .setSparkHome(System.getenv("SPARK_HOME"))
      .setJars(SparkContext.jarOfClass(this.getClass))
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
    new SparkContext(conf)
  }

  def main(args: Array[String]) {
    val spark = initspark("word filter mapping")
    val stopset = Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
    val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
    // term frequency of each token over the whole corpus
    val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_, 1)).countByKey
    // document frequency: how many lines (documents) contain each token
    val df_map = spark broadcast file.map(x => Set(x.split("\t"): _*)).flatMap(_.map(_ -> 1)).countByKey
    // token -> integer id mapping, broadcast to every executor
    val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer: _*)
    def w_filter(w: String) =
      if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
    val mapped = file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))
    spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
    mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
    spark.stop()
  }
}
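One side note on MyRegistrator, separate from the overflow itself: Kryo registration is keyed by the exact runtime class, and classOf[Map[String, Int]] erases to the scala.collection.immutable.Map trait, which no concrete map instance ever is, so that registration is likely never matched. A sketch of registering the concrete classes instead, using getClass on representative instances so the exact runtime classes are guaranteed (the specific implementation classes this picks up are an assumption about the standard Scala library):

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Kryo matches registrations on the exact runtime class, never on a trait.
    // getClass on sample instances registers whatever classes maps of these
    // sizes actually are (type parameters are erased and irrelevant to Kryo).
    kryo.register(Map.empty[String, Int].getClass)
    kryo.register(Map("a" -> 1).getClass)
    kryo.register(Map("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4, "e" -> 5).getClass)
  }
}

Registration only shortens how class names are encoded in the output; it does not change the total buffer requirement, so it will not by itself cure the overflow above.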