Hi Sri,
The yield() comes from Scala itself, not Spark. In a for comprehension, yield
builds a returned collection: each value produced by the loop body is appended
to that result collection.
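For example, a minimal sketch in plain Scala (no Spark needed):

val pairs = for (word <- "one two one".split(" ")) yield (word, 1)
// pairs: Array[(String, Int)] = Array((one,1), (two,1), (one,1))

That is what the flatMap in my earlier snippet relies on: each comment yields
an array of ((user, word), 1) pairs, which flatMap flattens into one RDD.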
To output strings instead of tuple data, you could use aggregateByKey instead
of groupByKey:
val output = wordCounts.
  map({case ((user, word), count) => (user, (word, count))}).
  aggregateByKey("")(
    seqOp = (s: String, t: (String, Int)) => s + t._1 + " [" + t._2 + "] ",
    combOp = _ + _).
  map({case (user, result) => user + " " + result})
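With the sample input from my earlier snippet, output.collect() should give
strings like "u1 one [2] two [1]" and "u2 three [2] four [1]", although the
word order inside each string depends on how the partial counts arrive.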
Hope it helps,
Huy.
On Tue, Sep 22, 2015 at 2:35 AM sri hari kali charan Tummala <
[email protected]> wrote:
> Hi Huy,
>
> That worked like a charm. Can we remove CompactBuffer from the output?
>
> (u2,CompactBuffer((three,2), (four,1)))
> (u1,CompactBuffer((one,2), (two,1)))
>
> What does yield do in Spark? Can you please explain...
>
> Thanks
> Sri
>
>
>
> On Mon, Sep 21, 2015 at 6:13 AM, Huy Banh <[email protected]> wrote:
>
>> Hi,
>>
>> If your input format is user -> comment, then you could:
>>
>> val comments = sc.parallelize(List(
>>   ("u1", "one two one"), ("u2", "three four three")))
>> val wordCounts = comments.
>>   flatMap({case (user, comment) =>
>>     for (word <- comment.split(" ")) yield ((user, word), 1)}).
>>   reduceByKey(_ + _)
>>
>> val output = wordCounts.
>>   map({case ((user, word), count) => (user, (word, count))}).
>>   groupByKey()
>>
>> @Aniket: if we group by user first, couldn't it run out of memory when
>> Spark tries to put all of a user's words into a single sequence?
>>
>> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
>> [email protected]> wrote:
>>
>>> Using the Scala API, you can key the records by user and then use
>>> combineByKey, for example:
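>>>
>>> A minimal sketch of that idea, assuming an RDD[(String, String)] of
>>> (user, comment) pairs named comments; the Map-based combiner is just an
>>> illustration, not a fixed recipe:
>>>
>>> val perUserCounts = comments.combineByKey(
>>>   // createCombiner: count the words of the first comment seen for a user
>>>   (c: String) => c.split(" ").groupBy(identity).mapValues(_.length).toMap,
>>>   // mergeValue: fold another comment's words into the running counts
>>>   (m: Map[String, Int], c: String) =>
>>>     c.split(" ").foldLeft(m)((acc, w) => acc + (w -> (acc.getOrElse(w, 0) + 1))),
>>>   // mergeCombiners: merge count maps built on different partitions
>>>   (m1: Map[String, Int], m2: Map[String, Int]) =>
>>>     m2.foldLeft(m1)((acc, kv) => acc + (kv._1 -> (acc.getOrElse(kv._1, 0) + kv._2))))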
>>>
>>> Thanks,
>>> Aniket
>>>
>>> On Sat, Sep 19, 2015, 6:41 PM [email protected] <
>>> [email protected]> wrote:
>>>
>>>> Hi All,
>>>> I would like to achieve the output below using Spark. I managed to
>>>> write it in Hive and call that from Spark, but not in plain Spark
>>>> (Scala). How do I group word counts by a particular user (column)?
>>>> Imagine users and their tweets: I want to do a word count per user
>>>> name.
>>>>
>>>> Input:-
>>>> kali A,B,A,B,B
>>>> james B,A,A,A,B
>>>>
>>>> Output:-
>>>> kali A [Count] B [Count]
>>>> James A [Count] B [Count]
>>>>
>>>> My Hive Answer:-
>>>> CREATE EXTERNAL TABLE TEST
>>>> (
>>>>   user_name string,
>>>>   comments  string
>>>> )
>>>> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
>>>> STORED AS TEXTFILE
>>>> LOCATION '/data/kali/test';
>>>> -- HDFS folder: create it and put a text file there with the data
>>>> -- mentioned in this email.
>>>>
>>>> use default;
>>>> select user_name, COLLECT_SET(text)
>>>> from (
>>>>   select user_name, concat(sub, ' ', count(comments)) as text
>>>>   from test
>>>>   LATERAL VIEW explode(split(comments, ',')) subView AS sub
>>>>   group by user_name, sub
>>>> ) w
>>>> group by user_name;
>>>>
>>>> Spark With Hive:-
>>>> package com.examples
>>>>
>>>> /**
>>>>  * Created by kalit_000 on 17/09/2015.
>>>>  */
>>>> import org.apache.log4j.{Level, Logger}
>>>> import org.apache.spark.sql.SQLContext
>>>> import org.apache.spark.sql.hive.HiveContext
>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>
>>>> object HiveWordCount {
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>     Logger.getLogger("org").setLevel(Level.WARN)
>>>>     Logger.getLogger("akka").setLevel(Level.WARN)
>>>>
>>>>     val conf = new SparkConf()
>>>>       .setMaster("local")
>>>>       .setAppName("HiveWordCount")
>>>>       .set("spark.executor.memory", "1g")
>>>>     val sc = new SparkContext(conf)
>>>>     val sqlContext = new SQLContext(sc)
>>>>     val hc = new HiveContext(sc)
>>>>
>>>>     // The field delimiter's backslash must be escaped in a Scala string
>>>>     // literal, otherwise Hive receives '001' instead of '\001'.
>>>>     hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST " +
>>>>       "(user_name string, COMMENTS STRING) " +
>>>>       "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\001' " +
>>>>       "STORED AS TEXTFILE LOCATION '/data/kali/test'")
>>>>
>>>>     val op = hc.sql("select user_name, COLLECT_SET(text) from (" +
>>>>       "select user_name, concat(sub, ' ', count(comments)) as text " +
>>>>       "from default.test " +
>>>>       "LATERAL VIEW explode(split(comments, ',')) subView AS sub " +
>>>>       "group by user_name, sub) w group by user_name")
>>>>
>>>>     op.collect.foreach(println)
>>>>   }
>>>> }
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: [email protected]
>>>> For additional commands, e-mail: [email protected]
>>>>
>>>>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>