Hi,

If your input format is user -> comment, then you could:

val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three four three")))
val wordCounts = comments.
   flatMap({case (user, comment) =>
        for (word <- comment.split(" ")) yield(((user, word), 1)) }).
   reduceByKey(_ + _)

val output = wordCounts.
   map({case ((user, word), count) => (user, (word, count))}).
   groupByKey()
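
For a quick sanity check without a cluster, the same pipeline can be traced with plain Scala collections (local `groupBy`/`map` standing in for `reduceByKey`/`groupByKey`; the sample data is the `u1`/`u2` example above):

```scala
// Plain-Scala trace of the flatMap -> reduceByKey -> groupByKey pipeline above.
val comments = List(("u1", "one two one"), ("u2", "three four three"))

// flatMap: emit ((user, word), 1) pairs
val pairs = comments.flatMap { case (user, comment) =>
  comment.split(" ").map(word => ((user, word), 1))
}

// reduceByKey: sum the counts per (user, word)
val wordCounts = pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

// groupByKey: regroup as user -> Map(word -> count)
val output = wordCounts.toSeq
  .map { case ((user, word), count) => (user, (word, count)) }
  .groupBy(_._1)
  .map { case (user, vs) => (user, vs.map(_._2).toMap) }

println(output("u1"))  // Map(one -> 2, two -> 1)
```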

Btw Aniket, if we group by user first, couldn't it run out of memory when Spark
tries to put all of a user's words into a single sequence?
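
For context, combineByKey avoids that blowup by keeping one small combiner per key and merging values into it incrementally. A rough plain-Scala sketch of that contract on the kali/james data (the function names mirror Spark's combineByKey parameters, but this is a local fold, not the RDD API):

```scala
// Illustrative local fold mimicking combineByKey's contract:
// createCombiner starts a per-key accumulator, mergeValue folds each new value in.
val tweets = List(("kali", "A,B,A,B,B"), ("james", "B,A,A,A,B"))

def createCombiner(w: String): Map[String, Int] = Map(w -> 1)
def mergeValue(acc: Map[String, Int], w: String): Map[String, Int] =
  acc.updated(w, acc.getOrElse(w, 0) + 1)

// Per user, fold the words into a running count map. The accumulator stays
// small (one entry per distinct word), which is the point of combining early
// instead of materializing every word in one sequence.
val perUser = tweets.map { case (user, csv) =>
  val words = csv.split(",").toList
  user -> words.tail.foldLeft(createCombiner(words.head))(mergeValue)
}.toMap

println(perUser("kali"))  // Map(A -> 2, B -> 3)
```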

On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Using scala API, you can first group by user and then use combineByKey.
>
> Thanks,
> Aniket
>
> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <
> kali.tumm...@gmail.com> wrote:
>
>> Hi All,
>> I would like to achieve the output below using Spark. I managed to write it
>> in Hive and call that from Spark, but not in plain Spark (Scala). How do I
>> group word counts by a particular user (column)?
>> Imagine users and their tweets: I want to do a word count per user name.
>>
>> Input:-
>> kali    A,B,A,B,B
>> james B,A,A,A,B
>>
>> Output:-
>> kali A [Count] B [Count]
>> James A [Count] B [Count]
>>
>> My Hive Answer:-
>> CREATE EXTERNAL TABLE  TEST
>> (
>>      user_name                     string                   ,
>>      COMMENTS                      STRING
>>
>> )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
>> LOCATION '/data/kali/test';  ----  HDFS FOLDER (create hdfs folder and
>> create a text file with data mentioned in the email)
>>
>> use default;select user_name,COLLECT_SET(text) from (select
>> user_name,concat(sub,' ',count(comments)) as text  from test LATERAL VIEW
>> explode(split(comments,',')) subView AS sub group by user_name,sub)w group
>> by user_name;
>>
>> Spark With Hive:-
>> package com.examples
>>
>> /**
>>  * Created by kalit_000 on 17/09/2015.
>>  */
>> import org.apache.log4j.Logger
>> import org.apache.log4j.Level
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.{SparkContext, SparkConf}
>> import org.apache.spark.SparkContext._
>>
>>
>> object HiveWordCount {
>>
>>   def main(args: Array[String]): Unit =
>>   {
>>     Logger.getLogger("org").setLevel(Level.WARN)
>>     Logger.getLogger("akka").setLevel(Level.WARN)
>>
>>     val conf = new SparkConf()
>>       .setMaster("local")
>>       .setAppName("HiveWordCount")
>>       .set("spark.executor.memory", "1g")
>>     val sc = new SparkContext(conf)
>>     val sqlContext= new SQLContext(sc)
>>
>>     val hc=new HiveContext(sc)
>>
>>     hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST (user_name
>> string, COMMENTS STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
>> STORED AS TEXTFILE LOCATION '/data/kali/test' ")
>>
>>     val op=hc.sql("select user_name,COLLECT_SET(text) from (select
>> user_name,concat(sub,' ',count(comments)) as text  from default.test
>> LATERAL
>> VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w
>> group by user_name")
>>
>>     op.collect.foreach(println)
>>
>>
>>   }
>>
>>
>>
>>
>> Thanks
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
