I got it. I mistakenly thought that each line is a wordid list.

On Fri, Jan 15, 2016 at 3:24 AM, Bryan Cutler <cutl...@gmail.com> wrote:

What I mean is that the input to LDA.run() is an RDD[(Long, Vector)], where the Vector is a vector of counts of each term and should be the same size as the vocabulary (so if the vocabulary, or dictionary, has 10 words, each vector should have a size of 10). This probably means there will be some elements with zero counts, and a sparse vector might be a good way to handle that.

On Wed, Jan 13, 2016 at 6:40 PM, Li Li <fancye...@gmail.com> wrote:

"It looks like the problem is the vectors of term counts in the corpus are not always the vocabulary size."
Do you mean some integers that do not occur in the corpus?
For example, say the dictionary is 0-9 (10 words in total). If the docs are:

0 2 4 6 8
1 3 5 7 9

then it will be correct. But if the docs are:

0 2 4 6 9
1 3 5 6 7

where 8 does not occur in any document, then it will be wrong?
So the workaround is to preprocess the input and re-encode the terms?

On Thu, Jan 14, 2016 at 6:53 AM, Bryan Cutler <cutl...@gmail.com> wrote:

I was now able to reproduce the exception using the master branch and local mode. It looks like the problem is that the vectors of term counts in the corpus are not always the vocabulary size. Once I padded these with zero counts to the vocab size, it ran without the exception.

Joseph, I also tried calling describeTopics and noticed that with the improper vector size it will not throw an exception, but the term indices start to be incorrect. For a small number of iterations it is ok, but increasing the iterations causes the indices to get larger as well. Maybe that is what is going on in the JIRA you linked to?
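Roughly, the padding looked like this (a sketch, not the exact code I ran; vocabSize is the true dictionary size):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Extend every document's count vector to vocabSize by appending zero counts,
// so that all vectors in the corpus have the same length.
def padToVocabSize(corpus: RDD[(Long, Vector)], vocabSize: Int): RDD[(Long, Vector)] =
  corpus.mapValues { v =>
    val padded = new Array[Double](vocabSize) // initialized to 0.0
    v.toArray.copyToArray(padded)             // original counts first, zeros after
    Vectors.dense(padded)
  }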
On Wed, Jan 13, 2016 at 1:17 AM, Li Li <fancye...@gmail.com> wrote:

I will try spark 1.6.0 to see whether it is a bug in 1.5.2.

On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fancye...@gmail.com> wrote:

I have set up a standalone spark cluster and used the same code. It still failed with the same exception. I also preprocessed the data into lines of integers and used the Scala code of the LDA example. It still failed.
The code:

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TestLDA {
  def main(args: Array[String]) {
    if (args.length != 4) {
      println("need 4 args: inDir iterNum topicNum modelDir")
      System.exit(-1)
    }
    val conf = new SparkConf().setAppName("TestLDA")
    val sc = new SparkContext(conf)
    // Load and parse the data: one document per line, space-separated term counts
    val data = sc.textFile(args(0))
    val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
    // Index documents with unique IDs
    val corpus = parsedData.zipWithIndex.map(_.swap).cache()
    val iterNum = args(1).toInt
    val topicNum = args(2).toInt
    // Cluster the documents into topicNum topics using LDA
    val ldaModel = new LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)

    // Output topics. Each is a distribution over words (matching word count vectors)
    println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
    val topics = ldaModel.topicsMatrix
    for (topic <- Range(0, topicNum)) {
      print("Topic " + topic + ":")
      for (word <- Range(0, ldaModel.vocabSize)) {
        print(" " + topics(word, topic))
      }
      println()
    }

    // Save the model
    ldaModel.save(sc, args(3))
  }
}

The script to submit:

~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class com.mobvoi.knowledgegraph.scala_test.TestLDA \
  --master spark://master:7077 \
  --num-executors 10 \
  --executor-memory 4g \
  --executor-cores 3 \
  scala_test-1.0-jar-with-dependencies.jar \
  /test.txt \
  100 \
  5 \
  /lda_model

test.txt is in the attachment.
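Given Bryan's explanation at the top of the thread, the parsing line is where this goes wrong when each input line is a list of word ids rather than term counts. A minimal sketch of the re-encoding, assuming the vocabulary size is known up front (sparse, since most counts are zero):

import org.apache.spark.mllib.linalg.Vectors

val vocabSize = 10 // illustrative; use the real dictionary size
val parsedData = data.map { s =>
  // Count how many times each word id occurs in this document
  val counts = s.trim.split(' ').map(_.toInt)
    .groupBy(identity).map { case (id, occ) => (id, occ.length.toDouble) }
  Vectors.sparse(vocabSize, counts.toSeq)
}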
On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cutl...@gmail.com> wrote:

Hi Li,

I tried out your code and sample data in both local mode and Spark Standalone, and it ran correctly with output that looks good. Sorry, I don't have a YARN cluster set up right now, so maybe the error you are seeing is specific to that. Btw, I am running the latest Spark code from the master branch. Hope that helps some!

Bryan

On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fancye...@gmail.com> wrote:

Could anyone help? The problem is very easy to reproduce. What's wrong?

On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fancye...@gmail.com> wrote:

I used a small data set and reproduced the problem. But I don't know whether my code is correct, because I am not familiar with spark, so I will first post my code here. If it's correct, then I will post the data.
One line of my data looks like:

{"time":"08-09-17","cmtUrl":"2094361","rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}

It is a JSON file which contains webpageUrl and word_vec, the encoded words. The first step is to parse the input rdd into an rdd of VectorUrl. BTW, if public VectorUrl call(String s) returns null, is that ok? Then, following the example, I index the documents with unique IDs. Then I create an rdd that maps each id to its url, so that after LDA training I can find the url of each document, and save this rdd to hdfs. Then I create the corpus rdd and train.

The exception stack is:

15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
        at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
        at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
        at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
        at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
        at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
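The dimensions in the message are telling: topicsMatrix is vocabSize x k, so "not in [-58,58) x [-100,100)" means LDA inferred a 58-term vocabulary while term index 454 appears in some document. A quick pre-training check would surface the mismatch before LDA runs; a sketch in Scala, against an RDD[(Long, Vector)] corpus like the one built in the code below:

// Sanity check (sketch): every document vector must have the same size,
// namely the vocabulary size.
val sizes = corpus.map(_._2.size).distinct().collect()
require(sizes.length == 1,
  s"documents have inconsistent vector sizes: ${sizes.mkString(", ")}")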
========== here is my code ==========

SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());
JavaSparkContext sc = new JavaSparkContext(conf);

// Load and parse the data
JavaRDD<String> data = sc.textFile(inputDir + "/*");
JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() {
  public VectorUrl call(String s) {
    JsonParser parser = new JsonParser();
    JsonObject jo = parser.parse(s).getAsJsonObject();
    if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
      return null;
    }
    JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
    String url = jo.get("webpageUrl").getAsString();
    double[] values = new double[word_vec.size()];
    for (int i = 0; i < values.length; i++)
      values[i] = word_vec.get(i).getAsInt();
    return new VectorUrl(Vectors.dense(values), url);
  }
});

// Index documents with unique IDs
JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
    new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {
      public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
        return doc_id.swap();
      }
    }));

// Save the id -> url mapping so documents can be looked up after training
JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
    .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {
      @Override
      public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
        return new Tuple2(id2doc._1, id2doc._2.url);
      }
    }));
id2Url.saveAsTextFile(id2UrlPath);

// Build the corpus of (id, count vector) pairs for LDA
JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
    .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, Vector>>() {
      @Override
      public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
        return new Tuple2(id2doc._1, id2doc._2.vec);
      }
    }));
corpus.cache();

// Cluster the documents into topicNumber topics using LDA
DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA()
    .setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
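On the return-null question above: the null records survive the map itself, but anything that later dereferences them (for example id2doc._2.url) will throw a NullPointerException. A safer pattern is to drop malformed lines with flatMap and Option; a sketch using the Scala RDD API, mirroring the parsing above:

import com.google.gson.JsonParser
import org.apache.spark.mllib.linalg.Vectors

// Emit Some(VectorUrl) for good records and None for malformed ones, so
// flatMap silently drops bad lines instead of producing nulls.
val parsedData = data.flatMap { s =>
  val jo = new JsonParser().parse(s).getAsJsonObject
  if (!jo.has("word_vec") || !jo.has("webpageUrl")) None
  else {
    val arr = jo.get("word_vec").getAsJsonArray
    val values = Array.tabulate(arr.size)(i => arr.get(i).getAsDouble)
    Some(new VectorUrl(Vectors.dense(values), jo.get("webpageUrl").getAsString))
  }
}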
On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fancye...@gmail.com> wrote:

I will use a portion of the data and try. Will the hdfs block affect spark? (If so, it's hard to reproduce.)

On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <jos...@databricks.com> wrote:

Hi Li,

I'm wondering if you're running into the same bug reported here:
https://issues.apache.org/jira/browse/SPARK-12488

I haven't figured out yet what is causing it. Do you have a small corpus which reproduces this error, and which you can share on the JIRA? If so, that would help a lot in debugging this failure.

Thanks!
Joseph

On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fancye...@gmail.com> wrote:

I ran my LDA example in a yarn 2.6.2 cluster with spark 1.5.2.
It throws an exception at the line Matrix topics = ldaModel.topicsMatrix();, but in the yarn job history ui the job is shown as successful. What's wrong with it?
I submit the job with:

./bin/spark-submit --class Myclass \
  --master yarn-client \
  --num-executors 2 \
  --driver-memory 4g \
  --executor-memory 4g \
  --executor-cores 1 \

My code:

corpus.cache();

// Cluster the documents into topicNumber topics using LDA
DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA()
    .setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);

// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of "
    + ldaModel.vocabSize() + " words):");
// line 81, exception here:
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < topicNumber; topic++) {
  System.out.print("Topic " + topic + ":");
  for (int word = 0; word < ldaModel.vocabSize(); word++) {
    System.out.print(" " + topics.apply(word, topic));
  }
  System.out.println();
}

ldaModel.save(sc.sc(), modelPath);

Exception in thread "main" java.lang.IndexOutOfBoundsException: (1025,0) not in [-58,58) x [-100,100)
        at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
        at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
        at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
        at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
        at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from shutdown hook
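As a side note on printing topics: describeTopics, mentioned earlier in the thread, returns only the top-weighted terms per topic, which avoids iterating the full vocabSize x k topicsMatrix. A sketch in Scala:

// Print the top 10 terms of each topic with their weights.
val topTopics = ldaModel.describeTopics(maxTermsPerTopic = 10)
topTopics.zipWithIndex.foreach { case ((termIndices, termWeights), topic) =>
  println(s"Topic $topic:")
  termIndices.zip(termWeights).foreach { case (term, weight) =>
    println(f"  term $term%d: $weight%.4f")
  }
}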
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org