What if we have a registry for accumulators, where you can access them
statically by name?
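
To make the idea concrete, here is a rough sketch of what such a registry
might look like. Everything in it is hypothetical: NamedCounters, get and
snapshot are illustrative names, not existing Spark APIs, and a plain
AtomicLong stands in for a real accumulator. It only shows the static,
by-name lookup inside a single JVM; on a cluster each executor would hold
its own copy, and the values would still have to be merged back at the
driver, which is the part accumulators already handle.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

// Hypothetical registry: none of these names exist in Spark.
object NamedCounters {
  private val counters = TrieMap.empty[String, AtomicLong]

  // Returns the counter registered under `name`, creating it on first use.
  def get(name: String): AtomicLong =
    counters.getOrElseUpdate(name, new AtomicLong(0L))

  // Immutable copy of the current values, e.g. for reporting.
  def snapshot: Map[String, Long] =
    counters.iterator.map { case (k, v) => k -> v.get }.toMap
}

object Example {
  def f1(l: Int): Int = l * 2

  // Records into the registry by name; no counter is passed down the call stack.
  def timedF1(l: Int): Int = {
    val start = System.nanoTime
    val f = f1(l)
    NamedCounters.get("f1-time").addAndGet(System.nanoTime - start)
    NamedCounters.get("f1-calls").incrementAndGet()
    f
  }
}
```

With something like this, Shivaram's example would become
a.map(Example.timedF1), and the code being measured never has to take a
metrics parameter.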

- Patrick

On Thu, Jul 24, 2014 at 1:51 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
> I realised that my last reply wasn't very clear -- let me try and clarify.
>
> The patch for named accumulators looks very useful, however in Shivaram's
> example he was able to retrieve the named task metrics (statically) from a
> TaskMetrics object, as follows:
>
> TaskMetrics.get("f1-time")
>
> However, I don't think this would be possible with the named accumulators
> -- I believe they'd need to be passed to every function that needs them,
> which I think would be cumbersome in any application of reasonable
> complexity.
>
> This is what I was trying to solve with my proposal for dynamic variables
> in Spark. However, the ability to retrieve named accumulators from a
> thread-local would work just as well for my use case. I'd be happy to
> implement either solution if there's interest.
>
> Alternatively, if I'm missing some other way to accomplish this please let
> me know.
>
> On a (slight) aside, I now think it would be possible to implement dynamic
> variables by broadcasting them. I was looking at Reynold's PR [1] to
> broadcast the RDD object, and I think it would be possible to take a
> similar approach -- that is, broadcast the serialized form, and deserialize
> when executing each task.
>
> [1] https://github.com/apache/spark/pull/1498
>
> On Wed, Jul 23, 2014 at 8:30 AM, Neil Ferguson <nfergu...@gmail.com> wrote:
>
>>  Hi Patrick.
>>
>> That looks very useful. The thing that seems to be missing from Shivaram's
>> example is the ability to access TaskMetrics statically (this is the same
>> problem that I am trying to solve with dynamic variables).
>>
>>
>>
>> You mention defining an accumulator on the RDD. Perhaps I am missing
>> something here, but my understanding was that accumulators are defined in
>> SparkContext and are not part of the RDD. Is that correct?
>>
>> Neil
>>
>> On Tue, Jul 22, 2014 at 22:21, Patrick Wendell <pwend...@gmail.com> wrote:
>>
>>> Shivaram,
>>>
>>> You should take a look at this patch which adds support for naming
>>> accumulators - this is likely to get merged in soon. I actually
>>> started this patch by supporting named TaskMetrics similar to what you
>>> have there, but then I realized there is too much semantic overlap
>>> with accumulators, so I just went that route.
>>>
>>> For instance, it would be nice if any user-defined metrics are
>>> accessible at the driver program.
>>>
>>> https://github.com/apache/spark/pull/1309
>>>
>>> In your example, you could just define an accumulator here on the RDD
>>> and you'd see the incremental update in the web UI automatically.
>>>
>>> - Patrick
>>>
>>> On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman
>>> <shiva...@eecs.berkeley.edu> wrote:
>>> > From reading Neil's first e-mail, I think the motivation is to get some
>>> > metrics in ADAM? I've run into a similar use case with user-defined
>>> > metrics in long-running tasks, and I think a nice way to solve this
>>> > would be to have user-defined TaskMetrics.
>>> >
>>> > To state my problem more clearly, let's say you have two functions you
>>> > use in a map call and want to measure how much time each of them takes.
>>> > For example, suppose you have a code block like the one below and you
>>> > want to measure how much time f1 takes as a fraction of the task.
>>> >
>>> > a.map { l =>
>>> >   val f = f1(l)
>>> >   // ... some work here ...
>>> > }
>>> >
>>> > It would be really cool if we could do something like
>>> >
>>> > a.map { l =>
>>> >   val start = System.nanoTime
>>> >   val f = f1(l)
>>> >   TaskMetrics.get("f1-time").add(System.nanoTime - start)
>>> > }
>>> >
>>> > These task metrics have a different purpose from Accumulators in the
>>> > sense that we don't need to track lineage, perform commutative
>>> > operations, etc. Further, we already have a bunch of code in place to
>>> > aggregate task metrics across a stage, so it would be great if we could
>>> > also populate these in the UI and show median/max etc.
>>> > I think counters [1] in Hadoop served a similar purpose.
>>> >
>>> > Thanks
>>> > Shivaram
>>> >
>>> > [1] https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
>>> >
>>> >
>>> >
>>> > On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
>>> >
>>> >> Hi Reynold
>>> >>
>>> >> Thanks for your reply.
>>> >>
>>> >> Accumulators are, of course, stored in the Accumulators object as
>>> >> thread-local variables. However, the Accumulators object isn't public,
>>> >> so when a Task is executing there's no way to get the set of
>>> >> accumulators for the current thread -- accumulators still have to be
>>> >> passed to every method that needs them.
>>> >>
>>> >> Additionally, unless an accumulator is explicitly referenced it won't
>>> >> be serialized as part of a Task, and won't make it into the
>>> >> Accumulators object in the first place.
>>> >>
>>> >> I should also note that what I'm proposing is not specific to
>>> >> Accumulators -- I am proposing that any data can be stored in a
>>> >> thread-local variable. I think there are probably many use cases
>>> >> other than mine.
>>> >>
>>> >> Neil
>>> >>
>>> >>
>>> >> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com> wrote:
>>> >>
>>> >> > Thanks for the thoughtful email, Neil and Christopher.
>>> >> >
>>> >> > If I understand this correctly, it seems like the dynamic variable
>>> >> > is just a variant of the accumulator (a static one since it is a
>>> >> > global object). Accumulators are already implemented using
>>> >> > thread-local variables under the hood. Am I misunderstanding
>>> >> > something?
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <c...@adatao.com> wrote:
>>> >> >
>>> >> > > Hi Neil, first off, I'm generally a sympathetic advocate for making
>>> >> > > changes to Spark internals to make it easier/better/faster/more
>>> >> > > awesome.
>>> >> > >
>>> >> > > In this case, I'm (a) not clear about what you're trying to
>>> >> > > accomplish, and (b) a bit worried about the proposed solution.
>>> >> > >
>>> >> > > On (a): it is stated that you want to pass some Accumulators around.
>>> >> > > Yet the proposed solution is for some "shared" variable that may be
>>> >> > > set and "mapped out" and possibly "reduced back", but without any
>>> >> > > accompanying accumulation semantics. And yet it doesn't seem like
>>> >> > > you want just the broadcast property. Can you clarify the problem
>>> >> > > statement with some before/after client code examples?
>>> >> > >
>>> >> > > On (b): you're right that adding variables to SparkContext should be
>>> >> > > done with caution, as it may have unintended consequences beyond
>>> >> > > just serdes payload size. For example, there is a stated intention
>>> >> > > of supporting multiple SparkContexts in the future, and this
>>> >> > > proposed solution can make it a bigger challenge to do so. Indeed,
>>> >> > > we had a gut-wrenching call to make a while back on a subject
>>> >> > > related to this (see https://github.com/mesos/spark/pull/779).
>>> >> > > Furthermore, even in a single-SparkContext application, there may be
>>> >> > > multiple "clients" (of that application) whose intent to use the
>>> >> > > proposed "SparkDynamic" would not necessarily be coordinated.
>>> >> > >
>>> >> > > So, considering a ratio of a/b (benefit/cost), it's not clear to me
>>> >> > > that the benefits are significant enough to warrant the costs. Do I
>>> >> > > misunderstand that the benefit is to save one explicit parameter
>>> >> > > (the "context") in the signature/closure code?
>>> >> > >
>>> >> > > --
>>> >> > > Christopher T. Nguyen
>>> >> > > Co-founder & CEO, Adatao <http://adatao.com>
>>> >> > > linkedin.com/in/ctnguyen
>>> >> > >
>>> >> > >
>>> >> > >
>>> >> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
>>> >> > >
>>> >> > > > Hi all
>>> >> > > >
>>> >> > > > I have been adding some metrics to the ADAM project
>>> >> > > > (https://github.com/bigdatagenomics/adam), which runs on Spark,
>>> >> > > > and have a proposal for an enhancement to Spark that would make
>>> >> > > > this work cleaner and easier.
>>> >> > > >
>>> >> > > > I need to pass some Accumulators around, which will aggregate
>>> >> > > > metrics (timing stats and other metrics) across the cluster.
>>> >> > > > However, it is cumbersome to have to explicitly pass some "context"
>>> >> > > > containing these accumulators around everywhere that might need
>>> >> > > > them. I can use Scala implicits, which help slightly, but I'd still
>>> >> > > > need to modify every method in the call stack to take an implicit
>>> >> > > > variable.
>>> >> > > >
>>> >> > > > So, I'd like to propose that we add the ability to have "dynamic
>>> >> > > > variables" (basically thread-local variables) to Spark. This would
>>> >> > > > avoid having to pass the Accumulators around explicitly.
>>> >> > > >
>>> >> > > > My proposed approach is to add a method to the SparkContext class
>>> >> > > > as follows:
>>> >> > > >
>>> >> > > > /**
>>> >> > > >  * Sets the value of a "dynamic variable". This value is made
>>> >> > > >  * available to jobs without having to be passed around explicitly.
>>> >> > > >  * During execution of a Spark job this value can be obtained from
>>> >> > > >  * the [[SparkDynamic]] object.
>>> >> > > >  */
>>> >> > > > def setDynamicVariableValue(value: Any): Unit
>>> >> > > >
>>> >> > > > Then, when a job is executing, the SparkDynamic object can be
>>> >> > > > accessed to obtain the value of the dynamic variable. The
>>> >> > > > implementation of this object is as follows:
>>> >> > > >
>>> >> > > > import scala.util.DynamicVariable
>>> >> > > >
>>> >> > > > object SparkDynamic {
>>> >> > > >   // DynamicVariable requires an initial value; null means "not set".
>>> >> > > >   private val dynamicVariable = new DynamicVariable[Any](null)
>>> >> > > >
>>> >> > > >   /**
>>> >> > > >    * Gets the value of the "dynamic variable" that has been set in
>>> >> > > >    * the [[SparkContext]].
>>> >> > > >    */
>>> >> > > >   def getValue: Option[Any] = Option(dynamicVariable.value)
>>> >> > > >
>>> >> > > >   // Runs thunk with the dynamic variable bound to threadValue.
>>> >> > > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S =
>>> >> > > >     dynamicVariable.withValue(threadValue.orNull)(thunk)
>>> >> > > > }
>>> >> > > >
>>> >> > > > The change involves modifying the Task object to serialize the
>>> >> > > > value of the dynamic variable, and modifying the TaskRunner class
>>> >> > > > to deserialize the value and make it available in the thread that
>>> >> > > > is running the task (using the SparkDynamic.withValue method).
>>> >> > > >
>>> >> > > > I have done a quick prototype of this in this commit:
>>> >> > > >
>>> >> > > > https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
>>> >> > > >
>>> >> > > > and it seems to work fine in my (limited) testing. It needs more
>>> >> > > > testing, tidy-up and documentation though.
>>> >> > > >
>>> >> > > > One drawback is that the dynamic variable will be serialized for
>>> >> > > > every Task whether it needs it or not. For my use case this might
>>> >> > > > not be too much of a problem, as serializing and deserializing
>>> >> > > > Accumulators looks fairly lightweight -- however we should
>>> >> > > > certainly warn users against setting a dynamic variable containing
>>> >> > > > lots of data. I thought about using broadcast tables here, but I
>>> >> > > > don't think it's possible to put Accumulators in a broadcast table
>>> >> > > > (as I understand it, they're intended for purely read-only data).
>>> >> > > >
>>> >> > > > What do people think about this proposal? My use case aside, it
>>> >> > > > seems like it would be a generally useful enhancement to be able to
>>> >> > > > pass certain data around without having to explicitly pass it
>>> >> > > > everywhere.
>>> >> > > >
>>> >> > > > Neil
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>>
>>
