Hi, I have a 40 GB file that is a concatenation of multiple documents. I
want to extract two features (title and tables) from each document, so the
program looks like this:
-------------------------------------------------------------
val file = sc.textFile("/path/to/40G/file")
// file.cache() // uncomment to enable the cache
// reduce_1; getTitle() is a text utility function written in Java
val titles = file.map(line => (doc_key, getTitle()))
  .reduceByKey(_ + _, 1)
// reduce_2; getTableTitle() is a text utility function written in Java
val tables = file.flatMap(line => {
  for (table <- all_tables)
    yield (doc_key, getTableTitle())
}).reduceByKey(_ + _, 1)
titles.saveAsTextFile("titles.out") // save_1, will trigger reduce_1
tables.saveAsTextFile("tables.out") // save_2, will trigger reduce_2
-------------------------------------------------------------
I expected that with file.cache(), the later reduce_2 would be faster,
since it would read from cached data. However, the results repeatedly show
that reduce_2 takes 3 min with the cache and 1.4 min without it. Why does
reading from the cache not help in this case?
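A minimal sketch of how each save could be timed separately, so that the two runs can be compared. The `timed` helper is hypothetical (it is not a Spark API, and not necessarily how the numbers above were obtained); it only measures wall-clock time around a block.

```scala
// Hypothetical helper: runs `body` once and returns its result
// together with the elapsed wall-clock time in seconds.
def timed[A](label: String)(body: => A): (A, Double) = {
  val start = System.nanoTime()
  val result = body
  val seconds = (System.nanoTime() - start) / 1e9
  println(f"$label%s took $seconds%.1f s")
  (result, seconds)
}

// Usage with the program above (commented out because it needs a SparkContext):
// timed("save_1") { titles.saveAsTextFile("titles.out") }
// timed("save_2") { tables.saveAsTextFile("tables.out") }
```

Since saveAsTextFile is the action that triggers each reduce, timing the save covers the whole job, including any time spent reading from or populating the cache.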
The stage GUI shows that, with the cache, reduce_2 always has a wave of
"outlier tasks": the median task duration is 2 s, but the max is 1.7 min.
Metric                     Min    25th percentile  Median  75th percentile  Max
Result serialization time  0 ms   0 ms             0 ms    0 ms             1 ms
Duration                   0.6 s  2 s              2 s     2 s              1.7 min
However, these outlier tasks do not have a long GC pause (26 ms in the task row below):
173  1210  SUCCESS  PROCESS_LOCAL  localhost  2014/06/17 17:49:43  1.7 min  26 ms  9.4 KB
BTW: this is a single machine with 32 cores, 192 GB RAM, and an SSD, with these
lines in spark-env.sh:
SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g
SPARK_JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:MaxPermSize=256m"
Thanks,
Wei
---------------------------------
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan