Kay -- we would like to add the read metrics (in a compatible way) into our internal DFS at Facebook and then call that method from Spark. In parallel, if you can finish up HADOOP-11873 :), then we could add hooks to those metrics in Spark. What do you think? Does this look like a feasible plan for getting the metrics in?
Thanks,
Brian

On Thu, May 12, 2016 at 12:12 PM, Steve Loughran <ste...@hortonworks.com>
wrote:

> On 12 May 2016, at 04:44, Brian Cho <chobr...@gmail.com> wrote:
>
> Hi Kay,
>
> Thank you for the detailed explanation.
>
> If I understand correctly, I *could* time each record's processing by
> measuring the time in reader.next, but this would add overhead for every
> single record. And this is the method that was abandoned because of
> performance regressions.
>
> The other possibility is changing HDFS first. This method looks promising
> even if it takes some time. I'll play around with it a bit for now. Thanks
> again!
>
> -Brian
>
> On Wed, May 11, 2016 at 4:45 PM, Kay Ousterhout <k...@eecs.berkeley.edu>
> wrote:
>
>> Hi Brian,
>>
>> Unfortunately it's not possible to do this in Spark, for two reasons.
>> First, we read records in Spark one at a time (e.g., if you're reading an
>> HDFS file and performing some map function, one record will be read from
>> HDFS, then the map function will be applied, then the next record will be
>> read, etc.). The relevant code is here
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L209>:
>> we create an iterator that's then passed on to other downstream RDDs. As a
>> result, we'd need to time each record's processing, which adds too much
>> overhead.
>>
>> The other potential issue is that we use the RecordReader interface,
>> which means that we get deserialized and decompressed records, so any time
>> we measured would include the time to read the data from disk and
>> decompress/deserialize it (not sure if you're trying to isolate the disk
>> time).
>>
>
> Measuring decompression overhead alone is interesting. Indeed, with
> encryption at rest and erasure coding in Hadoop, you'd want to think about
> isolating work there too, to see where the bottlenecks move after a
> switch to SSDs.
>
>
>> It *is* possible to do this instrumentation for disk read time in HDFS,
>> because HDFS reads larger blocks from disk (and then passes them to Spark
>> one by one), and I did that (in a hacky way) in the most recent commits
>> in this Hadoop branch
>> <https://github.com/kayousterhout/hadoop-common/commits/2.0.2-instrumented>.
>> I filed a Hadoop JIRA
>> <https://issues.apache.org/jira/browse/HADOOP-11873> to add this (in a
>> less hacky way, using FileSystem.Statistics) but haven't submitted a patch
>> for it. If there's sufficient interest, I could properly implement the
>> metrics and see if they could be merged into Hadoop, at which point Spark
>> could start reading those metrics (unfortunately, the delay for this would
>> be pretty significant, because we'd need to wait for a new Hadoop version
>> and then a new Spark version, and it would only be available in newer
>> versions of Hadoop).
>>
>
> The metrics API changed 19 hours ago into something more sophisticated,
> though it doesn't measure timings:
>
> https://issues.apache.org/jira/browse/HADOOP-13065
>
> It's designed to be more extensible; you'll ask for a metric by name, not
> a compile-time field... this will let different filesystems add different
> values.
>
> A few minutes ago, https://issues.apache.org/jira/browse/HADOOP-13028 went
> in to do some metric work for Spark, and there the stats can be printed in
> logs, because the filesystem and input stream toString() operators return
> the metrics. That's for people, not machines; the text may break without
> warning. But you can at least dump the metrics in your logs to see what's
> going on.
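Steve -- just to check I understand the log-dump idea: would it be something
like the rough, untested sketch below from spark-shell? I'm assuming the
toString() support you mention is the s3a work in HADOOP-13028, and the path
here is only a placeholder.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    val path = new Path("s3a://some-bucket/some-object")  // placeholder path
    val fs = FileSystem.get(path.toUri, conf)
    val in = fs.open(path)
    val buf = new Array[Byte](64 * 1024)
    while (in.read(buf) >= 0) {}   // drain the stream so the counters move
    // Human-readable only; the text may change without warning:
    println(s"stream stats: $in")
    println(s"filesystem stats: $fs")
    in.close()

If that's roughly it, it already sounds good enough for eyeballing logs while
the real metrics work settles.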
> That stuff can be seen in downstream tests, but not directly
> published as metrics. The aggregate stats are also collected as metrics2
> stats, which should somehow be convertible to Coda Hale metrics, and hence
> tie in with the rest of Spark's monitoring.
>
> A more straightforward action might just be for Spark itself to
> subclass FilterFileSystem and implement operation timing there, both for
> the operations and for any input/output streams returned in create & open.
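The FilterFileSystem route sounds like the quickest way to get timings without
waiting on new Hadoop releases. Would it look roughly like the untested sketch
below? The class and counter names are made up; I've left out the wiring to
register it under a scheme via fs.<scheme>.impl and to publish the counters as
Spark metrics, and per-read timings would still need the returned streams
wrapped.

    import java.util.concurrent.atomic.AtomicLong
    import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, FilterFileSystem, Path}
    import org.apache.hadoop.fs.permission.FsPermission
    import org.apache.hadoop.util.Progressable

    // Hypothetical wrapper: times the FileSystem calls themselves.
    class TimedFileSystem(underlying: FileSystem) extends FilterFileSystem(underlying) {
      val openNanos = new AtomicLong()    // total time spent in open()
      val createNanos = new AtomicLong()  // total time spent in create()

      private def timed[T](counter: AtomicLong)(body: => T): T = {
        val start = System.nanoTime()
        try body finally counter.addAndGet(System.nanoTime() - start)
      }

      override def open(f: Path, bufferSize: Int): FSDataInputStream =
        timed(openNanos) { super.open(f, bufferSize) }

      override def create(f: Path, permission: FsPermission, overwrite: Boolean,
          bufferSize: Int, replication: Short, blockSize: Long,
          progress: Progressable): FSDataOutputStream =
        timed(createNanos) {
          super.create(f, permission, overwrite, bufferSize, replication,
            blockSize, progress)
        }
    }

If that shape seems reasonable, it might be the quickest thing to prototype
against our internal DFS while HADOOP-11873 is pending.

-Brian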