Unless I am mistaken, in a groupBy operation Spark spills to disk if the values for a key don't fit in memory.

Thanks,
Aniket
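(A minimal, untested sketch of the combineByKey approach Aniket suggests further down in the thread, using Huy's sample input and assuming an existing SparkContext sc: each user's counts are merged incrementally into a Map, so no user's words are ever materialized as a single in-memory sequence.)

  val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three four three")))

  // One (user, word) record per token.
  val userWords = comments.flatMap { case (user, comment) =>
    comment.split(" ").map(word => (user, word))
  }

  // Per-user word counts built with map-side combining.
  val countsPerUser = userWords.combineByKey(
    (word: String) => Map(word -> 1),                                // createCombiner
    (acc: Map[String, Int], word: String) =>
      acc + (word -> (acc.getOrElse(word, 0) + 1)),                  // mergeValue
    (a: Map[String, Int], b: Map[String, Int]) =>
      b.foldLeft(a) { case (m, (w, c)) =>
        m + (w -> (m.getOrElse(w, 0) + c)) }                         // mergeCombiners
  )

  countsPerUser.collect.foreach(println)
  // e.g. (u1,Map(one -> 2, two -> 1))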
On Mon, Sep 21, 2015 at 10:43 AM Huy Banh <huy.b...@gmail.com> wrote:

> Hi,
>
> If your input format is user -> comment, then you could:
>
>   val comments = sc.parallelize(List(("u1", "one two one"),
>                                      ("u2", "three four three")))
>
>   // Count each (user, word) pair; reduceByKey combines map-side.
>   val wordCounts = comments.
>     flatMap { case (user, comment) =>
>       for (word <- comment.split(" ")) yield ((user, word), 1) }.
>     reduceByKey(_ + _)
>
>   // Regroup the per-word counts under each user.
>   val output = wordCounts.
>     map { case ((user, word), count) => (user, (word, count)) }.
>     groupByKey()
>
> By the way, Aniket, if we group by user first, it could run out of
> memory when Spark tries to put all of a user's words into a single
> sequence, couldn't it?
>
> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar
> <aniket.bhatna...@gmail.com> wrote:
>
>> Using the Scala API, you can first group by user and then use
>> combineByKey.
>>
>> Thanks,
>> Aniket
>>
>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com
>> <kali.tumm...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I would like to achieve the output below using Spark. I managed to
>>> write it in Hive and call that from Spark, but not in plain Spark
>>> (Scala). How do I group word counts by a particular user (column)?
>>> Imagine users and their tweets: I want a word count per user name.
>>>
>>> Input:
>>> kali A,B,A,B,B
>>> james B,A,A,A,B
>>>
>>> Output:
>>> kali A [Count] B [Count]
>>> james A [Count] B [Count]
>>>
>>> My Hive answer:
>>>
>>> -- HDFS location: create the folder and a text file holding the
>>> -- input data above.
>>> CREATE EXTERNAL TABLE TEST
>>> (
>>>   user_name STRING,
>>>   comments  STRING
>>> )
>>> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
>>> STORED AS TEXTFILE LOCATION '/data/kali/test';
>>>
>>> USE default;
>>> SELECT user_name, COLLECT_SET(text)
>>> FROM (
>>>   SELECT user_name, CONCAT(sub, ' ', COUNT(comments)) AS text
>>>   FROM test LATERAL VIEW EXPLODE(SPLIT(comments, ',')) subView AS sub
>>>   GROUP BY user_name, sub
>>> ) w
>>> GROUP BY user_name;
>>>
>>> Spark with Hive:
>>>
>>> package com.examples
>>>
>>> /**
>>>  * Created by kalit_000 on 17/09/2015.
>>>  */
>>> import org.apache.log4j.{Level, Logger}
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.{SparkConf, SparkContext}
>>>
>>> object HiveWordCount {
>>>
>>>   def main(args: Array[String]): Unit = {
>>>     Logger.getLogger("org").setLevel(Level.WARN)
>>>     Logger.getLogger("akka").setLevel(Level.WARN)
>>>
>>>     val conf = new SparkConf()
>>>       .setMaster("local")
>>>       .setAppName("HiveWordCount")
>>>       .set("spark.executor.memory", "1g")
>>>     val sc = new SparkContext(conf)
>>>     val sqlContext = new SQLContext(sc)
>>>     val hc = new HiveContext(sc)
>>>
>>>     // The field terminator must be escaped as '\\001' in the Scala
>>>     // string so that Hive receives '\001'.
>>>     hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST " +
>>>       "(user_name STRING, comments STRING) ROW FORMAT DELIMITED " +
>>>       "FIELDS TERMINATED BY '\\001' STORED AS TEXTFILE " +
>>>       "LOCATION '/data/kali/test'")
>>>
>>>     val op = hc.sql("SELECT user_name, COLLECT_SET(text) FROM " +
>>>       "(SELECT user_name, CONCAT(sub, ' ', COUNT(comments)) AS text " +
>>>       "FROM default.test LATERAL VIEW EXPLODE(SPLIT(comments, ',')) " +
>>>       "subView AS sub GROUP BY user_name, sub) w GROUP BY user_name")
>>>
>>>     op.collect.foreach(println)
>>>   }
>>> }
>>>
>>> Thanks
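(To get the exact output shape the original question asks for, e.g. "kali A [2] B [3]", one could format the grouped result with a sketch like the following. It assumes the val output from Huy's snippet, applied to the comma-separated input by splitting on "," instead of " "; the order of words within a line may vary.)

  val formatted = output.map { case (user, wordCounts) =>
    user + " " + wordCounts.map { case (word, count) =>
      s"$word [$count]" }.mkString(" ")
  }

  formatted.collect.foreach(println)
  // e.g.
  // kali A [2] B [3]
  // james A [3] B [2]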