Thanks for the reply.

@Jonathan,
I haven't worked with CUBE before.  I will try to learn it.  Thanks for the
tip.
Currently, to split the activity, I use something like this.

new_relation = FILTER relation BY activity == 'abc' or activity == 'def';

In some cases, it is a one to one mapping, but not always.  To my
understanding, the SPLIT keyword is doing exactly the same as the way I'm
doing, correct?

@Alan,
I haven't tried your method.  I didn't come up with the UDF way until I saw
my old script is taking too much time in the map phase - scanning the
source multiple times.  I will try your method.  I also attached my old
code at the end, just in case.

I set my reducer at about 90% of my reducer cap.  I think this is what is
recommended.

It takes about 10-15 waves.

My old script:
source = load ...;

activity_1 = FILTER source BY activity = 'abc' OR activity = 'def';
A_1 = foreach activity_1 generate dimA, dimB, userId;
B_1 = distinct A_1;
C_1 = group B_1 by (dimA, dimB);
D_1 = foreach C_1 generate FLATTEN(group), COUNT(C_1);
STORE...

-- repeat for activity_1, but for other dimension combinations;


activity_2 = FILTER source BY activity = 'abc';
-- repeat whatever activity_1 has been done

-- repeat other activities.

Thanks.

T.E.


On Mon, May 6, 2013 at 8:12 AM, Alan Gates <[email protected]> wrote:

> In the script you gave I'd be surprised if it's spending time in the map
> phase, as the map should be very simple.  It's the reduce phase I'd expect
> to be very expensive because your mapping UDF prevents Pig from using the
> algebraic nature of count (that is, it has to ship all of the records to
> reduce not just the number of records).  If your file is large this will be
> expensive.  What happens if you switch your script to:
>
> A = load ...
> B = foreach A generate dimA, dimB, udf.newUserIdForCategory1(userId,
> activity) as userId1, ...
> C = group B by dimA, dimB
> D = foreach C generate flatten(group), COUNT(userId1), ...
>
> When you said it was taking a long time in the map phase were you trying
> something like the above?  If so I'd check how long your UDF is taking.
>  Unless you're reading tons of data on a very small cluster the above
> should be very fast.  It definitely should not reread the input for each
> UDF.
>
> Other things to check:
> What's your parallel count set at?  That is, how many reducers are you
> running?
> How many waves of maps does this create?  That is, what's the number of
> maps this produces divided by the number of slots you get on your cluster
> to run it?
>
> Alan.
>
> On May 5, 2013, at 8:11 PM, Thomas Edison wrote:
>
> > Hi there,
> >
> > I have a huge input on an HDFS and I would like to use Pig to calculate
> > several unique metrics. To help explain the problem more easily, I assume
> > the input file has the following schema:
> >
> > userId:chararray, dimensionA_key:chararray, dimensionB_key:chararray,
> > dimensionC_key:chararray, activity:chararray, ...
> >
> > Each record represent an activity performed by that userId.
> >
> > Based on the value in the activity field, this activity record will be
> > mapped to 1 or more categories. There are about 10 categories in total.
> >
> > Now I need to count the number of unique users for different dimension
> > combinations (i.e. A, B, C, A+B, A+C, B+C, A+B+C) for each activity
> > category.
> >
> > What would be the best practices to perform such calculation?
> >
> > I have tried several ways. Although I can get the results I want, it
> takes
> > a very long time (i.e. days). What I found is most of the time is spent
> on
> > the map phase. It looks like the script tries to load the huge input file
> > every time it tries to calculate one unique count. Is there a way to
> > improve this behavior?
> >
> > I also tried something similar to below, but it looks like it reaches the
> > memory cap for a single reducer and just stuck at the last reducer step.
> >
> > source = load ... as (userId:chararray, dimensionA_key:chararray,
> > dimensionB_key:chararray, dimensionC_key:chararray,
> > activity:chararray, ...);
> > a = group source by (dimensionA_key, dimensionB_key);
> > b = foreach a {
> >    userId1 = udf.newUserIdForCategory1(userId, activity);
> >    -- this udf returns the original user id if the activity should be
> > mapped to Category1 and None otherwise
> >    userId2 = udf.newUserIdForCategory2(userId, activity);
> >    userId3 = udf.newUserIdForCategory3(userId, activity);
> >    ...
> >    userId10 = udf.newUserIdForCategory10(userId, activity);
> >    generate FLATTEN(group), COUNT(userId1), COUNT(userId2),
> > COUNT(userId3), ..., COUNT(userId10);
> > }
> > store b ...;
> >
> > Thanks.
> >
> > T.E.
>
>

Reply via email to