Hi Yanbo

I recently coded up the trivial example from
http://nlp.stanford.edu/IR-book/html/htmledition/naive-bayes-text-classification-1.html
and I do not get the same results. I'll put my code up on GitHub over the
weekend if anyone is interested.
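For reference, the numbers I am trying to reproduce (assuming I am reading
example 13.1 in that chapter correctly) come from a multinomial Naive Bayes
with add-one smoothing over raw term counts, with no TF-IDF weighting at all:

    // Re-computation of the IR-book worked example (example 13.1), assuming
    // multinomial NB with add-one smoothing over raw term counts.
    // Class c (Chinese) has 8 tokens, class notC has 3; vocabulary size is 6.
    double pChineseC = (5.0 + 1.0) / (8.0 + 6.0); // 3/7
    double pTokyoC   = (0.0 + 1.0) / (8.0 + 6.0); // 1/14
    double pJapanC   = (0.0 + 1.0) / (8.0 + 6.0); // 1/14
    double pChineseN = (1.0 + 1.0) / (3.0 + 6.0); // 2/9
    double pTokyoN   = (1.0 + 1.0) / (3.0 + 6.0); // 2/9
    double pJapanN   = (1.0 + 1.0) / (3.0 + 6.0); // 2/9

    // test document d5 = "Chinese Chinese Chinese Tokyo Japan"
    double scoreC = 3.0 / 4.0 * Math.pow(pChineseC, 3) * pTokyoC * pJapanC;
    double scoreN = 1.0 / 4.0 * Math.pow(pChineseN, 3) * pTokyoN * pJapanN;
    System.out.println(scoreC); // ~0.0003 -> d5 classified as Chinese
    System.out.println(scoreN); // ~0.0001

If that is right, part of the mismatch may simply be that my pipeline feeds
TF-IDF features into NaiveBayes while the book's example uses raw counts.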
Andy

From: Yanbo Liang <yblia...@gmail.com>
Date: Tuesday, January 19, 2016 at 1:11 AM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: has any one implemented TF_IDF using ML transformers?

> Hi Andy,
>
> The equation to calculate IDF is:
>
> idf = log((m + 1) / (d(t) + 1))
>
> You can refer here:
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala#L150
>
> The equation to calculate TF-IDF is:
>
> TFIDF = TF * IDF
>
> You can refer here:
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala#L226
>
> Thanks
> Yanbo
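A quick way to connect this formula to the output shown below (a minimal
standalone arithmetic check, not Spark code): with the m = 4 training
documents used later in this thread, a word appearing in exactly one document
gets idf = ln((4 + 1) / (1 + 1)), and "Chinese", which appears in all four,
gets ln(5/5) = 0. Note this is a natural log with add-one smoothing; as far
as I can tell the IR book defines idf without smoothing, so the two will
never line up.

    // Check of Spark's smoothed IDF, idf = ln((m + 1) / (d(t) + 1)),
    // against the toy corpus used later in this thread (m = 4 documents).
    double m = 4.0;
    System.out.println(Math.log((m + 1) / (1 + 1))); // df = 1 ("Beijing",
                                                     // "Shanghai", "Macao",
                                                     // "Tokyo", "Japan"):
                                                     // 0.9162907318741551
    System.out.println(Math.log((m + 1) / (4 + 1))); // df = 4 ("Chinese"): 0.0
    // For comparison, the unsmoothed IR-book definition idf = log10(N / df):
    System.out.println(Math.log10(4.0 / 1.0));       // 0.6020599913279624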
> 2016-01-19 7:05 GMT+08:00 Andy Davidson <a...@santacruzintegration.com>:
>> Hi Yanbo
>>
>> I am using 1.6.0. I am having a hard time trying to figure out what the
>> exact equation is, and I do not know Scala. I took a look at the source
>> code URL you provided:
>>
>>     override def transform(dataset: DataFrame): DataFrame = {
>>       transformSchema(dataset.schema, logging = true)
>>       val idf = udf { vec: Vector => idfModel.transform(vec) }
>>       dataset.withColumn($(outputCol), idf(col($(inputCol))))
>>     }
>>
>> You mentioned the doc is out of date:
>> http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf
>>
>> Based on my understanding of the subject matter, the equations in the Java
>> doc are correct; however, I could not find anything like them in the source
>> code:
>>
>> IDF(t,D) = log((|D| + 1) / (DF(t,D) + 1))
>>
>> TFIDF(t,d,D) = TF(t,d) * IDF(t,D)
>>
>> I found the Spark unit test org.apache.spark.mllib.feature.JavaTfIdfSuite;
>> its results do not match the equation either. (In general the unit test
>> asserts seem incomplete.)
>>
>> I have created several small test examples to try to figure out how to use
>> NaiveBayes, HashingTF, and IDF. The values of TFIDF, theta, probabilities, …
>> produced by Spark do not match the published results at
>> http://nlp.stanford.edu/IR-book/html/htmledition/naive-bayes-text-classification-1.html
>>
>> Kind regards
>>
>> Andy
>>
>>     private DataFrame createTrainingData() {
>>         // make sure we only use dictionarySize words
>>         JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
>>             // 0 is Chinese
>>             // 1 is notChinese
>>             RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing", "Chinese")),
>>             RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese", "Shanghai")),
>>             RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao")),
>>             RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan", "Chinese"))));
>>
>>         return createData(rdd);
>>     }
>>
>>     private DataFrame createData(JavaRDD<Row> rdd) {
>>         StructField id = new StructField("id", DataTypes.IntegerType, false, Metadata.empty());
>>         StructField label = new StructField("label", DataTypes.DoubleType, false, Metadata.empty());
>>         StructField words = new StructField("words",
>>                 DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty());
>>
>>         StructType schema = new StructType(new StructField[] { id, label, words });
>>         DataFrame ret = sqlContext.createDataFrame(rdd, schema);
>>
>>         return ret;
>>     }
>>
>>     private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
>>         HashingTF hashingTF = new HashingTF()
>>             .setInputCol("words")
>>             .setOutputCol("tf")
>>             .setNumFeatures(dictionarySize);
>>
>>         DataFrame termFrequenceDF = hashingTF.transform(rawDF);
>>         termFrequenceDF.cache(); // idf needs to make 2 passes over the data set
>>
>>         //val idf = new IDF(minDocFreq = 2).fit(tf)
>>         IDFModel idf = new IDF()
>>             //.setMinDocFreq(1) // our vocabulary has 6 words we hash into 7
>>             .setInputCol(hashingTF.getOutputCol())
>>             .setOutputCol("idf")
>>             .fit(termFrequenceDF);
>>
>>         DataFrame ret = idf.transform(termFrequenceDF);
>>
>>         return ret;
>>     }
>>
>>  |-- id: integer (nullable = false)
>>  |-- label: double (nullable = false)
>>  |-- words: array (nullable = false)
>>  |    |-- element: string (containsNull = true)
>>  |-- tf: vector (nullable = true)
>>  |-- idf: vector (nullable = true)
>>
>> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
>> |id |label|words                       |tf                       |idf                                                    |
>> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
>> |0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                     |
>> |1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                     |
>> |2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                     |
>> |3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551])|
>> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
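One thing worth checking with such a small feature space: hashing six words
into seven buckets can collide, which would silently merge two words' counts.
A hypothetical check (assuming, as in Spark 1.6, that ml.feature.HashingTF
delegates to the same hashing as mllib.feature.HashingTF) prints each word's
bucket; in the output above the six words do land in six distinct buckets:

    import org.apache.spark.mllib.feature.HashingTF;

    // Hypothetical collision check for the 7-bucket hash space used above.
    // Assumes ml.feature.HashingTF uses mllib's hashing (true in 1.6).
    HashingTF hash = new HashingTF(7); // same numFeatures as dictionarySize
    for (String word : new String[] { "Chinese", "Beijing", "Shanghai",
                                      "Macao", "Tokyo", "Japan" }) {
        System.err.println(word + " -> bucket " + hash.indexOf(word));
    }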
>> Here is the Spark test case:
>>
>>     @Test
>>     public void tfIdf() {
>>         // The tests are to check Java compatibility.
>>         HashingTF tf = new HashingTF();
>>         @SuppressWarnings("unchecked")
>>         JavaRDD<List<String>> documents = sc.parallelize(Arrays.asList(
>>             Arrays.asList("this is a sentence".split(" ")),
>>             Arrays.asList("this is another sentence".split(" ")),
>>             Arrays.asList("this is still a sentence".split(" "))), 2);
>>         JavaRDD<Vector> termFreqs = tf.transform(documents);
>>         termFreqs.collect();
>>         IDF idf = new IDF();
>>         JavaRDD<Vector> tfIdfs = idf.fit(termFreqs).transform(termFreqs);
>>         List<Vector> localTfIdfs = tfIdfs.collect();
>>
>>         int indexOfThis = tf.indexOf("this");
>>         System.err.println("AEDWIP: indexOfThis: " + indexOfThis);
>>
>>         int indexOfSentence = tf.indexOf("sentence");
>>         System.err.println("AEDWIP: indexOfSentence: " + indexOfSentence);
>>
>>         int indexOfAnother = tf.indexOf("another");
>>         System.err.println("AEDWIP: indexOfAnother: " + indexOfAnother);
>>
>>         for (Vector v : localTfIdfs) {
>>             System.err.println("AEDWIP: V.toString() " + v.toString());
>>             Assert.assertEquals(0.0, v.apply(indexOfThis), 1e-15);
>>         }
>>     }
>>
>> $ mvn test -DwildcardSuites=none -Dtest=org.apache.spark.mllib.feature.JavaTfIdfSuite
>>
>> AEDWIP: indexOfThis: 413342
>> AEDWIP: indexOfSentence: 251491
>> AEDWIP: indexOfAnother: 263939
>> AEDWIP: V.toString() (1048576,[97,3370,251491,413342],[0.28768207245178085,0.0,0.0,0.0])
>> AEDWIP: V.toString() (1048576,[3370,251491,263939,413342],[0.0,0.0,0.6931471805599453,0.0])
>> AEDWIP: V.toString() (1048576,[97,3370,251491,413342,713128],[0.28768207245178085,0.0,0.0,0.0,0.6931471805599453])
>>
>> Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.908 sec - in org.apache.spark.mllib.feature.JavaTfIdfSuite
>>
>> From: Yanbo Liang <yblia...@gmail.com>
>> Date: Sunday, January 17, 2016 at 12:34 AM
>> To: Andrew Davidson <a...@santacruzintegration.com>
>> Cc: "user @spark" <user@spark.apache.org>
>> Subject: Re: has any one implemented TF_IDF using ML transformers?
>>
>>> Hi Andy,
>>>
>>> Actually, the output of the ML IDF model is the TF-IDF vector of each
>>> instance rather than the IDF vector, so it is unnecessary to do
>>> member-wise multiplication to calculate the TF-IDF value. You can refer
>>> to the code here:
>>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala#L121
>>> I found the documentation of IDF is not very clear; we need to update it.
>>>
>>> Thanks
>>> Yanbo
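In other words, the pipeline shown earlier in this thread already computes
TF-IDF; the column named "idf" holds the final TF * IDF values. A minimal
sketch of the tweak (hypothetical; only the output column name changes):

    // Per Yanbo's note: in spark.ml, IDFModel.transform() already multiplies
    // TF by IDF, so the transformed column holds TF-IDF values. Renaming the
    // output column makes that explicit (Spark 1.6 API, as in this thread).
    IDFModel idfModel = new IDF()
            .setInputCol(hashingTF.getOutputCol())
            .setOutputCol("features") // this column is TF * IDF, not bare IDF
            .fit(termFrequenceDF);

    DataFrame tfidfDF = idfModel.transform(termFrequenceDF);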
>>> 2016-01-16 6:10 GMT+08:00 Andy Davidson <a...@santacruzintegration.com>:
>>>> I wonder if I am missing something? TF-IDF is very popular, and Spark ML
>>>> has a lot of transformers; however, TF-IDF is not supported directly.
>>>>
>>>> Spark provides HashingTF and IDF transformers. The Java doc
>>>> http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf
>>>> mentions you can implement TF-IDF as follows:
>>>>
>>>> TFIDF(t,d,D) = TF(t,d) * IDF(t,D)
>>>>
>>>> The problem I am running into is that both HashingTF and IDF return a
>>>> sparse vector.
>>>>
>>>> Ideally the Spark code to implement TF-IDF would be one line:
>>>>
>>>> DataFrame ret = tmp.withColumn("features", tmp.col("tf").multiply(tmp.col("idf")));
>>>>
>>>> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
>>>> data type mismatch: '(tf * idf)' requires numeric type, not vector;
>>>>
>>>> I could implement my own UDF to do member-wise multiplication; however,
>>>> given how common TF-IDF is, I wonder if this code already exists somewhere.
>>>>
>>>> I found org.apache.spark.util.Vector.Multiplier. There is no
>>>> documentation; however, given the argument is a double, my guess is it
>>>> just does scalar multiplication.
>>>>
>>>> I guess I could do something like
>>>>
>>>> double[] v = mySparkVector.toArray();
>>>>
>>>> and then use jblas to do member-wise multiplication.
>>>>
>>>> I assume sparse vectors are not distributed, so there would not be any
>>>> additional communication cost.
>>>>
>>>> If this code is truly missing, I would be happy to write it and donate it.
>>>>
>>>> Andy
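Since the member-wise multiplication question comes up twice in this thread,
here is a minimal sketch of what such a UDF could look like (hypothetical
code, assuming Spark 1.6, vectors small enough to densify cheaply, and that
mllib's public VectorUDT is usable as the return type). Per Yanbo's reply
above, it is not actually needed, because the ML IDF output already is
TF * IDF:

    import org.apache.spark.mllib.linalg.Vector;
    import org.apache.spark.mllib.linalg.VectorUDT;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.api.java.UDF2;
    import static org.apache.spark.sql.functions.callUDF;
    import static org.apache.spark.sql.functions.col;

    // Hypothetical member-wise (Hadamard) product UDF. Densifying is fine
    // for the tiny 7-element vectors in this thread; a production version
    // would walk the sparse indices instead.
    sqlContext.udf().register("elementWiseProduct",
            new UDF2<Vector, Vector, Vector>() {
                @Override
                public Vector call(Vector tf, Vector idf) {
                    double[] left = tf.toArray();
                    double[] right = idf.toArray();
                    double[] product = new double[left.length];
                    for (int i = 0; i < left.length; i++) {
                        product[i] = left[i] * right[i];
                    }
                    return Vectors.dense(product);
                }
            }, new VectorUDT());

    DataFrame ret = tmp.withColumn("features",
            callUDF("elementWiseProduct", col("tf"), col("idf")));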
>>>> From: Andrew Davidson <a...@santacruzintegration.com>
>>>> Date: Wednesday, January 13, 2016 at 2:52 PM
>>>> To: "user @spark" <user@spark.apache.org>
>>>> Subject: trouble calculating TF-IDF data type mismatch: '(tf * idf)' requires numeric type, not vector;
>>>>
>>>>> Below is a little snippet of my Java test code. Any idea how I implement
>>>>> member-wise vector multiplication?
>>>>>
>>>>> Kind regards
>>>>>
>>>>> Andy
>>>>>
>>>>> transformed df printSchema()
>>>>>
>>>>> root
>>>>>  |-- id: integer (nullable = false)
>>>>>  |-- label: double (nullable = false)
>>>>>  |-- words: array (nullable = false)
>>>>>  |    |-- element: string (containsNull = true)
>>>>>  |-- tf: vector (nullable = true)
>>>>>  |-- idf: vector (nullable = true)
>>>>>
>>>>> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
>>>>> |id |label|words                       |tf                       |idf                                                    |
>>>>> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
>>>>> |0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                     |
>>>>> |1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                     |
>>>>> |2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                     |
>>>>> |3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551])|
>>>>> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
>>>>>
>>>>>     @Test
>>>>>     public void test() {
>>>>>         DataFrame rawTrainingDF = createTrainingData();
>>>>>         DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);
>>>>>         . . .
>>>>>     }
>>>>>
>>>>>     private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
>>>>>         HashingTF hashingTF = new HashingTF()
>>>>>             .setInputCol("words")
>>>>>             .setOutputCol("tf")
>>>>>             .setNumFeatures(dictionarySize);
>>>>>
>>>>>         DataFrame termFrequenceDF = hashingTF.transform(rawDF);
>>>>>         termFrequenceDF.cache(); // idf needs to make 2 passes over the data set
>>>>>
>>>>>         IDFModel idf = new IDF()
>>>>>             //.setMinDocFreq(1) // our vocabulary has 6 words we hash into 7
>>>>>             .setInputCol(hashingTF.getOutputCol())
>>>>>             .setOutputCol("idf")
>>>>>             .fit(termFrequenceDF);
>>>>>
>>>>>         DataFrame tmp = idf.transform(termFrequenceDF);
>>>>>
>>>>>         DataFrame ret = tmp.withColumn("features",
>>>>>             tmp.col("tf").multiply(tmp.col("idf")));
>>>>>         logger.warn("\ntransformed df printSchema()");
>>>>>         ret.printSchema();
>>>>>         ret.show(false);
>>>>>
>>>>>         return ret;
>>>>>     }
>>>>>
>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
>>>>> data type mismatch: '(tf * idf)' requires numeric type, not vector;
>>>>>
>>>>>     private DataFrame createTrainingData() {
>>>>>         // make sure we only use dictionarySize words
>>>>>         JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
>>>>>             // 0 is Chinese
>>>>>             // 1 is notChinese
>>>>>             RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing", "Chinese")),
>>>>>             RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese", "Shanghai")),
>>>>>             RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao")),
>>>>>             RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan", "Chinese"))));
>>>>>
>>>>>         return createData(rdd);
>>>>>     }
>>>>>
>>>>>     private DataFrame createTestData() {
>>>>>         JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
>>>>>             // 0 is Chinese
>>>>>             // 1 is notChinese
>>>>>             // "bernoulli" requires label to be IntegerType
>>>>>             RowFactory.create(4, 1.0, Arrays.asList("Chinese", "Chinese", "Chinese", "Tokyo", "Japan"))));
>>>>>
>>>>>         return createData(rdd);
>>>>>     }