Kay -- we would like to add the read metrics (in a compatible way) into our
internal DFS at Facebook and then call that method from Spark. In parallel,
if you can finish up HADOOP-11873 :), we could add hooks to those metrics
in Spark. What do you think? Does this look like a feasible plan for getting
the metrics in?

Thanks,
Brian

On Thu, May 12, 2016 at 12:12 PM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 12 May 2016, at 04:44, Brian Cho <chobr...@gmail.com> wrote:
>
> Hi Kay,
>
> Thank you for the detailed explanation.
>
> If I understand correctly, I *could* time each record's processing by
> measuring the time spent in reader.next, but this would add overhead for
> every single record. And this is the method that was abandoned because of
> performance regressions.
>
> The other possibility is changing HDFS first. This method looks promising
> even if it takes some time. I'll play around with it a bit for now. Thanks
> again!
>
> -Brian
>
> On Wed, May 11, 2016 at 4:45 PM, Kay Ousterhout <k...@eecs.berkeley.edu>
> wrote:
>
>> Hi Brian,
>>
>> Unfortunately it's not possible to do this in Spark, for two reasons.
>> First, Spark reads records one at a time (e.g., if you're reading an
>> HDFS file and performing some map function, one record will be read from
>> HDFS, then the map function will be applied, then the next record will be
>> read, etc.). The relevant code is here
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L209>:
>> we create an iterator that's then passed on to the downstream RDDs.  As a
>> result, we'd need to time each record's processing, which adds too much
>> overhead.
>>
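>> (Purely for illustration -- this is not actual Spark code: timing each
>> record would mean wrapping that iterator roughly as below, so every next()
>> call pays for two clock reads plus the bookkeeping.)
>>
>>   // Hypothetical wrapper around the record iterator; the two nanoTime()
>>   // calls per record are exactly the overhead that made the earlier
>>   // per-record approach too expensive.
>>   class TimedIterator[T](underlying: Iterator[T]) extends Iterator[T] {
>>     var totalReadTimeNanos = 0L
>>     override def hasNext: Boolean = underlying.hasNext
>>     override def next(): T = {
>>       val start = System.nanoTime()
>>       val record = underlying.next()
>>       totalReadTimeNanos += System.nanoTime() - start
>>       record
>>     }
>>   }
>>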
>> The other potential issue is that we use the RecordReader interface,
>> which means that we get deserialized and decompressed records, so any time
>> we measured would include time to read the data from disk and
>> decompress/deserialize it (not sure if you're trying to isolate the disk
>> time).
>>
>
> Measuring decompression overhead alone is interesting. Indeed, with
> encryption at rest and erasure coding in Hadoop, you'd want to isolate the
> work there too, to see where the bottlenecks move after a switch to SSDs.
>
>
>> It *is* possible to do this instrumentation for disk read time in HDFS,
>> because HDFS reads larger blocks from disk (and then passes them to Spark
>> one by one), and I did that (in a hacky way) in the most recent commits
>> in this Hadoop branch
>> <https://github.com/kayousterhout/hadoop-common/commits/2.0.2-instrumented>.
>> I filed a Hadoop JIRA
>> <https://issues.apache.org/jira/browse/HADOOP-11873> to add this (in a
>> less hacky way, using FileSystem.Statistics) but haven't submitted a patch
>> for it.  If there's sufficient interest, I could properly implement the
>> metrics and see if it could be merged into Hadoop, at which point Spark
>> could start reading those metrics (unfortunately, the delay for this would
>> be pretty significant because we'd need to wait for a new Hadoop version
>> and then a new Spark version, and it would only be available in newer
>> versions of Hadoop).
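>>
>> For reference, a minimal sketch of how such a counter could be read from
>> Spark once it exists -- today's FileSystem.Statistics already exposes
>> per-scheme counters like bytesRead; the read-time counter below is
>> hypothetical (it's what HADOOP-11873 would add):
>>
>>   import org.apache.hadoop.fs.FileSystem
>>   import scala.collection.JavaConverters._
>>
>>   // Sum today's per-scheme bytes-read counters for HDFS; a disk read-time
>>   // counter from HADOOP-11873 would presumably sit alongside these.
>>   val hdfsBytesRead = FileSystem.getAllStatistics().asScala
>>     .filter(_.getScheme == "hdfs")
>>     .map(_.getBytesRead)
>>     .sum
>>   // val hdfsReadTimeNanos = ...   // hypothetical new counter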
>>
>
> The metrics API changed 19 hours ago into something more sophisticated,
> though it doesn't measure timings.
>
> https://issues.apache.org/jira/browse/HADOOP-13065
>
> It's designed to be more extensible: you ask for a metric by name rather
> than a compile-time field, which lets different filesystems add different
> values.
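>
> Roughly, the lookup style looks like this (the API is brand new, so the
> details may still shift, and the key name below is just illustrative):
>
>   import java.net.URI
>   import org.apache.hadoop.conf.Configuration
>   import org.apache.hadoop.fs.FileSystem
>
>   // Ask a filesystem instance for its StorageStatistics and look a
>   // counter up by its string name; unknown keys simply come back null.
>   val fs = FileSystem.get(new URI("hdfs:///"), new Configuration())
>   val stats = fs.getStorageStatistics()
>   val opens = Option(stats.getLong("op_open"))   // illustrative key name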
>
> A few minutes ago, https://issues.apache.org/jira/browse/HADOOP-13028 went
> in to do some metric work for Spark; there the stats can be printed in
> logs, because the filesystem and input stream toString() methods return
> the metrics. That's for people, not machines: the text may break without
> warning. But you can at least dump the metrics in your logs to see what's
> going on. That stuff can be seen in downstream tests, but not directly
> published as metrics. The aggregate stats are also collected as metrics2
> stats, which should somehow be convertible to Coda Hale metrics, and hence
> integrate with the rest of Spark's monitoring.
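>
> In other words, the cheapest thing to do today is just log the text form
> (for a filesystem whose toString() carries the counters; the exact wording
> may change without notice, and the path below is a placeholder):
>
>   import org.apache.hadoop.fs.Path
>
>   // Sketch: dump the human-readable statistics after doing the reads,
>   // using the 'fs' instance from the sketch above.
>   val in = fs.open(new Path("/some/input"))
>   try { /* ... read ... */ } finally { in.close() }
>   println(s"stream stats: $in")
>   println(s"filesystem stats: $fs")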
>
>
> A more straightforward action might just be for Spark itself to subclass
> FilterFileSystem and implement operation timing there, both for the
> filesystem operations and for any input/output streams returned by create
> and open.
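>
> A sketch of that approach (the class and hook names are made up):
>
>   import org.apache.hadoop.fs.{FileSystem, FilterFileSystem, FSDataInputStream, Path}
>
>   // Wrap a real filesystem, time each operation, and feed the durations
>   // into whatever metrics sink Spark wants to use.
>   class TimedFileSystem(fs: FileSystem) extends FilterFileSystem(fs) {
>     private def timed[T](op: String)(body: => T): T = {
>       val start = System.nanoTime()
>       try body
>       finally record(op, System.nanoTime() - start)
>     }
>
>     override def open(f: Path, bufferSize: Int): FSDataInputStream =
>       timed("open") { super.open(f, bufferSize) }
>
>     // ... same pattern for create(), plus wrapper streams that time the
>     // actual read()/write() calls ...
>
>     private def record(op: String, nanos: Long): Unit = ()  // hook into Spark metrics here
>   }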
>
>
