Unless I am mistaken, a group-by operation spills to disk when the values
for a key don't fit in memory.
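
One caveat, as far as I know: the scaladoc for groupByKey has noted that all
values for a single key must be held in memory, so spilling may not help when
one user has a very large number of words. Below is a minimal sketch, assuming
Spark in local mode and the comma-separated input from the original question,
that sidesteps the issue by folding words into a per-user count map with
aggregateByKey. The names UserWordCount and wordCountsByUser are hypothetical,
made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UserWordCount {

  // Hypothetical helper: fold each user's words into a Map(word -> count).
  // Only one entry per distinct word is kept per user, not every occurrence.
  def wordCountsByUser(comments: RDD[(String, String)]): RDD[(String, Map[String, Int])] =
    comments
      .flatMap { case (user, comment) =>
        comment.split(",").toSeq.map(word => (user, word.trim))
      }
      .aggregateByKey(Map.empty[String, Int])(
        // Within a partition: add one occurrence of a word to the running map.
        (counts, word) => counts.updated(word, counts.getOrElse(word, 0) + 1),
        // Across partitions: merge two partial maps by summing counts.
        (a, b) => b.foldLeft(a) { case (acc, (w, n)) =>
          acc.updated(w, acc.getOrElse(w, 0) + n)
        }
      )

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("UserWordCount")
    val sc = new SparkContext(conf)
    try {
      // Sample data copied from the original question.
      val comments = sc.parallelize(Seq(("kali", "A,B,A,B,B"), ("james", "B,A,A,A,B")))
      wordCountsByUser(comments).collect().foreach { case (user, counts) =>
        val text = counts.toSeq.sorted.map { case (w, n) => s"$w [$n]" }.mkString(" ")
        println(s"$user $text")
      }
    } finally {
      sc.stop()
    }
  }
}
```

For the sample input this should give A 2 and B 3 for kali, and A 3 and B 2
for james. The per-user map still has to fit in memory, but it grows with the
number of distinct words rather than with the total number of words.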

Thanks,
Aniket

On Mon, Sep 21, 2015 at 10:43 AM Huy Banh <huy.b...@gmail.com> wrote:

> Hi,
>
> If your input format is user -> comment, then you could:
>
> val comments = sc.parallelize(List(
>   ("u1", "one two one"),
>   ("u2", "three four three")))
> val wordCounts = comments.
>    flatMap({case (user, comment) =>
>         for (word <- comment.split(" ")) yield(((user, word), 1)) }).
>    reduceByKey(_ + _)
>
> val output = wordCounts.
>    map({case ((user, word), count) => (user, (word, count))}).
>    groupByKey()
>
> With Aniket's approach, if we group by user first, couldn't it run out of
> memory when Spark tries to put all of a user's words into a single sequence?
>
> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Using the Scala API, you can first group by user and then use combineByKey.
>>
>> Thanks,
>> Aniket
>>
>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi All,
>>> I would like to produce the output below using Spark. I managed to write
>>> it in Hive and call that from Spark, but not in pure Spark (Scala). How
>>> can I group word counts by a particular column, for example the user?
>>> Imagine users and their tweets: I want a word count per user name.
>>>
>>> Input:-
>>> kali    A,B,A,B,B
>>> james B,A,A,A,B
>>>
>>> Output:-
>>> kali A [Count] B [Count]
>>> james A [Count] B [Count]
>>>
>>> My Hive Answer:-
>>> CREATE EXTERNAL TABLE TEST
>>> (
>>>      user_name   STRING,
>>>      COMMENTS    STRING
>>> )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
>>> LOCATION '/data/kali/test';
>>> -- HDFS folder: create it and add a text file with the input data above.
>>>
>>> use default;
>>> select user_name, COLLECT_SET(text) from (
>>>   select user_name, concat(sub, ' ', count(comments)) as text
>>>   from test LATERAL VIEW explode(split(comments, ',')) subView AS sub
>>>   group by user_name, sub) w
>>> group by user_name;
>>>
>>> Spark With Hive:-
>>> package com.examples
>>>
>>> /**
>>>  * Created by kalit_000 on 17/09/2015.
>>>  */
>>> import org.apache.log4j.Logger
>>> import org.apache.log4j.Level
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.{SparkContext, SparkConf}
>>> import org.apache.spark.SparkContext._
>>>
>>>
>>> object HiveWordCount {
>>>
>>>   def main(args: Array[String]): Unit = {
>>>     Logger.getLogger("org").setLevel(Level.WARN)
>>>     Logger.getLogger("akka").setLevel(Level.WARN)
>>>
>>>     val conf = new SparkConf()
>>>       .setMaster("local")
>>>       .setAppName("HiveWordCount")
>>>       .set("spark.executor.memory", "1g")
>>>     val sc = new SparkContext(conf)
>>>     val hc = new HiveContext(sc)
>>>
>>>     // Triple-quoted strings let the SQL span lines and keep '\001' literal.
>>>     hc.sql("""CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST
>>>       (user_name STRING, COMMENTS STRING)
>>>       ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
>>>       STORED AS TEXTFILE LOCATION '/data/kali/test'""")
>>>
>>>     // Count each word per user, then collect the "word count" strings.
>>>     val op = hc.sql("""select user_name, COLLECT_SET(text) from (
>>>       select user_name, concat(sub, ' ', count(comments)) as text
>>>       from default.test
>>>       LATERAL VIEW explode(split(comments, ',')) subView AS sub
>>>       group by user_name, sub) w
>>>       group by user_name""")
>>>
>>>     op.collect.foreach(println)
>>>   }
>>> }
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
