Shivaram,

You should take a look at this patch which adds support for naming
accumulators - this is likely to get merged in soon. I actually
started this patch by supporting named TaskMetrics similar to what you
have there, but then I realized there is too much semantic overlap
with accumulators, so I just went that route.

For instance, it would be nice if any user-defined metrics are
accessible at the driver program.

https://github.com/apache/spark/pull/1309

In your example, you could just define an accumulator here on the RDD
and you'd see the incremental update in the web UI automatically.

- Patrick

On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman
<shiva...@eecs.berkeley.edu> wrote:
> From reading Neil's first e-mail, I think the motivation is to get some
> metrics in ADAM ? --  I've run into a similar use-case with having
> user-defined metrics in long-running tasks and I think a nice way to solve
> this would be to have user-defined TaskMetrics.
>
> To state my problem more clearly, lets say you have two functions you use
> in a map call and want to measure how much time each of them takes. For
> example, if you have a code block like the one below and you want to
> measure how much time f1 takes as a fraction of the task.
>
> a.map { l =>
>    val f = f1(l)
>    ... some work here ...
> }
>
> It would be really cool if we could do something like
>
> a.map { l =>
>    val start = System.nanoTime
>    val f = f1(l)
>    TaskMetrics.get("f1-time").add(System.nanoTime - start)
> }
>
> These task metrics have a different purpose from Accumulators in the sense
> that we don't need to track lineage, perform commutative operations etc.
>  Further we also have a bunch of code in place to aggregate task metrics
> across a stage etc. So it would be great if we could also populate these in
> the UI and show median/max etc.
> I think counters [1] in Hadoop served a similar purpose.
>
> Thanks
> Shivaram
>
> [1]
> https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
>
>
>
> On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
>
>> Hi Reynold
>>
>> Thanks for your reply.
>>
>> Accumulators are, of course, stored in the Accumulators object as
>> thread-local variables. However, the Accumulators object isn't public, so
>> when a Task is executing there's no way to get the set of accumulators for
>> the current thread -- accumulators still have to be passed to every method
>> that needs them.
>>
>> Additionally, unless an accumulator is explicitly referenced it won't be
>> serialized as part of a Task, and won't make it into the Accumulators
>> object in the first place.
>>
>> I should also note that what I'm proposing is not specific to Accumulators
>> -- I am proposing that any data can be stored in a thread-local variable. I
>> think there are probably many other use cases other than my one.
>>
>> Neil
>>
>>
>> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>> > Thanks for the thoughtful email, Neil and Christopher.
>> >
>> > If I understand this correctly, it seems like the dynamic variable is
>> just
>> > a variant of the accumulator (a static one since it is a global object).
>> > Accumulators are already implemented using thread-local variables under
>> the
>> > hood. Am I misunderstanding something?
>> >
>> >
>> >
>> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <c...@adatao.com>
>> > wrote:
>> >
>> > > Hi Neil, first off, I'm generally a sympathetic advocate for making
>> > changes
>> > > to Spark internals to make it easier/better/faster/more awesome.
>> > >
>> > > In this case, I'm (a) not clear about what you're trying to accomplish,
>> > and
>> > > (b) a bit worried about the proposed solution.
>> > >
>> > > On (a): it is stated that you want to pass some Accumulators around.
>> Yet
>> > > the proposed solution is for some "shared" variable that may be set and
>> > > "mapped out" and possibly "reduced back", but without any accompanying
>> > > accumulation semantics. And yet it doesn't seem like you only want just
>> > the
>> > > broadcast property. Can you clarify the problem statement with some
>> > > before/after client code examples?
>> > >
>> > > On (b): you're right that adding variables to SparkContext should be
>> done
>> > > with caution, as it may have unintended consequences beyond just serdes
>> > > payload size. For example, there is a stated intention of supporting
>> > > multiple SparkContexts in the future, and this proposed solution can
>> make
>> > > it a bigger challenge to do so. Indeed, we had a gut-wrenching call to
>> > make
>> > > a while back on a subject related to this (see
>> > > https://github.com/mesos/spark/pull/779). Furthermore, even in a
>> single
>> > > SparkContext application, there may be multiple "clients" (of that
>> > > application) whose intent to use the proposed "SparkDynamic" would not
>> > > necessarily be coordinated.
>> > >
>> > > So, considering a ratio of a/b (benefit/cost), it's not clear to me
>> that
>> > > the benefits are significant enough to warrant the costs. Do I
>> > > misunderstand that the benefit is to save one explicit parameter (the
>> > > "context") in the signature/closure code?
>> > >
>> > > --
>> > > Christopher T. Nguyen
>> > > Co-founder & CEO, Adatao <http://adatao.com>
>> > > linkedin.com/in/ctnguyen
>> > >
>> > >
>> > >
>> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nfergu...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi all
>> > > >
>> > > > I have been adding some metrics to the ADAM project
>> > > > https://github.com/bigdatagenomics/adam, which runs on Spark, and
>> > have a
>> > > > proposal for an enhancement to Spark that would make this work
>> cleaner
>> > > and
>> > > > easier.
>> > > >
>> > > > I need to pass some Accumulators around, which will aggregate metrics
>> > > > (timing stats and other metrics) across the cluster. However, it is
>> > > > cumbersome to have to explicitly pass some "context" containing these
>> > > > accumulators around everywhere that might need them. I can use Scala
>> > > > implicits, which help slightly, but I'd still need to modify every
>> > method
>> > > > in the call stack to take an implicit variable.
>> > > >
>> > > > So, I'd like to propose that we add the ability to have "dynamic
>> > > variables"
>> > > > (basically thread-local variables) to Spark. This would avoid having
>> to
>> > > > pass the Accumulators around explicitly.
>> > > >
>> > > > My proposed approach is to add a method to the SparkContext class as
>> > > > follows:
>> > > >
>> > > > /**
>> > > >  * Sets the value of a "dynamic variable". This value is made
>> available
>> > > to
>> > > > jobs
>> > > >  * without having to be passed around explicitly. During execution
>> of a
>> > > > Spark job
>> > > >  * this value can be obtained from the [[SparkDynamic]] object.
>> > > >  */
>> > > > def setDynamicVariableValue(value: Any)
>> > > >
>> > > > Then, when a job is executing the SparkDynamic can be accessed to
>> > obtain
>> > > > the value of the dynamic variable. The implementation of this object
>> is
>> > > as
>> > > > follows:
>> > > >
>> > > > object SparkDynamic {
>> > > >   private val dynamicVariable = new DynamicVariable[Any]()
>> > > >   /**
>> > > >    * Gets the value of the "dynamic variable" that has been set in
>> the
>> > > > [[SparkContext]]
>> > > >    */
>> > > >   def getValue: Option[Any] = {
>> > > >     Option(dynamicVariable.value)
>> > > >   }
>> > > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: =>
>> > > S): S
>> > > > = {
>> > > >     dynamicVariable.withValue(threadValue.orNull)(thunk)
>> > > >   }
>> > > > }
>> > > >
>> > > > The change involves modifying the Task object to serialize the value
>> of
>> > > the
>> > > > dynamic variable, and modifying the TaskRunner class to deserialize
>> the
>> > > > value and make it available in the thread that is running the task
>> > (using
>> > > > the SparkDynamic.withValue method).
>> > > >
>> > > > I have done a quick prototype of this in this commit:
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
>> > > > and it seems to work fine in my (limited) testing. It needs more
>> > testing,
>> > > > tidy-up and documentation though.
>> > > >
>> > > > One drawback is that the dynamic variable will be serialized for
>> every
>> > > Task
>> > > > whether it needs it or not. For my use case this might not be too
>> much
>> > > of a
>> > > > problem, as serializing and deserializing Accumulators looks fairly
>> > > > lightweight -- however we should certainly warn users against
>> setting a
>> > > > dynamic variable containing lots of data. I thought about using
>> > broadcast
>> > > > tables here, but I don't think it's possible to put Accumulators in a
>> > > > broadcast table (as I understand it, they're intended for purely
>> > > read-only
>> > > > data).
>> > > >
>> > > > What do people think about this proposal? My use case aside, it seems
>> > > like
>> > > > it would be a generally useful enhancment to be able to pass certain
>> > data
>> > > > around without having to explicitly pass it everywhere.
>> > > >
>> > > > Neil
>> > > >
>> > >
>> >
>>

Reply via email to