Hi Reynold

Thanks for your reply.

Accumulators are, of course, stored in the Accumulators object as
thread-local variables. However, the Accumulators object isn't public, so
when a Task is executing, there's no way to get the set of accumulators for
the current thread -- accumulators still have to be passed to every method
that needs them.
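To illustrate the difference, here's a minimal sketch in plain Scala with no Spark dependency. `Metrics` is a hypothetical stand-in for an accumulator, and `currentMetrics` is a hypothetical thread-local holder built on `scala.util.DynamicVariable`. Neither is an existing Spark API. The explicit version threads the object through every method. The thread-local version only touches it at the point of use.

```scala
import scala.util.DynamicVariable

// Hypothetical stand-in for an accumulator of timing stats (not a Spark class).
final class Metrics { var recordsParsed: Long = 0L }

object ExplicitPassing {
  // Every method in the call stack must accept the metrics object,
  // even if it only forwards it to the next call.
  def processPartition(lines: Seq[String], metrics: Metrics): Int =
    lines.map(line => parse(line, metrics)).sum

  private def parse(line: String, metrics: Metrics): Int = {
    metrics.recordsParsed += 1
    line.length
  }
}

object ThreadLocalPassing {
  // Hypothetical thread-local holder; null means "no metrics set".
  val currentMetrics = new DynamicVariable[Metrics](null)

  // Intermediate methods no longer mention Metrics at all.
  def processPartition(lines: Seq[String]): Int =
    lines.map(parse).sum

  private def parse(line: String): Int = {
    // Record the metric only if a value has been bound on this thread.
    Option(currentMetrics.value).foreach(m => m.recordsParsed += 1)
    line.length
  }
}
```

Wrapping the call as `currentMetrics.withValue(m) { processPartition(lines) }` roughly corresponds to what the TaskRunner would do on the executor in my proposal. `parse` then sees `m` without any change to `processPartition`'s signature.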

Additionally, unless an accumulator is explicitly referenced in a task's
closure, it won't be serialized as part of a Task, and won't make it into the
Accumulators object in the first place.

I should also note that what I'm proposing is not specific to Accumulators
-- I am proposing that any data can be stored in a thread-local variable. I
think there are probably many other use cases besides mine.
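To make the mechanism from my original proposal (quoted below) concrete for arbitrary data, here's a self-contained simulation outside Spark. `SparkDynamicSketch` mirrors the proposed `SparkDynamic` object. Its `withValue` is public here because there's no `spark` package. `TaskSimulation` is hypothetical:

- Plain Java serialization stands in for the Task serializing the value.
- A fresh thread stands in for the TaskRunner.

It's a sketch under those assumptions, not the code from the prototype commit.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.DynamicVariable

// Mirrors the proposed SparkDynamic object; null means "no value set".
object SparkDynamicSketch {
  private val dynamicVariable = new DynamicVariable[Any](null)

  def getValue: Option[Any] = Option(dynamicVariable.value)

  def withValue[S](threadValue: Option[Any])(thunk: => S): S =
    dynamicVariable.withValue(threadValue.orNull)(thunk)
}

// Hypothetical simulation of the driver/executor round trip.
object TaskSimulation {
  // "Driver" side: serialize the dynamic value, as the Task would.
  def serialize(value: Option[Any]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value.orNull.asInstanceOf[AnyRef]) finally out.close()
    bytes.toByteArray
  }

  // "Executor" side: deserialize the value and bind it only for the
  // duration of the task body, as the TaskRunner would.
  def runTask[S](payload: Array[Byte])(body: => S): S = {
    val in = new ObjectInputStream(new ByteArrayInputStream(payload))
    val value: Option[Any] = try Option(in.readObject()) finally in.close()
    SparkDynamicSketch.withValue(value)(body)
  }

  // Runs the task on a separate thread to mimic an executor task thread.
  def runOnNewThread[S](payload: Array[Byte])(body: => S): S = {
    var result: Option[S] = None
    val thread = new Thread(() => { result = Some(runTask(payload)(body)) })
    thread.start()
    thread.join()
    result.get
  }
}
```

Because the value travels inside each serialized task, it reaches every task whether or not that task reads it. That is the per-Task cost I mentioned originally. Also note that `scala.util.DynamicVariable` is backed by an `InheritableThreadLocal`, so any thread a task starts inside `withValue` inherits the value at creation time.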

Neil


On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com> wrote:

> Thanks for the thoughtful email, Neil and Christopher.
>
> If I understand this correctly, it seems like the dynamic variable is just
> a variant of the accumulator (a static one since it is a global object).
> Accumulators are already implemented using thread-local variables under the
> hood. Am I misunderstanding something?
>
>
>
> On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <c...@adatao.com> wrote:
>
> > Hi Neil, first off, I'm generally a sympathetic advocate for making changes
> > to Spark internals to make it easier/better/faster/more awesome.
> >
> > In this case, I'm (a) not clear about what you're trying to accomplish, and
> > (b) a bit worried about the proposed solution.
> >
> > On (a): it is stated that you want to pass some Accumulators around. Yet
> > the proposed solution is for some "shared" variable that may be set and
> > "mapped out" and possibly "reduced back", but without any accompanying
> > accumulation semantics. And yet it doesn't seem like you want only the
> > broadcast property. Can you clarify the problem statement with some
> > before/after client code examples?
> >
> > On (b): you're right that adding variables to SparkContext should be done
> > with caution, as it may have unintended consequences beyond just serdes
> > payload size. For example, there is a stated intention of supporting
> > multiple SparkContexts in the future, and this proposed solution can make
> > it a bigger challenge to do so. Indeed, we had a gut-wrenching call to make
> > a while back on a subject related to this (see
> > https://github.com/mesos/spark/pull/779). Furthermore, even in a single
> > SparkContext application, there may be multiple "clients" (of that
> > application) whose intent to use the proposed "SparkDynamic" would not
> > necessarily be coordinated.
> >
> > So, considering a ratio of a/b (benefit/cost), it's not clear to me that
> > the benefits are significant enough to warrant the costs. Do I
> > misunderstand that the benefit is to save one explicit parameter (the
> > "context") in the signature/closure code?
> >
> > --
> > Christopher T. Nguyen
> > Co-founder & CEO, Adatao <http://adatao.com>
> > linkedin.com/in/ctnguyen
> >
> >
> >
> > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
> >
> > > Hi all
> > >
> > > I have been adding some metrics to the ADAM project
> > > (https://github.com/bigdatagenomics/adam), which runs on Spark, and have a
> > > proposal for an enhancement to Spark that would make this work cleaner and
> > > easier.
> > >
> > > I need to pass some Accumulators around, which will aggregate metrics
> > > (timing stats and other metrics) across the cluster. However, it is
> > > cumbersome to have to explicitly pass some "context" containing these
> > > accumulators around everywhere that might need them. I can use Scala
> > > implicits, which help slightly, but I'd still need to modify every method
> > > in the call stack to take an implicit variable.
> > >
> > > So, I'd like to propose that we add the ability to have "dynamic variables"
> > > (basically thread-local variables) to Spark. This would avoid having to
> > > pass the Accumulators around explicitly.
> > >
> > > My proposed approach is to add a method to the SparkContext class as
> > > follows:
> > >
> > > /**
> > >  * Sets the value of a "dynamic variable". This value is made available to
> > >  * jobs without having to be passed around explicitly. During execution of
> > >  * a Spark job this value can be obtained from the [[SparkDynamic]] object.
> > >  */
> > > def setDynamicVariableValue(value: Any): Unit
> > >
> > > Then, when a job is executing, the SparkDynamic object can be accessed to
> > > obtain the value of the dynamic variable. The implementation of this
> > > object is as follows:
> > >
> > > import scala.util.DynamicVariable
> > >
> > > object SparkDynamic {
> > >   // Initialised to null so that getValue returns None until a value is set.
> > >   private val dynamicVariable = new DynamicVariable[Any](null)
> > >
> > >   /**
> > >    * Gets the value of the "dynamic variable" that has been set in the
> > >    * [[SparkContext]].
> > >    */
> > >   def getValue: Option[Any] = {
> > >     Option(dynamicVariable.value)
> > >   }
> > >
> > >   // Binds the value for the duration of thunk on the current thread.
> > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
> > >     dynamicVariable.withValue(threadValue.orNull)(thunk)
> > >   }
> > > }
> > >
> > > The change involves modifying the Task object to serialize the value of the
> > > dynamic variable, and modifying the TaskRunner class to deserialize the
> > > value and make it available in the thread that is running the task (using
> > > the SparkDynamic.withValue method).
> > >
> > > I have done a quick prototype of this in this commit:
> > > https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
> > > and it seems to work fine in my (limited) testing. It needs more testing,
> > > tidy-up and documentation though.
> > >
> > > One drawback is that the dynamic variable will be serialized for every Task
> > > whether it needs it or not. For my use case this might not be too much of a
> > > problem, as serializing and deserializing Accumulators looks fairly
> > > lightweight -- however, we should certainly warn users against setting a
> > > dynamic variable containing lots of data. I thought about using broadcast
> > > variables here, but I don't think it's possible to put Accumulators in a
> > > broadcast variable (as I understand it, broadcast variables are intended for
> > > purely read-only data).
> > >
> > > What do people think about this proposal? My use case aside, it seems like
> > > it would be a generally useful enhancement to be able to pass certain data
> > > around without having to explicitly pass it everywhere.
> > >
> > > Neil
> > >
> >
>
