Am I correct in understanding that you want to read and iterate over all
the data so that the result is correct? For example, if a user is already
unsubscribed then you want to ignore any subsequent subscribe events
regardless?

How often do you want to iterate through the full data, i.e. what is the
frequency of your analysis?

The issue I believe you may face is that as you go from t0 -> t1 -> ... -> tn,
your volume of data is going to rise.

How about periodic storage of your analysis and working on deltas only
afterwards?
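
Something along these lines, as a rough sketch (the names, types and the
merge logic are mine and purely illustrative):

    import org.apache.spark.rdd.RDD

    // Saved state: (list_id, user_id) -> (action, timestamp), as stored at
    // the end of the previous period (e.g. with saveAsObjectFile).
    def mergeDelta(savedState: RDD[((String, String), (String, Long))],
                   newEvents:  RDD[((String, String), (String, Long))])
        : RDD[((String, String), (String, Long))] =
      savedState.union(newEvents)
                .reduceByKey((a, b) => if (a._2 >= b._2) a else b)

    // Compute the period's statistics from the merged state, save the
    // merged state again, and next time read only the new delta files.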

What sort of data is it? Is it typical web-users?

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 June 2016 at 22:54, Jeroen Miller <bluedasya...@gmail.com> wrote:

> Dear fellow Sparkers,
>
> I am a new Spark user and I am trying to solve a (conceptually simple)
> problem which may not be a good use case for Spark, at least for the RDD
> API. But before I turn my back on it, I would rather have the opinion of
> more knowledgeable developers than me, as it is highly likely that I am
> missing something.
>
> Here is my problem in a nutshell.
>
> I have numerous files where each line is an event of the form:
>
>     (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>
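>
> (Purely for concreteness, such a line would parse into something like
> this; the class and field names are mine.)
>
>     case class Event(action: String, listId: String, userId: String, ts: Long)
>
>     def parseLine(line: String): Event = {
>       val Array(action, listId, userId, ts) = line.split(",", 4)
>       Event(action, listId, userId, ts.toLong)
>     }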
>
> I need to gather time-framed (for example, weekly or monthly) statistics
> of the kind:
>
>   <list_id>,
>   <num_current_users>,
>   <total_num_users_who_joined_from_dawn_of_time>,
>   <total_num_events_from_dawn_of_time>
>
> Ideally, I would need a Spark job that outputs these statistics for all
> time periods at once. The number of unique <list_id> is about a few
> hundred, and the number of unique <user_id> is a few dozen million.
>
> The trouble is that the data is not "clean", in the sense that I can have
> 'unsubscribe' events for users that are not subscribed, or 'subscribe'
> events for users that are already subscribed.
>
> This means I have to keep in memory the complete list of
>
>     (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>
>
> keeping only the entry with the most recent <timestamp> for a given pair
> (list_id, user_id).
>
> If one is only interested in keeping the final statistics, this is
> relatively easy to do with reduceByKey and combineByKey on a properly keyed
> RDD containing all events.
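>
> For instance, roughly along these lines (a sketch only; field names are
> mine, and I use reduceByKey twice rather than combineByKey for brevity):
>
>     import org.apache.spark.rdd.RDD
>
>     // events: ((list_id, user_id), (action, timestamp)), all periods merged
>     def currentUsersPerList(events: RDD[((String, String), (String, Long))])
>         : RDD[(String, Long)] = {
>       // Keep only the most recent event for each (list_id, user_id).
>       val latest = events.reduceByKey((a, b) => if (a._2 >= b._2) a else b)
>       // A user is currently subscribed iff their latest event is 'subscribe'.
>       latest.map { case ((listId, _), (action, _)) =>
>                (listId, if (action == "subscribe") 1L else 0L) }
>             .reduceByKey(_ + _)
>     }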
>
> However, I am struggling when it comes to computing the "partial"
> statistics, as I obviously do not want to duplicate most of the
> computations done for period (i-1) when I add the events for period (i),
> which is what my reduceByKey/combineByKey approach would lead to.
>
> Sequentially, the problem is trivial: keep all events (with the latest
> 'valid' event for each pair (list_id, user_id)) in a huge hash table which
> can be used to decide whether to increment or decrement <num_current_users>
> (for example), and save the state of the current statistics whenever we are
> done dealing with period (i).
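>
> Roughly like this in plain Scala (a sketch only, assuming the events come
> in sorted by timestamp; all the names are mine):
>
>     import scala.collection.mutable
>
>     case class Event(action: String, listId: String, userId: String, ts: Long)
>
>     def sequentialStats(eventsByTs: Iterator[Event]): Unit = {
>       // Latest 'valid' state per (list_id, user_id): true if subscribed.
>       val subscribed   = mutable.HashMap.empty[(String, String), Boolean]
>       val currentUsers = mutable.HashMap.empty[String, Long]
>
>       for (e <- eventsByTs) {
>         val key = (e.listId, e.userId)
>         val was = subscribed.getOrElse(key, false)
>         e.action match {
>           case "subscribe" if !was =>
>             subscribed(key) = true
>             currentUsers(e.listId) = currentUsers.getOrElse(e.listId, 0L) + 1
>           case "unsubscribe" if was =>
>             subscribed(key) = false
>             currentUsers(e.listId) = currentUsers.getOrElse(e.listId, 0L) - 1
>           case _ => // redundant subscribe/unsubscribe: ignore
>         }
>         // At each period boundary, snapshot currentUsers (and the other
>         // counters) before moving on to the next period.
>       }
>     }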
>
> I do not know how to efficiently solve this in Spark though.
>
> A naive idea would be to fetch the data for period (0) into an explicitly
> partitioned RDD (for example according to the last few characters of
> <user_id>) and proceed in a sequential fashion within a call to
> mapPartitionsWithIndex.
>
> The trouble would then be how to process new data files for later periods.
> Suppose I store the event RDDs in an array 'data' (indexed by period
> number), all of them similarly partitioned. I am afraid something like this
> is not viable (please excuse the pseudo-code):
>
>     data(0).mapPartitionsWithIndex(
>         (index, iterator) => {
>             //
>             // 1. Initialize 'hashmap', keyed by (list_id, user_id), for
>             //    this partition.
>             //
>             val hashmap = new HashMap[(String, String), Event]
>
>             //
>             // 2. Iterate over the events of data(0), update 'hashmap',
>             //    output stats for this partition and period.
>             //
>             while (iterator.hasNext) {
>                 //
>                 // Process entry, update 'hashmap', output stats
>                 // for the partition and period.
>                 //
>             }
>
>             //
>             // 3. Loop over all subsequent periods.
>             //
>             for (period <- 1 until max) {
>                 val next = data(period).mapPartitionsWithIndex(
>                     (index2, iterator2) => {
>                         if (index2 == index) {
>                             while (iterator2.hasNext) {
>                                 //
>                                 // Iterate over the elements of 'next' (since
>                                 // the data should be on the same node, so no
>                                 // shuffling after the initial partitioning,
>                                 // right?), update 'hashmap', and output stats
>                                 // for this partition and period.
>                                 //
>                             }
>                         } else {
>                             iterator2
>                         }
>                     }
>                 )
>             }
>         }
>     )
>
> The trouble with this approach is that I am afraid the data files for
> period (i > 0) will be read as many times as there are partitions in
> data(0), unless perhaps I explicitly persist them, which is inefficient.
> That said, there is probably a (clumsy) way to unpersist them once I am
> sure I am 100% done with them.
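>
> (For what it is worth, I suppose that part would boil down to something
> like this, with 'data' being my hypothetical array of period RDDs:)
>
>     import org.apache.spark.storage.StorageLevel
>
>     data(period).persist(StorageLevel.MEMORY_AND_DISK)
>     // ... run everything that needs this period's events ...
>     data(period).unpersist()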
>
> All of this looks not only inelegant but shamefully un-Spark-like to me.
>
> Am I missing a trick here, maybe a well-known pattern? Are RDDs not the
> most appropriate API to handle this kind of task? If so, what do you
> suggest I look into?
>
> Thank you for taking the time to read that overly long message ;-)
>
> Jeroen
>
>
