I've opened SPARK-3051 (https://issues.apache.org/jira/browse/SPARK-3051)
based on this thread.

Neil


On Thu, Jul 24, 2014 at 10:30 PM, Neil Ferguson <nfergu...@gmail.com> wrote:

> That would work well for me! Do you think it would be necessary to specify
> which accumulators should be available in the registry, or would we just
> broadcast all named accumulators registered in SparkContext and make them
> available in the registry?
>
> Anyway, I'm happy to make the necessary changes (unless someone else wants
> to).
>
>
> On Thu, Jul 24, 2014 at 10:17 PM, Patrick Wendell <pwend...@gmail.com>
> wrote:
>
>> What if we have a registry for accumulators, where you can access them
>> statically by name?
>>
>> - Patrick
>>
>> On Thu, Jul 24, 2014 at 1:51 PM, Neil Ferguson <nfergu...@gmail.com>
>> wrote:
>> > I realised that my last reply wasn't very clear -- let me try and
>> clarify.
>> >
>> > The patch for named accumulators looks very useful, however in
>> Shivaram's
>> > example he was able to retrieve the named task metrics (statically)
>> from a
>> > TaskMetrics object, as follows:
>> >
>> > TaskMetrics.get("f1-time")
>> >
>> > However, I don't think this would be possible with the named
>> accumulators
>> > -- I believe they'd need to be passed to every function that needs
>> them,
>> > which I think would be cumbersome in any application of reasonable
>> > complexity.
>> >
>> > This is what I was trying to solve with my proposal for dynamic
>> variables
>> > in Spark. However, the ability to retrieve named accumulators from a
>> > thread-local would work just as well for my use case. I'd be happy to
>> > implement either solution if there's interest.
>> >
>> > Alternatively, if I'm missing some other way to accomplish this please
>> let
>> > me know.
>> >
>> > On a (slight) aside, I now think it would be possible to implement
>> dynamic
>> > variables by broadcasting them. I was looking at Reynold's PR [1] to
>> > broadcast the RDD object, and I think it would be possible to take a
>> > similar approach -- that is, broadcast the serialized form, and
>> deserialize
>> > when executing each task.
>> >
>> > [1] https://github.com/apache/spark/pull/1498
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Wed, Jul 23, 2014 at 8:30 AM, Neil Ferguson <nfergu...@gmail.com>
>> wrote:
>> >
>> >> Hi Patrick.
>> >>
>> >> That looks very useful. The thing that seems to be missing from
>> Shivaram's
>> >> example is the ability to access TaskMetrics statically (this is the
>> same
>> >> problem that I am trying to solve with dynamic variables).
>> >>
>> >>
>> >>
>> >> You mention defining an accumulator on the RDD. Perhaps I am missing
>> >> something here, but my understanding was that accumulators are defined
>> in
>> >> SparkContext and are not part of the RDD. Is that correct?
>> >>
>> >> Neil
>> >>
>> >> On Tue, Jul 22, 2014 at 22:21, Patrick Wendell <pwend...@gmail.com
>> >> ="mailto:pwend...@gmail.com";>> wrote:
>> >>
>> >>> Shivaram,
>> >>>
>> >>> You should take a look at this patch which adds support for naming
>> >>> accumulators - this is likely to get merged in soon. I actually
>> >>> started this patch by supporting named TaskMetrics similar to what
>> you
>> >>> have there, but then I realized there is too much semantic overlap
>> >>> with accumulators, so I just went that route.
>> >>>
>> >>> For instance, it would be nice if any user-defined metrics are
>> >>> accessible at the driver program.
>> >>>
>> >>> https://github.com/apache/spark/pull/1309
>> >>>
>> >>> In your example, you could just define an accumulator here on the RDD
>> >>> and you'd see the incremental update in the web UI automatically.
>> >>>
>> >>> - Patrick
>> >>>
>> >>> On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman
>> >>> <shiva...@eecs.berkeley.edu> wrote:
>> >>> > From reading Neil's first e-mail, I think the motivation is to get
>> some
>> >>> > metrics in ADAM ? -- I've run into a similar use-case with having
>> >>> > user-defined metrics in long-running tasks and I think a nice way
>> to
>> >>> solve
>> >>> > this would be to have user-defined TaskMetrics.
>> >>> >
>> >>> > To state my problem more clearly, lets say you have two functions
>> you
>> >>> use
>> >>> > in a map call and want to measure how much time each of them takes.
>> For
>> >>> > example, if you have a code block like the one below and you want
>> to
>> >>> > measure how much time f1 takes as a fraction of the task.
>> >>> >
>> >>> > a.map { l =>
>> >>> > val f = f1(l)
>> >>> > ... some work here ...
>> >>> > }
>> >>> >
>> >>> > It would be really cool if we could do something like
>> >>> >
>> >>> > a.map { l =>
>> >>> > val start = System.nanoTime
>> >>> > val f = f1(l)
>> >>> > TaskMetrics.get("f1-time").add(System.nanoTime - start)
>> >>> > }
>> >>> >
>> >>> > These task metrics have a different purpose from Accumulators in
>> the
>> >>> sense
>> >>> > that we don't need to track lineage, perform commutative operations
>> >>> etc.
>> >>> > Further we also have a bunch of code in place to aggregate task
>> metrics
>> >>> > across a stage etc. So it would be great if we could also populate
>> >>> these in
>> >>> > the UI and show median/max etc.
>> >>> > I think counters [1] in Hadoop served a similar purpose.
>> >>> >
>> >>> > Thanks
>> >>> > Shivaram
>> >>> >
>> >>> > [1]
>> >>> >
>> >>>
>> https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
>> >>> >
>> >>> >
>> >>> >
>> >>> > On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nfergu...@gmail.com>
>>
>> >>> wrote:
>> >>> >
>> >>> >> Hi Reynold
>> >>> >>
>> >>> >> Thanks for your reply.
>> >>> >>
>> >>> >> Accumulators are, of course, stored in the Accumulators object as
>> >>> >> thread-local variables. However, the Accumulators object isn't
>> public,
>> >>> so
>> >>> >> when a Task is executing there's no way to get the set of
>> accumulators
>> >>> for
>> >>> >> the current thread -- accumulators still have to be passed to
>> every
>> >>> method
>> >>> >> that needs them.
>> >>> >>
>> >>> >> Additionally, unless an accumulator is explicitly referenced it
>> won't
>> >>> be
>> >>> >> serialized as part of a Task, and won't make it into the
>> Accumulators
>> >>> >> object in the first place.
>> >>> >>
>> >>> >> I should also note that what I'm proposing is not specific to
>> >>> Accumulators
>> >>> >> -- I am proposing that any data can be stored in a thread-local
>> >>> variable. I
>> >>> >> think there are probably many other use cases other than my one.
>> >>> >>
>> >>> >> Neil
>> >>> >>
>> >>> >>
>> >>> >> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com>
>>
>> >>> wrote:
>> >>> >>
>> >>> >> > Thanks for the thoughtful email, Neil and Christopher.
>> >>> >> >
>> >>> >> > If I understand this correctly, it seems like the dynamic
>> variable
>> >>> is
>> >>> >> just
>> >>> >> > a variant of the accumulator (a static one since it is a global
>> >>> object).
>> >>> >> > Accumulators are already implemented using thread-local
>> variables
>> >>> under
>> >>> >> the
>> >>> >> > hood. Am I misunderstanding something?
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <
>> c...@adatao.com>
>> >>>
>> >>> >> > wrote:
>> >>> >> >
>> >>> >> > > Hi Neil, first off, I'm generally a sympathetic advocate for
>> >>> making
>> >>> >> > changes
>> >>> >> > > to Spark internals to make it easier/better/faster/more
>> awesome.
>> >>> >> > >
>> >>> >> > > In this case, I'm (a) not clear about what you're trying to
>> >>> accomplish,
>> >>> >> > and
>> >>> >> > > (b) a bit worried about the proposed solution.
>> >>> >> > >
>> >>> >> > > On (a): it is stated that you want to pass some Accumulators
>> >>> around.
>> >>> >> Yet
>> >>> >> > > the proposed solution is for some "shared" variable that may
>> be
>> >>> set and
>> >>> >> > > "mapped out" and possibly "reduced back", but without any
>> >>> accompanying
>> >>> >> > > accumulation semantics. And yet it doesn't seem like you only
>> want
>> >>> just
>> >>> >> > the
>> >>> >> > > broadcast property. Can you clarify the problem statement with
>> >>> some
>> >>> >> > > before/after client code examples?
>> >>> >> > >
>> >>> >> > > On (b): you're right that adding variables to SparkContext
>> should
>> >>> be
>> >>> >> done
>> >>> >> > > with caution, as it may have unintended consequences beyond
>> just
>> >>> serdes
>> >>> >> > > payload size. For example, there is a stated intention of
>> >>> supporting
>> >>> >> > > multiple SparkContexts in the future, and this proposed
>> solution
>> >>> can
>> >>> >> make
>> >>> >> > > it a bigger challenge to do so. Indeed, we had a gut-wrenching
>> >>> call to
>> >>> >> > make
>> >>> >> > > a while back on a subject related to this (see
>> >>> >> > > https://github.com/mesos/spark/pull/779). Furthermore, even
>> in a
>> >>> >> single
>> >>> >> > > SparkContext application, there may be multiple "clients" (of
>> that
>> >>> >> > > application) whose intent to use the proposed "SparkDynamic"
>> would
>> >>> not
>> >>> >> > > necessarily be coordinated.
>> >>> >> > >
>> >>> >> > > So, considering a ratio of a/b (benefit/cost), it's not clear
>> to
>> >>> me
>> >>> >> that
>> >>> >> > > the benefits are significant enough to warrant the costs. Do I
>> >>> >> > > misunderstand that the benefit is to save one explicit
>> parameter
>> >>> (the
>> >>> >> > > "context") in the signature/closure code?
>> >>> >> > >
>> >>> >> > > --
>> >>> >> > > Christopher T. Nguyen
>> >>> >> > > Co-founder & CEO, Adatao <http://adatao.com>
>> >>> >> > > linkedin.com/in/ctnguyen
>> >>> >> > >
>> >>> >> > >
>> >>> >> > >
>> >>> >> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <
>> >>> nfergu...@gmail.com>
>> >>> >> > > wrote:
>> >>> >> > >
>> >>> >> > > > Hi all
>> >>> >> > > >
>> >>> >> > > > I have been adding some metrics to the ADAM project
>> >>> >> > > > https://github.com/bigdatagenomics/adam, which runs on
>> Spark,
>> >>> and
>> >>> >> > have a
>> >>> >> > > > proposal for an enhancement to Spark that would make this
>> work
>> >>> >> cleaner
>> >>> >> > > and
>> >>> >> > > > easier.
>> >>> >> > > >
>> >>> >> > > > I need to pass some Accumulators around, which will
>> aggregate
>> >>> metrics
>> >>> >> > > > (timing stats and other metrics) across the cluster.
>> However, it
>> >>> is
>> >>> >> > > > cumbersome to have to explicitly pass some "context"
>> containing
>> >>> these
>> >>> >> > > > accumulators around everywhere that might need them. I can
>> use
>> >>> Scala
>> >>> >> > > > implicits, which help slightly, but I'd still need to modify
>> >>> every
>> >>> >> > method
>> >>> >> > > > in the call stack to take an implicit variable.
>> >>> >> > > >
>> >>> >> > > > So, I'd like to propose that we add the ability to have
>> "dynamic
>> >>> >> > > variables"
>> >>> >> > > > (basically thread-local variables) to Spark. This would
>> avoid
>> >>> having
>> >>> >> to
>> >>> >> > > > pass the Accumulators around explicitly.
>> >>> >> > > >
>> >>> >> > > > My proposed approach is to add a method to the SparkContext
>> >>> class as
>> >>> >> > > > follows:
>> >>> >> > > >
>> >>> >> > > > /**
>> >>> >> > > > * Sets the value of a "dynamic variable". This value is made
>> >>> >> available
>> >>> >> > > to
>> >>> >> > > > jobs
>> >>> >> > > > * without having to be passed around explicitly. During
>> >>> execution
>> >>> >> of a
>> >>> >> > > > Spark job
>> >>> >> > > > * this value can be obtained from the [[SparkDynamic]]
>> object.
>> >>> >> > > > */
>> >>> >> > > > def setDynamicVariableValue(value: Any)
>> >>> >> > > >
>> >>> >> > > > Then, when a job is executing the SparkDynamic can be
>> accessed
>> >>> to
>> >>> >> > obtain
>> >>> >> > > > the value of the dynamic variable. The implementation of
>> this
>> >>> object
>> >>> >> is
>> >>> >> > > as
>> >>> >> > > > follows:
>> >>> >> > > >
>> >>> >> > > > object SparkDynamic {
>> >>> >> > > > private val dynamicVariable = new DynamicVariable[Any]()
>> >>> >> > > > /**
>> >>> >> > > > * Gets the value of the "dynamic variable" that has been set
>> in
>> >>> >> the
>> >>> >> > > > [[SparkContext]]
>> >>> >> > > > */
>> >>> >> > > > def getValue: Option[Any] = {
>> >>> >> > > > Option(dynamicVariable.value)
>> >>> >> > > > }
>> >>> >> > > > private[spark] def withValue[S](threadValue:
>> Option[Any])(thunk:
>> >>> =>
>> >>> >> > > S): S
>> >>> >> > > > = {
>> >>> >> > > > dynamicVariable.withValue(threadValue.orNull)(thunk)
>> >>> >> > > > }
>> >>> >> > > > }
>> >>> >> > > >
>> >>> >> > > > The change involves modifying the Task object to serialize
>> the
>> >>> value
>> >>> >> of
>> >>> >> > > the
>> >>> >> > > > dynamic variable, and modifying the TaskRunner class to
>> >>> deserialize
>> >>> >> the
>> >>> >> > > > value and make it available in the thread that is running
>> the
>> >>> task
>> >>> >> > (using
>> >>> >> > > > the SparkDynamic.withValue method).
>> >>> >> > > >
>> >>> >> > > > I have done a quick prototype of this in this commit:
>> >>> >> > > >
>> >>> >> > > >
>> >>> >> > >
>> >>> >> >
>> >>> >>
>> >>>
>> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
>> >>> >> > > > and it seems to work fine in my (limited) testing. It needs
>> more
>> >>> >> > testing,
>> >>> >> > > > tidy-up and documentation though.
>> >>> >> > > >
>> >>> >> > > > One drawback is that the dynamic variable will be serialized
>> for
>> >>> >> every
>> >>> >> > > Task
>> >>> >> > > > whether it needs it or not. For my use case this might not
>> be
>> >>> too
>> >>> >> much
>> >>> >> > > of a
>> >>> >> > > > problem, as serializing and deserializing Accumulators looks
>> >>> fairly
>> >>> >> > > > lightweight -- however we should certainly warn users
>> against
>> >>> >> setting a
>> >>> >> > > > dynamic variable containing lots of data. I thought about
>> using
>> >>> >> > broadcast
>> >>> >> > > > tables here, but I don't think it's possible to put
>> Accumulators
>> >>> in a
>> >>> >> > > > broadcast table (as I understand it, they're intended for
>> purely
>> >>> >> > > read-only
>> >>> >> > > > data).
>> >>> >> > > >
>> >>> >> > > > What do people think about this proposal? My use case aside,
>> it
>> >>> seems
>> >>> >> > > like
>> >>> >> > > > it would be a generally useful enhancment to be able to pass
>> >>> certain
>> >>> >> > data
>> >>> >> > > > around without having to explicitly pass it everywhere.
>> >>> >> > > >
>> >>> >> > > > Neil
>> >>> >> > > >
>> >>> >> > >
>> >>> >> >
>> >>> >>
>> >>>
>> >>
>>
>
>

Reply via email to