Hi all,

I have been adding some metrics to the ADAM project (https://github.com/bigdatagenomics/adam), which runs on Spark, and I have a proposal for an enhancement to Spark that would make this work cleaner and easier.
I need to pass some Accumulators around, which will aggregate metrics (timing stats and other metrics) across the cluster. However, it is cumbersome to explicitly pass a "context" containing these accumulators to everywhere that might need them. Scala implicits help slightly, but I'd still need to modify every method in the call stack to take an implicit parameter.

So, I'd like to propose adding "dynamic variables" (basically thread-local variables) to Spark. This would avoid having to pass the Accumulators around explicitly. My proposed approach is to add a method to the SparkContext class as follows:

    /**
     * Sets the value of a "dynamic variable". This value is made available to jobs
     * without having to be passed around explicitly. During execution of a Spark job
     * this value can be obtained from the [[SparkDynamic]] object.
     */
    def setDynamicVariableValue(value: Any): Unit

Then, while a job is executing, the SparkDynamic object can be accessed to obtain the value of the dynamic variable. The implementation of this object is as follows:

    import scala.util.DynamicVariable

    object SparkDynamic {
      // scala.util.DynamicVariable's constructor requires an initial value; null is
      // used so that getValue returns None when no value has been set.
      private val dynamicVariable = new DynamicVariable[Any](null)

      /**
       * Gets the value of the "dynamic variable" that has been set in the [[SparkContext]].
       */
      def getValue: Option[Any] = {
        Option(dynamicVariable.value)
      }

      private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
        dynamicVariable.withValue(threadValue.orNull)(thunk)
      }
    }

The change involves modifying the Task object to serialize the value of the dynamic variable, and modifying the TaskRunner class to deserialize the value and make it available in the thread that is running the task (using the SparkDynamic.withValue method).

I have done a quick prototype of this in this commit: https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6 and it seems to work fine in my (limited) testing. It still needs more testing, tidy-up and documentation, though.
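To make the flow concrete, here is a minimal, Spark-free sketch of the round trip described above, under stated assumptions. SparkDynamic mirrors the proposed object (with an explicit null initial value); Metrics, serialize, deserialize, runTask and deepInCallStack are hypothetical stand-ins for an Accumulator, Task serialization and TaskRunner, and are not Spark APIs. Java serialization is assumed as the transport.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.DynamicVariable

// Mirrors the proposed SparkDynamic object (not part of Spark's API).
object SparkDynamic {
  // null initial value, so getValue is None outside any task.
  private val dynamicVariable = new DynamicVariable[Any](null)

  def getValue: Option[Any] = Option(dynamicVariable.value)

  // Installs the value only for the duration of thunk, then restores the old one.
  def withValue[S](threadValue: Option[Any])(thunk: => S): S =
    dynamicVariable.withValue(threadValue.orNull)(thunk)
}

// Hypothetical stand-in for an Accumulator; case classes are Serializable.
final case class Metrics(name: String)

object DynamicVariableDemo {
  // Hypothetical stand-in for Task serializing the dynamic variable's value.
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  def deserialize(bytes: Array[Byte]): Any = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }

  // Library code deep in the call stack: no extra (implicit) parameter needed.
  def deepInCallStack(): String =
    SparkDynamic.getValue match {
      case Some(Metrics(name)) => s"recording to $name"
      case _                   => "no metrics configured"
    }

  // Hypothetical stand-in for TaskRunner: deserialize, then run the task body
  // with the value visible on the current thread.
  def runTask(serializedValue: Option[Array[Byte]]): String = {
    val value = serializedValue.map(deserialize)
    SparkDynamic.withValue(value)(deepInCallStack())
  }

  def main(args: Array[String]): Unit = {
    // Driver side: the value is serialized along with each task.
    val taskPayload = Some(serialize(Metrics("timings")))
    println(runTask(taskPayload)) // recording to timings
    println(runTask(None))        // no metrics configured
    println(deepInCallStack())    // no metrics configured (outside any task)
  }
}
```

Because scala.util.DynamicVariable is backed by an InheritableThreadLocal, threads spawned from inside the task body would also see the value, which may or may not be what users expect.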
One drawback is that the dynamic variable will be serialized for every Task, whether it needs it or not. For my use case this might not be much of a problem, as serializing and deserializing Accumulators looks fairly lightweight; however, we should certainly warn users against setting a dynamic variable containing lots of data. I thought about using broadcast variables here, but I don't think it's possible to put Accumulators in a broadcast variable (as I understand it, broadcast variables are intended for read-only data).

What do people think about this proposal? My use case aside, it seems like a generally useful enhancement to be able to pass certain data around without having to pass it explicitly everywhere.

Neil