Delaying failed task retries + giving failing tasks to different nodes

2015-04-02 Thread Stephen Merity
Hi there,

I've been using Spark for processing 33,000 gzipped files that contain
billions of JSON records (the metadata [WAT] dataset from Common Crawl).
I've hit a few issues and have not yet found the answers in the
documentation or via search. This may well just be me not finding the right
pages, though I promise I've attempted to RTFM thoroughly!

Is there any way to (a) ensure retry attempts are done on different nodes
and/or (b) ensure there's a delay between retrying a failing task (similar
to spark.shuffle.io.retryWait)?

Ideally, when a task fails it would be handed to a different executor*.
That's not what I've seen: with maxFailures set to 16, the task is handed
back to the same executor 16 times, even though there are 89 other nodes.

The retry attempts also happen incredibly fast. The transient issue (DNS
resolution to an Amazon S3 bucket failing) clears up quickly, but the 16
retry attempts take less than a second, all run on the same flaky node.

For now I've set maxFailures to an absurdly high number and that has
worked around the issue -- the DNS error clears up on the affected
machine after ~22 seconds (~360 task attempts) -- but that's obviously
suboptimal.
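
For reference, the workaround is just a couple of configuration settings
(a minimal sketch; the values are only what I happened to use, and
spark.shuffle.io.retryWait delays shuffle fetch retries rather than task
retries):

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: tolerate many transient failures of the same task.
    val conf = new SparkConf()
      .setAppName("WAT processing")           // hypothetical app name
      .set("spark.task.maxFailures", "400")   // default is 4; absurdly high to outlast the DNS hiccup
      .set("spark.shuffle.io.retryWait", "5") // seconds; only affects shuffle fetch retries
    val sc = new SparkContext(conf)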

Additionally, are there other options for handling node failures? Other
than maxFailures, I've only seen settings relating to shuffle failures. In
the one instance where a node lost communication, it killed the job; I'd
assumed the RDD would be reconstructed. For now I've tried to work around it
by persisting to multiple machines (MEMORY_AND_DISK_SER_2).
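
That persistence is just the following (a minimal sketch of what I mean,
with the input path elided):

    import org.apache.spark.storage.StorageLevel

    // Keep each partition serialized and replicated on two nodes so a
    // single node loss doesn't force a full recompute.
    val records = sc.textFile("s3n://commoncrawl/...")
    records.persist(StorageLevel.MEMORY_AND_DISK_SER_2)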

Thanks! ^_^

-- 
Regards,
Stephen Merity
Data Scientist @ Common Crawl


GraphX for large scale PageRank (~4 billion nodes, ~128 billion edges)

2014-12-12 Thread Stephen Merity
Hi!

tldr; We're looking at potentially using Spark+GraphX to compute PageRank
over a 4 billion node + 128 billion edge graph on a regular (monthly) basis,
possibly growing larger over time. If anyone has hints / tips / upcoming
optimizations I should try (or wants to contribute -- we'll pay the EC2
credits!) for running large scale graph processing with Spark+GraphX on
EC2+S3, I'd love to hear it! =]

First, I must say, I'm quite excited by the rise of the Spark ecosystem --
Spark makes life so much easier -- and it's for that very reason I'm looking
to use it for some of our processing work at the tech non-profit
http://commoncrawl.org/. To improve our crawl, we're aiming to run PageRank
monthly on our crawl archives. As we can run the whole pipeline in
Spark+GraphX, it's a very tempting proposition for us.

For that reason, I've been looking at replicating the existing experiments
from the "GraphX: Unifying Data-Parallel and Graph-Parallel Analytics" paper
to make sure my general setup and experimental methodology are sound before
attempting to scale up to the larger dataset. One issue is that the paper
states the hardware, but not the convergence tolerance or number of
iterations for PageRank. A figure in a separate Spark presentation
(http://www.graphanalysis.org/IPDPS2014-workshop/Gonzales.pdf) reports
68 seconds for 10 iterations, but I have no idea whether 10 iterations is
comparable to what the paper ran. I'm using the Spark EC2 spin-up scripts,
and other than some minor issues such as Ganglia failing [1], getting a
cluster running has been smooth sailing.
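
For concreteness, here's a sketch of the two GraphX entry points I've been
comparing against (the edge-list path is hypothetical, and treating 10
iterations as the target is my assumption rather than something the paper
states):

    import org.apache.spark.graphx.GraphLoader

    val graph = GraphLoader.edgeListFile(sc, "hdfs:///twitter-edges.txt")  // hypothetical path

    // A fixed number of iterations, matching the 68s / 10-iteration figure:
    val staticRanks = graph.staticPageRank(numIter = 10).vertices

    // Versus running to a convergence tolerance, which the paper doesn't state:
    val convergedRanks = graph.pageRank(tol = 0.0001).vertices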

===

Hardware:
All experiments were done with either 16 m2.4xlarge nodes or 32 r3.xlarge
machines. That gives comparable RAM and CPU to the machines used in the
paper whilst also having SSDs instead of magnetic disks. I've found that
more machines can work better for downloading large amounts of data from S3,
where our "target" dataset will be stored.

===

Replicating experiments:
I've been able to loosely replicate the results for the LiveJournal and
Twitter graphs, assuming the paper was run for 10 iterations. LiveJournal
runs at ~6 seconds per iteration and Twitter at ~47 seconds per iteration,
for example. I've also tested the Twitter graph with serialization and
compressed RDDs (snappy) turned on, which bumps it up to ~100 seconds per
iteration.
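
For reference, that run used roughly the following settings (a sketch of
the configuration rather than the exact code I ran):

    import org.apache.spark.SparkConf
    import org.apache.spark.graphx.GraphXUtils

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")            // store serialized RDD blocks compressed
      .set("spark.io.compression.codec", "snappy")
    GraphXUtils.registerKryoClasses(conf)           // register GraphX's internal classes with Kryo (Spark 1.2+)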

On the larger graphs, I start hitting trouble. Using portions of the 4
billion node graph (specifically 157 million nodes + 3.6 billion edges, and
326 million nodes + 9.1 billion edges), things get more complicated.

The first and most persistent issue is Java heap space exceptions.
OpenHashSet in EdgePartitionBuilder seems to be a common culprit, possibly
because there can still be millions or even billions of unique nodes in a
single partition.
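
My current mitigation is simply to load the edges into many more, smaller
partitions (a sketch; the path is hypothetical and the partition count is a
number I'm still tuning):

    import org.apache.spark.graphx.GraphLoader

    // More, smaller edge partitions mean each EdgePartitionBuilder (and its
    // OpenHashSet of local vertex ids) sees far fewer unique vertices.
    val graph = GraphLoader.edgeListFile(
      sc,
      "s3n://commoncrawl/hyperlink-edges/",   // hypothetical path
      numEdgePartitions = 8192)               // a guess; named minEdgePartitions in older releases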

There is also a consistent "java.lang.Long cannot be cast to scala.Tuple2"
class cast exception [2] produced by the SortShuffleWriter / ExternalSorter,
but the job slowly makes progress, primarily because some of the partitions
don't seem to need the external sort or don't trigger the error. This is
potentially due to using SORT instead of HASH for the shuffle, but I've not
tested that yet.
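
Testing that should only be a one-line configuration change (a sketch; hash
was the default shuffle manager before Spark 1.2):

    import org.apache.spark.SparkConf

    // Spark 1.2 changed the default shuffle from hash to sort; forcing hash
    // back for a test run would rule the SortShuffleWriter in or out.
    val conf = new SparkConf().set("spark.shuffle.manager", "hash")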

===

Code:
I've put the code up on GitHub under the terrible name graphx-prank.
https://github.com/Smerity/graphx-prank

The given code allows me to run large graphs locally on my laptop for
testing, whereas the GraphX PageRank example tends to fail at the
memory-intensive parts. I can run LiveJournal on my underpowered laptop
using my code, for example, but not with the GraphX PageRank example.

The aim is for the code to be used as part of a pipeline at Common Crawl for
extracting a hyperlink graph, computing PageRank over it, then storing the
results to determine what we crawl next.
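
End to end, the intention is roughly the following (a sketch with
hypothetical paths and an arbitrary iteration count; the real code is in
graphx-prank):

    import org.apache.spark.graphx.GraphLoader

    // Hypothetical paths; the real pipeline lives in graphx-prank.
    val graph = GraphLoader.edgeListFile(sc, "s3n://commoncrawl/hyperlink-edges/")
    val ranks = graph.staticPageRank(numIter = 20).vertices  // iteration count still to be decided

    // Write out (vertexId, rank) pairs for the crawler to prioritise against.
    ranks.map { case (id, rank) => s"$id\t$rank" }
         .saveAsTextFile("s3n://commoncrawl/pagerank-output/")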

===

General questions and comments:

(a) Is this size of graph sane to use with GraphX yet, given 50 m2.4xlarge
nodes or 100 r3.xlarge machines? I know optimizations for insanely large
graphs are coming, but am I potentially a little early?
(b) Any improvements / optimizations for running Spark and GraphX at this
scale?
(c) Are there any other example scripts in the wild, especially showing good
techniques or optimized Spark/GraphX usage? I'm new to Scala, Spark, and
GraphX, so I'm always looking for good resources.

P.S. The email was more easily readable before I had to remove the HTML to
get it past the mailing list spam filter. Sorry.

===

[1]: I think it's related to Starting httpd: httpd: Syntax error on line 153
of /etc/httpd/conf/httpd.conf: Cannot load modules/mod_authn_alias.so into
server: /etc/httpd/modules/mod_authn_alias.so: cannot open shared object
file: No such file or directory but I've not spent time investigating as
Ganglia is less of a concern for me right now.

[2]: java.lang.ClassCastException: java.lang.Long cannot be cast to
scala.Tuple2
   
org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39)