Hi,
If your input format is user -> comment, then you could:
val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three four three")))
val wordCounts = comments.
  flatMap { case (user, comment) =>
    for (word <- comment.split(" ")) yield ((user, word), 1) }.
  reduceByKey(_ + _)
val output = wordCounts.
  map { case ((user, word), count) => (user, (word, count)) }.
  groupByKey()
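(For the sample data this should print, modulo ordering, something like
(u1, [(one,2), (two,1)]) and (u2, [(three,2), (four,1)]).)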
Aniket, if we group by user first, couldn't it run out of memory when Spark
tries to put all of a user's words into a single sequence?
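
For concreteness, here is a minimal (untested) sketch of the combineByKey
route, building a per-user Map of counts incrementally instead of
materializing all of a user's words in one sequence:

val perUserCounts = comments.
  flatMap { case (user, comment) =>
    comment.split(" ").map(word => (user, word)) }.
  combineByKey(
    (word: String) => Map(word -> 1),                  // createCombiner
    (m: Map[String, Int], word: String) =>             // mergeValue
      m + (word -> (m.getOrElse(word, 0) + 1)),
    (m1: Map[String, Int], m2: Map[String, Int]) =>    // mergeCombiners
      m2.foldLeft(m1) { case (m, (w, c)) => m + (w -> (m.getOrElse(w, 0) + c)) })

Map-side combining keeps the shuffle down to one (word, count) map per user
per partition, though a user with a huge distinct vocabulary would still end
up with one large map.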
On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
[email protected]> wrote:
> Using the Scala API, you can first group by user and then use combineByKey.
>
> Thanks,
> Aniket
>
> On Sat, Sep 19, 2015, 6:41 PM [email protected] <
> [email protected]> wrote:
>
>> Hi All,
>> I would like to achieve the output below using Spark. I managed to write it
>> in Hive and call that from Spark, but not in pure Spark (Scala). How do I
>> group word counts by a particular user (column)?
>> Imagine users and their tweets: I want to do a word count per user name.
>>
>> Input:-
>> kali A,B,A,B,B
>> james B,A,A,A,B
>>
>> Output:-
>> kali A [Count] B [Count]
>> james A [Count] B [Count]
>>
>> My Hive Answer:-
>> CREATE EXTERNAL TABLE TEST
>> (
>>   user_name STRING,
>>   COMMENTS  STRING
>> )
>> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
>> STORED AS TEXTFILE
>> LOCATION '/data/kali/test';
>> -- HDFS folder: create the folder and a text file with the data above.
>>
>> use default;
>> select user_name, COLLECT_SET(text)
>> from (select user_name, concat(sub, ' ', count(comments)) as text
>>       from test LATERAL VIEW explode(split(comments, ',')) subView AS sub
>>       group by user_name, sub) w
>> group by user_name;
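>>
>> A rough pure-Spark sketch of the same logic (untested; assumes the
>> '\001'-delimited user/comments layout from the table above):
>>
>> val lines = sc.textFile("/data/kali/test")
>> val result = lines.
>>   map(_.split('\u0001')).                        // -> Array(user_name, comments)
>>   flatMap { case Array(user, comments) =>
>>     comments.split(",").map(w => ((user, w), 1)) }.
>>   reduceByKey(_ + _).                            // count per (user, word)
>>   map { case ((user, w), n) => (user, w + " " + n) }.
>>   reduceByKey(_ + " " + _)                       // e.g. (kali, "A 2 B 3")
>> result.collect.foreach(println)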
>>
>> Spark With Hive:-
>> package com.examples
>>
>> /**
>>  * Created by kalit_000 on 17/09/2015.
>>  */
>> import org.apache.log4j.{Level, Logger}
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.{SparkConf, SparkContext}
>> import org.apache.spark.SparkContext._
>>
>> object HiveWordCount {
>>
>>   def main(args: Array[String]): Unit = {
>>     Logger.getLogger("org").setLevel(Level.WARN)
>>     Logger.getLogger("akka").setLevel(Level.WARN)
>>
>>     val conf = new SparkConf()
>>       .setMaster("local")
>>       .setAppName("HiveWordCount")
>>       .set("spark.executor.memory", "1g")
>>     val sc = new SparkContext(conf)
>>     val sqlContext = new SQLContext(sc)
>>     val hc = new HiveContext(sc)
>>
>>     // Note the escaped delimiter: "\\001" in the Scala source so that Hive
>>     // sees the literal '\001' from the DDL above.
>>     hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST (user_name STRING, " +
>>       "COMMENTS STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\001' " +
>>       "STORED AS TEXTFILE LOCATION '/data/kali/test'")
>>
>>     // Same query as above: explode the comma-separated comments, count per
>>     // (user, word), then collect the "word count" strings per user.
>>     val op = hc.sql("select user_name, COLLECT_SET(text) from (select user_name, " +
>>       "concat(sub, ' ', count(comments)) as text from default.test LATERAL VIEW " +
>>       "explode(split(comments, ',')) subView AS sub group by user_name, sub) w " +
>>       "group by user_name")
>>
>>     op.collect.foreach(println)
>>   }
>> }
>>
>> Thanks
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [email protected]
>> For additional commands, e-mail: [email protected]