On 12 May 2016, at 04:44, Brian Cho <chobr...@gmail.com> wrote:

Hi Kay,

Thank you for the detailed explanation.

If I understand correctly, I *could* time each record's processing by measuring the time spent in reader.next, but this would add overhead for every single record, and that is the approach that was abandoned because of performance regressions.

The other possibility is changing HDFS first. This method looks promising even 
if it takes some time. I'll play around with it a bit for now. Thanks again!

-Brian

On Wed, May 11, 2016 at 4:45 PM, Kay Ousterhout <k...@eecs.berkeley.edu> wrote:
Hi Brian,

Unfortunately it's not possible to do this in Spark for two reasons.  First, records are read in Spark one at a time (e.g., if you're reading an HDFS file and performing some map function, one record will be read from HDFS, then the map function will be applied, then the next record will be read, etc.). The relevant code is here<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L209>: we create an iterator that's then passed on to downstream RDDs.  As a result, we'd need to time each record's processing, which adds too much overhead.
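
For what it's worth, that rejected per-record approach would look roughly like the sketch below (the wrapper class is invented for illustration, it's not anything in Spark); it makes the overhead point concrete, since the clock is read twice around every single next() call:

  // Illustrative sketch only: wrap the record iterator and time every next() call.
  // This is the approach that was abandoned: two System.nanoTime() calls per record
  // is measurable overhead in a tight per-record loop.
  class TimedIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    private var totalReadNanos = 0L

    override def hasNext: Boolean = underlying.hasNext

    override def next(): T = {
      val start = System.nanoTime()
      val record = underlying.next()   // everything reader.next does lands inside this timing
      totalReadNanos += System.nanoTime() - start
      record
    }

    def readTimeNanos: Long = totalReadNanos
  }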

The other potential issue is that we use the RecordReader interface, which 
means that we get deserialized and decompressed records, so any time we 
measured would include time to read the data from disk and 
decompress/deserialize it (not sure if you're trying to isolate the disk time).

Measuring decompression overhead alone is interesting. Indeed, with encryption at rest and erasure coding in Hadoop, you'd want to isolate that work too, to see where the bottlenecks move after a switch to SSDs.


It *is* possible to do this instrumentation for disk read time in HDFS, because 
HDFS reads larger blocks from disk (and then passes them to Spark one by one), 
and I did that (in a hacky way) in the most recent commits in this Hadoop 
branch<https://github.com/kayousterhout/hadoop-common/commits/2.0.2-instrumented>.
  I filed a Hadoop JIRA <https://issues.apache.org/jira/browse/HADOOP-11873> to 
add this (in a less hacky way, using FileSystem.Statistics) but haven't 
submitted a patch for it.  If there's sufficient interest, I could properly 
implement the metrics and see if it could be merged into Hadoop, at which point 
Spark could start reading those metrics (unfortunately, the delay for this 
would be pretty significant because we'd need to wait for a new Hadoop version 
and then a new Spark version, and it would only be available in newer versions 
of Hadoop).
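
For reference, the FileSystem.Statistics counters that exist today can already be read from Spark; they just stop at byte and operation counts, with no timing, which is what HADOOP-11873 would add. A minimal sketch, nothing Spark-specific:

  import org.apache.hadoop.fs.FileSystem
  import scala.collection.JavaConverters._

  // Dump the per-scheme FileSystem.Statistics counters that exist today.
  // They expose byte and operation counts, but no read-time metric --
  // that timing is what HADOOP-11873 proposes to add.
  FileSystem.getAllStatistics.asScala.foreach { stats =>
    println(s"${stats.getScheme}: bytesRead=${stats.getBytesRead}, " +
      s"readOps=${stats.getReadOps}, bytesWritten=${stats.getBytesWritten}")
  }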

The metrics API changed 19 hours ago to something more sophisticated, though it still doesn't measure timings.

https://issues.apache.org/jira/browse/HADOOP-13065

It's designed to be more extensible: you ask for a metric by name rather than a compile-time field, which lets different filesystems add different values.
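
In code, and assuming the interface stays roughly as it went in (the class and method names below follow the JIRA and may well change before a release), a lookup would be something like:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.FileSystem

  // Sketch against the new by-name statistics API from HADOOP-13065, assuming
  // the committed shape: stats are looked up by string key, not a compile-time
  // field, so each filesystem can publish its own set of names.
  val fs = FileSystem.get(new Configuration())
  val storageStats = fs.getStorageStatistics
  val it = storageStats.getLongStatistics
  while (it.hasNext) {
    val stat = it.next()
    println(s"${stat.getName} = ${stat.getValue}")
  }
  // Or ask for a single metric by name; the key set is per-filesystem,
  // and an untracked name comes back as null.
  val bytesRead = storageStats.getLong("bytesRead")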

A few minutes ago, https://issues.apache.org/jira/browse/HADOOP-13028 went in to do some metrics work for Spark; there the stats can be printed in logs, because the filesystem and input stream toString() methods return the metrics. That's for people, not machines: the text may change without warning. But you can at least dump the metrics in your logs to see what's going on. That stuff can be seen in downstream tests, but it isn't directly published as metrics. The aggregate stats are also collected as metrics2 stats, which should somehow be convertible to Coda Hale metrics, and hence wired up to the rest of Spark's monitoring.
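
From the Spark side the cheapest way to see those is just to log the stream or filesystem object after use; for example (the path is a placeholder, and which filesystems actually put stats into their toString() varies):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // HADOOP-13028-style statistics surface through toString(), so log the
  // objects rather than parsing them: the text is for humans and may change.
  val fs = FileSystem.get(new Configuration())
  val in = fs.open(new Path("/some/file"))   // placeholder path
  try {
    val buf = new Array[Byte](8192)
    while (in.read(buf) >= 0) {}             // drain the stream
  } finally {
    in.close()
  }
  println(in.toString)                       // human-readable stream statistics
  println(fs.toString)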


A more straightforward option might just be for Spark itself to subclass FilterFileSystem and implement operation timing there, both for the filesystem operations and for any input/output streams returned by create() and open(). Something like the rough sketch below (the class name is invented; stream-level timing, which needs a wrapper implementing Seekable and PositionedReadable, is left out):
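
  import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, FilterFileSystem, Path}

  // Delegate everything to the wrapped filesystem, but time the operations we
  // care about. Timing the bytes actually read/written would additionally need
  // wrapped streams, omitted here.
  class TimingFileSystem(fs: FileSystem) extends FilterFileSystem(fs) {
    private var openNanos = 0L

    override def open(f: Path, bufferSize: Int): FSDataInputStream = {
      val start = System.nanoTime()
      try super.open(f, bufferSize)
      finally openNanos += System.nanoTime() - start
    }

    def totalOpenNanos: Long = openNanos
  }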
