Hi, if your input format is user -> comment, then you could do something like this:
val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three four three")))

// emit one ((user, word), 1) pair per word occurrence, then sum per (user, word)
val wordCounts = comments
  .flatMap { case (user, comment) =>
    for (word <- comment.split(" ")) yield ((user, word), 1)
  }
  .reduceByKey(_ + _)

// re-key by user so each user ends up with all of its (word, count) pairs
val output = wordCounts
  .map { case ((user, word), count) => (user, (word, count)) }
  .groupByKey()

Aniket, if we group by user first, couldn't it run out of memory when Spark tries to put all of a user's words into a single sequence?

On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

> Using the Scala API, you can first group by user and then use combineByKey.
>
> Thanks,
> Aniket
>
> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <kali.tumm...@gmail.com> wrote:
>
>> Hi All,
>>
>> I would like to achieve the output below using Spark. I managed to write it
>> in Hive and call it from Spark, but not in plain Spark (Scala). How do I
>> group word counts by a particular user (column)? Imagine users and their
>> tweets: I want to do a word count per user name.
>>
>> Input:
>> kali A,B,A,B,B
>> james B,A,A,A,B
>>
>> Output:
>> kali A [Count] B [Count]
>> james A [Count] B [Count]
>>
>> My Hive answer:
>>
>> CREATE EXTERNAL TABLE TEST
>> (
>>   user_name string,
>>   comments  string
>> )
>> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
>> STORED AS TEXTFILE
>> LOCATION '/data/kali/test';
>> -- HDFS folder: create it and put a text file with the data above in it
>>
>> use default;
>> select user_name, COLLECT_SET(text)
>> from (
>>   select user_name, concat(sub, ' ', count(comments)) as text
>>   from test
>>   LATERAL VIEW explode(split(comments, ',')) subView AS sub
>>   group by user_name, sub
>> ) w
>> group by user_name;
>>
>> Spark with Hive:
>>
>> package com.examples
>>
>> /**
>>  * Created by kalit_000 on 17/09/2015.
>>  */
>> import org.apache.log4j.{Level, Logger}
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.{SparkConf, SparkContext}
>>
>> object HiveWordCount {
>>
>>   def main(args: Array[String]): Unit = {
>>     Logger.getLogger("org").setLevel(Level.WARN)
>>     Logger.getLogger("akka").setLevel(Level.WARN)
>>
>>     val conf = new SparkConf()
>>       .setMaster("local")
>>       .setAppName("HiveWordCount")
>>       .set("spark.executor.memory", "1g")
>>     val sc = new SparkContext(conf)
>>     val sqlContext = new SQLContext(sc)
>>     val hc = new HiveContext(sc)
>>
>>     // note: the field terminator must be written as \\001 in a Scala
>>     // string literal so that Hive sees \001
>>     hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST (user_name " +
>>       "string, comments string) ROW FORMAT DELIMITED FIELDS TERMINATED " +
>>       "BY '\\001' STORED AS TEXTFILE LOCATION '/data/kali/test'")
>>
>>     val op = hc.sql("select user_name, COLLECT_SET(text) from (select " +
>>       "user_name, concat(sub, ' ', count(comments)) as text from " +
>>       "default.test LATERAL VIEW explode(split(comments, ',')) subView " +
>>       "AS sub group by user_name, sub) w group by user_name")
>>
>>     op.collect.foreach(println)
>>   }
>> }
>>
>> Thanks
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
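Coming back to Aniket's combineByKey suggestion: one way to read it (the Map-per-user combiner below is my interpretation, not code he posted) is to key by user and merge word counts into a small map. Combiners are merged map-side, so each partition holds at most one map of distinct words per user, rather than the full sequence of word occurrences that groupByKey would materialize:

// Sketch: per-user word counts via combineByKey (assumed reading of
// Aniket's suggestion), reusing the `comments` RDD from my reply above.
val pairs = comments.flatMap { case (user, comment) =>
  comment.split(" ").map(word => (user, word))
}

val countsPerUser = pairs.combineByKey(
  // createCombiner: first word seen for a user on this partition
  (word: String) => Map(word -> 1),
  // mergeValue: fold another word occurrence into the user's map
  (acc: Map[String, Int], word: String) =>
    acc + (word -> (acc.getOrElse(word, 0) + 1)),
  // mergeCombiners: merge the per-partition maps for a user
  (m1: Map[String, Int], m2: Map[String, Int]) =>
    m2.foldLeft(m1) { case (acc, (w, c)) =>
      acc + (w -> (acc.getOrElse(w, 0) + c))
    }
)

The per-user map still has to fit in memory, but its size is bounded by the number of distinct words per user, not by the number of occurrences.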
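For the original input format (user name, a space, then comma-separated words), the whole job can be done in plain Spark with the same reduceByKey pipeline. A self-contained sketch follows; the object name and the hard-coded sample data are mine, so swap in sc.textFile for real input. Note that the second reduceByKey concatenates strings instead of grouping, so groupByKey is avoided entirely:

import org.apache.spark.{SparkConf, SparkContext}

object UserWordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("UserWordCount"))

    // input lines look like: "kali A,B,A,B,B"
    val lines = sc.parallelize(List("kali A,B,A,B,B", "james B,A,A,A,B"))

    val output = lines
      .map(_.split(" ", 2))                              // -> Array(user, csv)
      .flatMap { case Array(user, csv) =>
        csv.split(",").map(word => ((user, word), 1))    // one pair per occurrence
      }
      .reduceByKey(_ + _)                                // count per (user, word)
      .map { case ((user, word), count) => (user, s"$word $count") }
      .reduceByKey(_ + " " + _)                          // concatenate, no groupByKey

    // prints e.g. "kali A 2 B 3" and "james B 2 A 3"
    // (the word order within a line is not guaranteed)
    output.collect.foreach { case (user, counts) => println(s"$user $counts") }
    sc.stop()
  }
}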
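Finally, if you prefer to keep the HiveQL, you don't need the external table: with Spark 1.3+ you can turn an RDD into a DataFrame, register it as a temp table, and run the same query through HiveContext. A sketch under those assumptions (the Tweet case class is mine):

import org.apache.spark.sql.hive.HiveContext

case class Tweet(user_name: String, comments: String)

val hc = new HiveContext(sc)
import hc.implicits._

// same data as in the email, as a DataFrame instead of an external table
val df = sc.parallelize(Seq(
  Tweet("kali", "A,B,A,B,B"),
  Tweet("james", "B,A,A,A,B")
)).toDF()
df.registerTempTable("test")

// unchanged HiveQL from the original post, minus the default.* qualifier
val op = hc.sql(
  "select user_name, COLLECT_SET(text) from (" +
  "select user_name, concat(sub, ' ', count(comments)) as text " +
  "from test LATERAL VIEW explode(split(comments, ',')) subView AS sub " +
  "group by user_name, sub) w group by user_name")

op.collect.foreach(println)
// prints rows like [kali,WrappedArray(A 2, B 3)]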