On 12 May 2016, at 04:44, Brian Cho <chobr...@gmail.com> wrote:
> Hi Kay,
>
> Thank you for the detailed explanation. If I understand correctly, I *could* time each record's processing by measuring the time spent in reader.next, but this would add overhead for every single record; this is the approach that was abandoned because of performance regressions. The other possibility is changing HDFS first. That looks promising even if it takes some time. I'll play around with it a bit for now.
>
> Thanks again!
> -Brian
>
> On Wed, May 11, 2016 at 4:45 PM, Kay Ousterhout <k...@eecs.berkeley.edu> wrote:
>
>> Hi Brian,
>>
>> Unfortunately it's not possible to do this in Spark, for two reasons. First, Spark reads records one at a time (e.g., if you're reading an HDFS file and performing some map function, one record is read from HDFS, then the map function is applied, then the next record is read, and so on). The relevant code is here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L209 (we create an iterator that's then passed on to other downstream RDDs). As a result, we'd need to time each record's processing, which adds too much overhead.
>>
>> The other potential issue is that we use the RecordReader interface, which means we get deserialized and decompressed records, so any time we measured would include the time to read the data from disk and to decompress/deserialize it (not sure if you're trying to isolate the disk time).

Measuring decompression overhead alone is interesting. Indeed, with encryption at rest and erasure coding in Hadoop, you'd want to isolate the work there too, to see where the bottlenecks move after a switch to SSDs.

>> It *is* possible to do this instrumentation for disk read time in HDFS, because HDFS reads larger blocks from disk (and then passes them to Spark one by one), and I did that (in a hacky way) in the most recent commits on this Hadoop branch: https://github.com/kayousterhout/hadoop-common/commits/2.0.2-instrumented. I filed a Hadoop JIRA (https://issues.apache.org/jira/browse/HADOOP-11873) to add this in a less hacky way, using FileSystem.Statistics, but haven't submitted a patch for it. If there's sufficient interest, I could properly implement the metrics and see if they could be merged into Hadoop, at which point Spark could start reading them (unfortunately, the delay here would be pretty significant, because we'd need to wait for a new Hadoop version and then a new Spark version, and the metrics would only be available in newer versions of Hadoop).

The metrics API changed 19 hours ago into something more sophisticated, though it doesn't measure timings: https://issues.apache.org/jira/browse/HADOOP-13065. It's designed to be more extensible; you ask for a metric by name rather than a compile-time field, which lets different filesystems publish different values.

A few minutes ago, https://issues.apache.org/jira/browse/HADOOP-13028 went in to do some metric work for Spark; there the stats can be printed in logs, because the filesystem and input stream toString() operators return the metrics. That's for people, not machines, and the text may break without warning, but you can at least dump the metrics in your logs to see what's going on. That stuff can be seen in downstream tests, though it isn't directly published as metrics. The aggregate stats are also collected as metrics2 stats, which should somehow be convertible to Coda Hale metrics, and hence integrate with the rest of Spark's monitoring.
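Going back to the per-record timing idea for a moment: in practice it would mean wrapping the iterator that gets handed downstream with something like the sketch below (illustrative only, not Spark's actual HadoopRDD code; the class name is invented). The pair of System.nanoTime() calls on every record is exactly the per-record overhead Kay describes.

    // Illustrative sketch only: accumulate the time spent pulling each record
    // out of the underlying reader. Not Spark code; TimedIterator is invented.
    class TimedIterator[T](underlying: Iterator[T]) extends Iterator[T] {
      var readTimeNanos = 0L

      private def timed[A](body: => A): A = {
        val start = System.nanoTime()
        try body finally readTimeNanos += System.nanoTime() - start
      }

      // Time both calls, since depending on the RecordReader the actual read
      // may happen while checking for more input rather than in next().
      override def hasNext: Boolean = timed(underlying.hasNext)
      override def next(): T = timed(underlying.next())
    }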
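And to see what's already visible today, something along these lines (a rough, untested sketch for a spark-shell or Scala REPL; the path is a placeholder) will dump whatever the filesystem and stream choose to report in their toString(), plus the long-standing per-scheme FileSystem.Statistics counters:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    val path = new Path("hdfs:///some/file")   // placeholder path
    val fs = path.getFileSystem(conf)

    val in = fs.open(path)
    val buf = new Array[Byte](64 * 1024)
    var n = in.read(buf)
    while (n != -1) { n = in.read(buf) }
    // Only informative where the stream/filesystem implement a useful
    // toString(); the text is meant for humans and may change.
    println(s"stream: $in")
    println(s"filesystem: $fs")
    in.close()

    // The older per-scheme counters are always available:
    FileSystem.getAllStatistics.asScala.foreach { s =>
      println(s"${s.getScheme}: bytesRead=${s.getBytesRead} readOps=${s.getReadOps}")
    }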
A more straightforward action might just be for Spark itself to subclass FilterFileSystem and implement operation timing there, both for the operations themselves and for any input/output streams returned in create() and open().
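A minimal, untested sketch of what that could look like (class and counter names invented; only open() and create() are timed here, and timing the returned streams themselves would need extra wrappers around FSDataInputStream/FSDataOutputStream):

    import java.util.concurrent.atomic.AtomicLong
    import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, FilterFileSystem, Path}
    import org.apache.hadoop.fs.permission.FsPermission
    import org.apache.hadoop.util.Progressable

    // Hypothetical wrapper: delegates everything to the wrapped FileSystem
    // via FilterFileSystem and records how long open()/create() calls take.
    class TimingFileSystem(inner: FileSystem) extends FilterFileSystem(inner) {

      val openNanos = new AtomicLong()
      val createNanos = new AtomicLong()

      private def timed[T](counter: AtomicLong)(body: => T): T = {
        val start = System.nanoTime()
        try body finally counter.addAndGet(System.nanoTime() - start)
      }

      override def open(f: Path, bufferSize: Int): FSDataInputStream =
        timed(openNanos)(super.open(f, bufferSize))

      // Other open/create overloads exist and would need the same treatment
      // for full coverage.
      override def create(f: Path, permission: FsPermission, overwrite: Boolean,
          bufferSize: Int, replication: Short, blockSize: Long,
          progress: Progressable): FSDataOutputStream =
        timed(createNanos)(super.create(f, permission, overwrite, bufferSize,
          replication, blockSize, progress))

      override def toString: String =
        s"TimingFileSystem(openNanos=${openNanos.get}, createNanos=${createNanos.get}) over $inner"
    }

Constructing it directly over FileSystem.get(conf) is enough to experiment with; getting Spark to pick it up automatically for a given URL scheme would need a bit more plumbing in the Hadoop configuration.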