Hi Reynold

Thanks for your reply.
Accumulators are, of course, stored in the Accumulators object as
thread-local variables. However, the Accumulators object isn't public, so
when a Task is executing there's no way to get the set of accumulators for
the current thread -- accumulators still have to be passed explicitly to
every method that needs them. Additionally, unless an accumulator is
explicitly referenced it won't be serialized as part of a Task, and so
won't make it into the Accumulators object in the first place.

I should also note that what I'm proposing is not specific to Accumulators
-- I am proposing that arbitrary data can be stored in a thread-local
variable. I think there are probably many use cases beyond my own.

Neil

On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com> wrote:

> Thanks for the thoughtful email, Neil and Christopher.
>
> If I understand this correctly, it seems like the dynamic variable is
> just a variant of the accumulator (a static one, since it is a global
> object). Accumulators are already implemented using thread-local
> variables under the hood. Am I misunderstanding something?
>
>
> On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <c...@adatao.com>
> wrote:
>
> > Hi Neil, first off, I'm generally a sympathetic advocate for making
> > changes to Spark internals to make it easier/better/faster/more
> > awesome.
> >
> > In this case, I'm (a) not clear about what you're trying to
> > accomplish, and (b) a bit worried about the proposed solution.
> >
> > On (a): it is stated that you want to pass some Accumulators around.
> > Yet the proposed solution is for some "shared" variable that may be
> > set and "mapped out" and possibly "reduced back", but without any
> > accompanying accumulation semantics. And yet it doesn't seem like you
> > want just the broadcast property. Can you clarify the problem
> > statement with some before/after client code examples?
> >
> > On (b): you're right that adding variables to SparkContext should be
> > done with caution, as it may have unintended consequences beyond just
> > serde payload size. For example, there is a stated intention of
> > supporting multiple SparkContexts in the future, and this proposed
> > solution could make that a bigger challenge. Indeed, we had a
> > gut-wrenching call to make a while back on a subject related to this
> > (see https://github.com/mesos/spark/pull/779). Furthermore, even in a
> > single-SparkContext application, there may be multiple "clients" (of
> > that application) whose intent to use the proposed "SparkDynamic"
> > would not necessarily be coordinated.
> >
> > So, considering a ratio of a/b (benefit/cost), it's not clear to me
> > that the benefits are significant enough to warrant the costs. Do I
> > misunderstand that the benefit is to save one explicit parameter (the
> > "context") in the signature/closure code?
> >
> > --
> > Christopher T. Nguyen
> > Co-founder & CEO, Adatao <http://adatao.com>
> > linkedin.com/in/ctnguyen
> >
> >
> > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nfergu...@gmail.com>
> > wrote:
> >
> > > Hi all
> > >
> > > I have been adding some metrics to the ADAM project
> > > (https://github.com/bigdatagenomics/adam), which runs on Spark, and
> > > have a proposal for an enhancement to Spark that would make this
> > > work cleaner and easier.
> > >
> > > I need to pass some Accumulators around, which will aggregate
> > > metrics (timing stats and other metrics) across the cluster.
> > > However, it is cumbersome to have to explicitly pass some "context"
> > > containing these accumulators around everywhere that might need
> > > them. I can use Scala implicits, which help slightly, but I'd still
> > > need to modify every method in the call stack to take an implicit
> > > variable.
> > >
> > > So, I'd like to propose that we add the ability to have "dynamic
> > > variables" (basically thread-local variables) to Spark. This would
> > > avoid having to pass the Accumulators around explicitly.
> > >
> > > My proposed approach is to add a method to the SparkContext class
> > > as follows:
> > >
> > > /**
> > >  * Sets the value of a "dynamic variable". This value is made
> > >  * available to jobs without having to be passed around explicitly.
> > >  * During execution of a Spark job this value can be obtained from
> > >  * the [[SparkDynamic]] object.
> > >  */
> > > def setDynamicVariableValue(value: Any)
> > >
> > > Then, when a job is executing, the SparkDynamic object can be
> > > accessed to obtain the value of the dynamic variable. The
> > > implementation of this object is as follows:
> > >
> > > object SparkDynamic {
> > >
> > >   private val dynamicVariable = new DynamicVariable[Any]()
> > >
> > >   /**
> > >    * Gets the value of the "dynamic variable" that has been set in
> > >    * the [[SparkContext]]
> > >    */
> > >   def getValue: Option[Any] = {
> > >     Option(dynamicVariable.value)
> > >   }
> > >
> > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
> > >     dynamicVariable.withValue(threadValue.orNull)(thunk)
> > >   }
> > > }
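> > >
> > > To make this concrete, here is a rough sketch of the kind of client
> > > code I have today. The names (MetricsContext, processRecord) are
> > > illustrative rather than real ADAM code, and this assumes sc is a
> > > SparkContext and rdd is an RDD[String]. The accumulator "context"
> > > has to be threaded through every method that needs it:
> > >
> > > import org.apache.spark.Accumulator
> > > import org.apache.spark.SparkContext._
> > >
> > > // Illustrative holder for the metrics accumulators.
> > > case class MetricsContext(timingStats: Accumulator[Long])
> > >
> > > // Every method on the task's call stack needs the extra parameter.
> > > def processRecord(record: String, metrics: MetricsContext): String = {
> > >   val start = System.nanoTime()
> > >   val result = record.toUpperCase // stand-in for real work
> > >   metrics.timingStats += System.nanoTime() - start
> > >   result
> > > }
> > >
> > > val metrics = MetricsContext(sc.accumulator(0L))
> > > rdd.map(record => processRecord(record, metrics))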
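> > >
> > > And here is the "after" version under this proposal (again a rough
> > > sketch, assuming the prototype above): the context is set once on
> > > the SparkContext, and any code running in a task can fetch it from
> > > the task's thread, so the extra parameter disappears:
> > >
> > > sc.setDynamicVariableValue(MetricsContext(sc.accumulator(0L)))
> > >
> > > def processRecord(record: String): String = {
> > >   // The cast is needed because the dynamic variable is typed as Any.
> > >   val metrics = SparkDynamic.getValue.get.asInstanceOf[MetricsContext]
> > >   val start = System.nanoTime()
> > >   val result = record.toUpperCase // stand-in for real work
> > >   metrics.timingStats += System.nanoTime() - start
> > >   result
> > > }
> > >
> > > rdd.map(processRecord)
> > >
> > > A typed variant of getValue could avoid the cast, but I've kept the
> > > sketch minimal here.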
> > >
> > > The change involves modifying the Task object to serialize the
> > > value of the dynamic variable, and modifying the TaskRunner class
> > > to deserialize the value and make it available in the thread that
> > > is running the task (using the SparkDynamic.withValue method).
> > >
> > > I have done a quick prototype of this in this commit:
> > >
> > > https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
> > >
> > > It seems to work fine in my (limited) testing, though it still
> > > needs more testing, tidy-up, and documentation.
> > >
> > > One drawback is that the dynamic variable will be serialized for
> > > every Task, whether it needs it or not. For my use case this might
> > > not be too much of a problem, as serializing and deserializing
> > > Accumulators looks fairly lightweight -- however, we should
> > > certainly warn users against setting a dynamic variable containing
> > > lots of data. I thought about using broadcast variables here, but I
> > > don't think it's possible to put Accumulators in a broadcast
> > > variable (as I understand it, broadcast variables are intended for
> > > purely read-only data).
> > >
> > > What do people think about this proposal? My use case aside, it
> > > seems like it would be a generally useful enhancement to be able to
> > > pass certain data around without having to pass it explicitly
> > > everywhere.
> > >
> > > Neil