Hi all

I have been adding some metrics to the ADAM project
https://github.com/bigdatagenomics/adam, which runs on Spark, and have a
proposal for an enhancement to Spark that would make this work cleaner and
easier.

I need to pass some Accumulators around, which will aggregate metrics
(timing stats and other measurements) across the cluster. However, it is
cumbersome to have to explicitly pass some "context" containing these
accumulators to every place that might need them. I can use Scala
implicits, which help slightly, but I'd still need to modify every method
in the call stack to take an implicit parameter.
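For illustration, this is roughly what the explicit-passing approach looks
like today. The Metrics container and the method names here are made up,
just to show the shape of the problem:

import org.apache.spark.Accumulator
import org.apache.spark.rdd.RDD

// Hypothetical container for the accumulators we want available everywhere
case class Metrics(recordsProcessed: Accumulator[Long], parseTime: Accumulator[Long])

// Every method in the call stack has to accept the container, even when it
// only exists to pass it further down
def processRecords(rdd: RDD[String], metrics: Metrics): RDD[Int] =
  rdd.map(line => parseRecord(line, metrics))

def parseRecord(line: String, metrics: Metrics): Int = {
  metrics.recordsProcessed += 1L
  line.length
}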

So, I'd like to propose that we add the ability to have "dynamic variables"
(basically thread-local variables) to Spark. This would avoid having to
pass the Accumulators around explicitly.

My proposed approach is to add a method to the SparkContext class as
follows:

/**
 * Sets the value of a "dynamic variable". This value is made available to jobs
 * without having to be passed around explicitly. During execution of a Spark
 * job this value can be obtained from the [[SparkDynamic]] object.
 */
def setDynamicVariableValue(value: Any)

Then, when a job is executing, the SparkDynamic object can be accessed to
obtain the value of the dynamic variable. The implementation of this object
is as follows:

import scala.util.DynamicVariable

object SparkDynamic {

  // Initialized to null so that getValue returns None until a value is set
  private val dynamicVariable = new DynamicVariable[Any](null)

  /**
   * Gets the value of the "dynamic variable" that has been set in the
   * [[SparkContext]].
   */
  def getValue: Option[Any] = {
    Option(dynamicVariable.value)
  }

  /**
   * Runs the thunk with the dynamic variable bound to the given value on the
   * current thread (used when executing a task).
   */
  private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
    dynamicVariable.withValue(threadValue.orNull)(thunk)
  }
}
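
From a user's point of view, usage might look something like the following
sketch, assuming the setDynamicVariableValue method and SparkDynamic object
proposed above (the accumulator and file path are just placeholders):

import org.apache.spark.Accumulator

// sc is an existing SparkContext
val recordCount = sc.accumulator(0L)

// Make the accumulator available to every task without passing it explicitly
sc.setDynamicVariableValue(recordCount)

sc.textFile("hdfs://path/to/input").map { line =>
  // Inside a task, the value set on the driver is available via SparkDynamic
  SparkDynamic.getValue.foreach { value =>
    value.asInstanceOf[Accumulator[Long]] += 1L
  }
  line.length
}.count()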

The change involves modifying the Task object to serialize the value of the
dynamic variable, and modifying the TaskRunner class to deserialize the
value and make it available in the thread that is running the task (using
the SparkDynamic.withValue method).
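
Conceptually, the executor-side part of the change looks something like this
sketch (not the actual diff -- see the prototype commit below for the real
code; the dynamicVariableValue field on Task is hypothetical naming):

// In TaskRunner.run (sketch): the dynamic variable value is deserialized
// along with the task, and the task body runs inside SparkDynamic.withValue
// so the value is visible on the task's thread
val dynamicValue: Option[Any] = task.dynamicVariableValue  // hypothetical field added to Task
val result = SparkDynamic.withValue(dynamicValue) {
  task.run(taskAttemptId)                                   // existing task execution
}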

I have done a quick prototype of this in this commit:
https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
and it seems to work fine in my (limited) testing. It needs more testing,
tidy-up and documentation though.

One drawback is that the dynamic variable will be serialized for every Task,
whether it needs it or not. For my use case this might not be too much of a
problem, as serializing and deserializing Accumulators looks fairly
lightweight -- however, we should certainly warn users against setting a
dynamic variable containing lots of data. I thought about using broadcast
variables here, but I don't think it's possible to put Accumulators in a
broadcast variable (as I understand it, broadcast variables are intended for
purely read-only data).

What do people think about this proposal? My use case aside, it seems like
it would be a generally useful enhancement to be able to pass certain data
around without having to pass it explicitly everywhere.

Neil
