Hi,

I'm having trouble building a recommender and would appreciate a few
pointers.

I have 350,000,000 events stored across roughly 500,000 S3 files, formatted as
semi-structured JSON (one object per line). Not all of these events are
relevant to making recommendations.

My code is (roughly):

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

case class Event(id: String, eventType: String, line: JsonNode)

val raw = sc.textFile("s3n://bucket/path/dt=*/*")  // Files stored by Hive-style daily partitions

// Parse events into Event objects, keeping the parsed JSON around for a later step
val parsed = raw.map(json => {
    val obj = (new ObjectMapper()).readTree(json)
    Event(obj.get("_id").asText, obj.get("event").asText, obj)
})

val downloads = parsed.filter(_.eventType == "download")

val ratings = downloads.map(event => {
    // ... extract userId and assetId (product) from the JSON - code elided for brevity ...
    Rating(userId, assetId, 1)
}).repartition(2048)

ratings.cache

val model = ALS.trainImplicit(ratings, 10, 10, 0.1, 0.8)
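
In case it's relevant, the elided extraction is roughly along these lines -
the exact field names and the hashCode mapping of string ids to Ints are
illustrative rather than my real schema:

val ratings = downloads.map(event => {
    // Illustrative only: assumes each download event carries user and asset id fields;
    // Rating needs Int ids, so string ids are mapped to Ints here via hashCode
    val userId = event.line.get("userid").asText.hashCode
    val assetId = event.line.get("assetid").asText.hashCode
    Rating(userId, assetId, 1)
}).repartition(2048)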

This gets me to a model in around 20-25 minutes, which is actually pretty
impressive. But to get this far in a reasonable time I need a fair amount of
compute: something like 16 x c3.4xl AWS instances for the workers (16 cores,
30 GB RAM, SSD storage each) and an r3.2xl (8 cores, 60 GB RAM, SSD storage)
for the master. Oddly, the cached Rating objects only take up a bit under
2 GB of RAM.

I'm developing in a shell at the moment, started like this:

spark-shell --master yarn-client --executor-cores 16 --executor-memory 23G
--driver-memory 48G

--executor-cores: 16 because workers have 16 cores
--executor-memory: 23GB because that's about the most I can safely allocate
on a 30GB machine
--driver-memory: 48GB to make use of the memory on the driver

I found that if I didn't put the driver/master on a big box with lots of RAM,
I had issues calculating the model, even though the cached ratings only take
about 2 GB of RAM.

I'm also setting spark.driver.maxResultSize to 40GB.
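
For completeness, assuming that goes on the launch command rather than into
spark-defaults.conf, the full invocation ends up roughly like this:

spark-shell --master yarn-client --executor-cores 16 --executor-memory 23G --driver-memory 48G --conf spark.driver.maxResultSize=40g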

If I don't repartition, I end up with 500,000 or so partitions (= number of
S3 files) and the model doesn't build in any reasonable timescale.
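
The partition count is easy to confirm in the shell, e.g.:

raw.partitions.length  // ~500,000 before the repartition - roughly one per input file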

Now that I've got a model, I'm trying the following (on 1.4.0-rc1 - I can't
upgrade to 1.4.0 yet):

val recommendations = model.recommendProductsForUsers(5)
recommendations.cache
recommendations.first

This invariably crashes with memory-related errors - typically GC errors, or
errors saying that I'm exceeding spark.akka.frameSize. Increasing that
setting only seems to prolong my agony.
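
For reference, increasing it means adding something like the flag below to
the launch command (the 512 MB value is just illustrative):

--conf spark.akka.frameSize=512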

I would appreciate any advice you can offer. I accept this needs a fair
amount of CPU, but it also seems to need an infeasible amount of RAM. To be
honest, I probably already have far more RAM than the data should require,
simply because of how EC2 instances have to be sized to get the CPU I need.

But I've been at this for 3 days now and still haven't actually managed to
build any recommendations...

Thanks in advance,

Danny
