Thanks for the reply. @Jonathan, I haven't worked with CUBE before. I will try to learn it. Thanks for the tip. Currently, to split the activities, I use something like this:
new_relation = FILTER relation BY activity == 'abc' OR activity == 'def';

In some cases it is a one-to-one mapping, but not always. To my understanding, the SPLIT keyword does exactly the same thing as what I'm doing, correct?

@Alan, I haven't tried your method. I didn't come up with the UDF approach until I saw that my old script was spending too much time in the map phase, scanning the source multiple times. I will try your method. I also attached my old code at the end, just in case. I set my parallel count at about 90% of my reducer cap, which I believe is what is recommended. It takes about 10-15 waves. My old script:

source = LOAD ...;
activity_1 = FILTER source BY activity == 'abc' OR activity == 'def';
A_1 = FOREACH activity_1 GENERATE dimA, dimB, userId;
B_1 = DISTINCT A_1;
C_1 = GROUP B_1 BY (dimA, dimB);
D_1 = FOREACH C_1 GENERATE FLATTEN(group), COUNT(B_1);
STORE ...;
-- repeat for activity_1, but for other dimension combinations
activity_2 = FILTER source BY activity == 'abc';
-- repeat whatever was done for activity_1
-- repeat for the other activities

Thanks.

T.E.

On Mon, May 6, 2013 at 8:12 AM, Alan Gates <[email protected]> wrote:

> In the script you gave I'd be surprised if it's spending time in the map
> phase, as the map should be very simple. It's the reduce phase I'd expect
> to be very expensive, because your mapping UDF prevents Pig from using the
> algebraic nature of COUNT (that is, it has to ship all of the records to
> the reduce phase, not just the number of records). If your file is large
> this will be expensive. What happens if you switch your script to:
>
> A = load ...
> B = foreach A generate dimA, dimB, udf.newUserIdForCategory1(userId,
>     activity) as userId1, ...
> C = group B by (dimA, dimB)
> D = foreach C generate flatten(group), COUNT(B.userId1), ...
>
> When you said it was taking a long time in the map phase, were you trying
> something like the above? If so, I'd check how long your UDF is taking.
> Unless you're reading tons of data on a very small cluster, the above
> should be very fast. It definitely should not reread the input for each
> UDF.
>
> Other things to check:
> What's your parallel count set at? That is, how many reducers are you
> running?
> How many waves of maps does this create? That is, what's the number of
> maps this produces divided by the number of slots you get on your cluster
> to run it?
>
> Alan.
>
> On May 5, 2013, at 8:11 PM, Thomas Edison wrote:
>
> > Hi there,
> >
> > I have a huge input on HDFS and I would like to use Pig to calculate
> > several unique metrics. To help explain the problem more easily, I
> > assume the input file has the following schema:
> >
> > userId:chararray, dimensionA_key:chararray, dimensionB_key:chararray,
> > dimensionC_key:chararray, activity:chararray, ...
> >
> > Each record represents an activity performed by that userId.
> >
> > Based on the value in the activity field, each activity record will be
> > mapped to one or more categories. There are about 10 categories in
> > total.
> >
> > Now I need to count the number of unique users for different dimension
> > combinations (i.e. A, B, C, A+B, A+C, B+C, A+B+C) for each activity
> > category.
> >
> > What would be the best practices to perform such a calculation?
> >
> > I have tried several ways. Although I can get the results I want, it
> > takes a very long time (i.e. days). What I found is that most of the
> > time is spent on the map phase. It looks like the script tries to load
> > the huge input file every time it tries to calculate one unique count.
> > Is there a way to improve this behavior?
> >
> > I also tried something similar to below, but it looks like it reaches
> > the memory cap for a single reducer and just gets stuck at the last
> > reducer step.
> >
> > source = load ...
> >     as (userId:chararray, dimensionA_key:chararray,
> >     dimensionB_key:chararray, dimensionC_key:chararray,
> >     activity:chararray, ...);
> > a = group source by (dimensionA_key, dimensionB_key);
> > b = foreach a {
> >     userId1 = udf.newUserIdForCategory1(userId, activity);
> >     -- this udf returns the original user id if the activity should be
> >     -- mapped to Category1, and null otherwise
> >     userId2 = udf.newUserIdForCategory2(userId, activity);
> >     userId3 = udf.newUserIdForCategory3(userId, activity);
> >     ...
> >     userId10 = udf.newUserIdForCategory10(userId, activity);
> >     generate FLATTEN(group), COUNT(userId1), COUNT(userId2),
> >         COUNT(userId3), ..., COUNT(userId10);
> > }
> > store b ...;
> >
> > Thanks.
> >
> > T.E.
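
P.S. For anyone following the SPLIT question above: a minimal sketch of the SPLIT form, using the activity values from my snippet (relation and field names assumed from this thread). A record that satisfies several conditions is routed into every matching branch, so it handles the not-one-to-one case:

```
-- One statement, one logical pass; a record matching both conditions
-- lands in both activity_1 and activity_2.
SPLIT source INTO
    activity_1 IF (activity == 'abc' OR activity == 'def'),
    activity_2 IF (activity == 'abc');
```

As far as I understand, Pig rewrites SPLIT into one FILTER per branch, so it is logically equivalent to the separate FILTER statements; avoiding a rescan of the input comes from multi-query execution combining the STOREs into one job, not from SPLIT itself.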

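A sketch of Alan's single-pass suggestion, fleshed out for two categories (schema and UDF names assumed from this thread, shortened for readability). It relies on COUNT ignoring nulls, which is why the UDFs return null for non-matching activities; and since the goal is unique users rather than record counts, a nested DISTINCT is added inside the FOREACH:

```
source = LOAD 'input' AS (userId:chararray, dimA:chararray,
                          dimB:chararray, activity:chararray);
-- One pass over the source: compute every per-category user id up front.
B = FOREACH source GENERATE dimA, dimB,
        udf.newUserIdForCategory1(userId, activity) AS userId1,
        udf.newUserIdForCategory2(userId, activity) AS userId2;
C = GROUP B BY (dimA, dimB);
D = FOREACH C {
        u1 = DISTINCT B.userId1;  -- unique users in category 1
        u2 = DISTINCT B.userId2;  -- unique users in category 2
        -- COUNT skips nulls, so non-matching activities don't inflate it.
        GENERATE FLATTEN(group) AS (dimA, dimB), COUNT(u1), COUNT(u2);
};
STORE D INTO 'output';
```

This is a sketch under those assumptions, not a tested script; the dimension-pair variants (A, B, A+B+C, etc.) would each need their own GROUP, but all can share the single LOAD and FOREACH.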