@Alan
I just tried your method as shown below.  The script gets stuck at the last
reducer even for a relatively small set of the data and fewer combinations.
I suspect it's an out-of-memory issue.  If I remember correctly, using a
nested foreach to calculate the unique counts is not a very good idea.  Any
suggestions?  Thanks.

T.E.

source = load ...;
A = foreach source generate dimA, dimB, userId,
    udf.getActivity1UserId(userId, activity) as activity1_userId,
    udf.getActivity2UserId(userId, activity) as activity2_userId,
    udf.getActivity3UserId(userId, activity) as activity3_userId,
    ...
    udf.getActivity10UserId(userId, activity) as activity10_userId;

B = group A by (dimA, dimB);

C = foreach B {
    unique_activity1 = distinct A.activity1_userId;
    unique_activity2 = distinct A.activity2_userId;
    unique_activity3 = distinct A.activity3_userId;
    ...
    unique_activity10 = distinct A.activity10_userId;
    generate FLATTEN(group), COUNT(unique_activity1),
        COUNT(unique_activity2), COUNT(unique_activity3), ...,
        COUNT(unique_activity10);
}

STORE C...;
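One workaround I'm considering, in case the nested DISTINCTs are what is
blowing memory (an untested sketch - it reuses the same udf.* calls as
above and assumes they return null for non-matching activities): fold
userId into the group key first, so no reducer ever has to hold a whole
(dimA, dimB) bag in memory, then count users with an algebraic SUM.

A = foreach source generate dimA, dimB, userId,
    (udf.getActivity1UserId(userId, activity) is not null ? 1 : 0) as act1,
    ...
    (udf.getActivity10UserId(userId, activity) is not null ? 1 : 0) as act10;
B = group A by (dimA, dimB, userId);
-- one row per (dimA, dimB, userId); MAX keeps the flag if the user did
-- that activity at least once
C = foreach B generate FLATTEN(group) as (dimA, dimB, userId),
    MAX(A.act1) as act1, ..., MAX(A.act10) as act10;
D = group C by (dimA, dimB);
-- each user now contributes at most 1 per category
E = foreach D generate FLATTEN(group), SUM(C.act1), ..., SUM(C.act10);

Since MAX and SUM are algebraic, the combiner should be able to do most of
the work map-side.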


On Mon, May 6, 2013 at 8:41 AM, Thomas Edison <
[email protected]> wrote:

> Thanks for the reply.
>
> @Jonathan,
> I haven't worked with CUBE before.  I will try to learn it.  Thanks for
> the tip.
> Currently, to split the activity, I use something like this.
>
> new_relation = FILTER relation BY activity == 'abc' or activity == 'def';
>
> In some cases, it is a one-to-one mapping, but not always.  To my
> understanding, the SPLIT keyword does exactly the same thing as what I'm
> doing, correct?
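>
> For reference, I think the SPLIT form would look something like this
> (untested; other_relation and 'xyz' are just made-up placeholders for a
> second branch, since SPLIT needs at least two outputs):
>
> SPLIT relation INTO
>     new_relation IF (activity == 'abc' OR activity == 'def'),
>     other_relation IF (activity == 'xyz');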
>
> @Alan,
> I haven't tried your method yet.  I didn't come up with the UDF approach
> until I saw my old script taking too much time in the map phase -
> scanning the source multiple times.  I will try your method.  I've also
> attached my old code at the end, just in case.
>
> I set my reducer count at about 90% of my reducer cap.  I think this is
> what is recommended.
>
> It takes about 10-15 waves.
>
> My old script:
> source = load ...;
>
> activity_1 = FILTER source BY activity == 'abc' OR activity == 'def';
> A_1 = foreach activity_1 generate dimA, dimB, userId;
> B_1 = distinct A_1;
> C_1 = group B_1 by (dimA, dimB);
> D_1 = foreach C_1 generate FLATTEN(group), COUNT(B_1);
> STORE...
>
> -- repeat for activity_1, but for other dimension combinations;
>
>
> activity_2 = FILTER source BY activity == 'abc';
> -- repeat whatever activity_1 has been done
>
> -- repeat other activities.
>
> Thanks.
>
> T.E.
>
>
> On Mon, May 6, 2013 at 8:12 AM, Alan Gates <[email protected]> wrote:
>
>> In the script you gave I'd be surprised if it's spending time in the map
>> phase, as the map should be very simple.  It's the reduce phase I'd
>> expect to be very expensive, because your mapping UDF prevents Pig from
>> using the algebraic nature of COUNT (that is, it has to ship all of the
>> records to the reduce phase, not just the number of records).  If your
>> file is large this will be expensive.  What happens if you switch your
>> script to:
>>
>> A = load ...;
>> B = foreach A generate dimA, dimB,
>>     udf.newUserIdForCategory1(userId, activity) as userId1, ...;
>> C = group B by (dimA, dimB);
>> D = foreach C generate FLATTEN(group), COUNT(B.userId1), ...;
>>
>> When you said it was taking a long time in the map phase, were you
>> trying something like the above?  If so, I'd check how long your UDF is
>> taking.  Unless you're reading tons of data on a very small cluster, the
>> above should be very fast.  It definitely should not reread the input
>> for each UDF.
>>
>> Other things to check:
>> What's your parallel count set at?  That is, how many reducers are you
>> running?
>> How many waves of maps does this create?  That is, what's the number of
>> maps this produces divided by the number of slots you get on your cluster
>> to run it?
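>>
>> (For reference, you can set the reducer count globally or per operator,
>> e.g. with a placeholder value of 100:
>>
>> SET default_parallel 100;
>> C = group B by (dimA, dimB) PARALLEL 100;
>>
>> Pick a number based on the reduce slots your cluster actually gives
>> you.)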
>>
>> Alan.
>>
>> On May 5, 2013, at 8:11 PM, Thomas Edison wrote:
>>
>> > Hi there,
>> >
>> > I have a huge input on HDFS and I would like to use Pig to calculate
>> > several unique metrics.  To help explain the problem more easily, I
>> > assume the input file has the following schema:
>> >
>> > userId:chararray, dimensionA_key:chararray, dimensionB_key:chararray,
>> > dimensionC_key:chararray, activity:chararray, ...
>> >
>> > Each record represents an activity performed by that userId.
>> >
>> > Based on the value in the activity field, this activity record will be
>> > mapped to 1 or more categories. There are about 10 categories in total.
>> >
>> > Now I need to count the number of unique users for different dimension
>> > combinations (i.e. A, B, C, A+B, A+C, B+C, A+B+C) for each activity
>> > category.
>> >
>> > What would be the best practices to perform such calculation?
>> >
>> > I have tried several ways.  Although I can get the results I want, it
>> > takes a very long time (i.e. days).  What I found is that most of the
>> > time is spent in the map phase.  It looks like the script tries to
>> > load the huge input file every time it calculates one unique count.
>> > Is there a way to improve this behavior?
>> >
>> > I also tried something similar to the script below, but it looks like
>> > it reaches the memory cap for a single reducer and just gets stuck at
>> > the last reducer step.
>> >
>> > source = load ... as (userId:chararray, dimensionA_key:chararray,
>> >     dimensionB_key:chararray, dimensionC_key:chararray,
>> >     activity:chararray, ...);
>> > a = group source by (dimensionA_key, dimensionB_key);
>> > b = foreach a {
>> >     userId1 = udf.newUserIdForCategory1(userId, activity);
>> >     -- this udf returns the original user id if the activity should
>> >     -- be mapped to Category1 and None otherwise
>> >     userId2 = udf.newUserIdForCategory2(userId, activity);
>> >     userId3 = udf.newUserIdForCategory3(userId, activity);
>> >     ...
>> >     userId10 = udf.newUserIdForCategory10(userId, activity);
>> >     generate FLATTEN(group), COUNT(userId1), COUNT(userId2),
>> >         COUNT(userId3), ..., COUNT(userId10);
>> > }
>> > store b ...;
>> >
>> > Thanks.
>> >
>> > T.E.
>>
>>
>
