Shivaram,

You should take a look at this patch, which adds support for naming accumulators - it is likely to be merged soon. I actually started the patch by supporting named TaskMetrics, similar to what you have there, but then realized there was too much semantic overlap with accumulators, so I just went that route.
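(For illustration, a rough sketch of what using a named accumulator from that patch might look like. The sc.accumulator(initialValue, name) signature and the UI behaviour are assumptions based on the patch description, not a confirmed API.)

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // implicit AccumulatorParam[Long]

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("named-accumulator-sketch"))

    // Assumed form: an accumulator registered under a human-readable name
    val f1Time = sc.accumulator(0L, "f1-time")

    sc.parallelize(1 to 1000).foreach { i =>
      val start = System.nanoTime
      // ... the user function being timed would run here ...
      f1Time += System.nanoTime - start   // incremental updates would show up in the web UI
    }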
For instance, it would be nice if any user-defined metrics were accessible at the driver program.

https://github.com/apache/spark/pull/1309

In your example, you could just define an accumulator here on the RDD and you'd see the incremental updates in the web UI automatically.

- Patrick

On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:

> From reading Neil's first e-mail, I think the motivation is to get some metrics in ADAM? -- I've run into a similar use case with user-defined metrics in long-running tasks, and I think a nice way to solve this would be to have user-defined TaskMetrics.
>
> To state my problem more clearly, let's say you have two functions you use in a map call and want to measure how much time each of them takes. For example, if you have a code block like the one below and you want to measure how much time f1 takes as a fraction of the task:
>
> a.map { l =>
>   val f = f1(l)
>   ... some work here ...
> }
>
> It would be really cool if we could do something like
>
> a.map { l =>
>   val start = System.nanoTime
>   val f = f1(l)
>   TaskMetrics.get("f1-time").add(System.nanoTime - start)
> }
>
> These task metrics have a different purpose from Accumulators in the sense that we don't need to track lineage, perform commutative operations, etc. Further, we also have a bunch of code in place to aggregate task metrics across a stage, so it would be great if we could also populate these in the UI and show median/max etc.
>
> I think counters [1] in Hadoop served a similar purpose.
>
> Thanks
> Shivaram
>
> [1] https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
>
> On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
>
>> Hi Reynold
>>
>> Thanks for your reply.
>>
>> Accumulators are, of course, stored in the Accumulators object as thread-local variables. However, the Accumulators object isn't public, so when a Task is executing there's no way to get the set of accumulators for the current thread -- accumulators still have to be passed to every method that needs them.
>>
>> Additionally, unless an accumulator is explicitly referenced it won't be serialized as part of a Task, and won't make it into the Accumulators object in the first place.
>>
>> I should also note that what I'm proposing is not specific to Accumulators -- I am proposing that any data can be stored in a thread-local variable. I think there are probably many other use cases besides mine.
>>
>> Neil
>>
>> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>>> Thanks for the thoughtful email, Neil and Christopher.
>>>
>>> If I understand this correctly, it seems like the dynamic variable is just a variant of the accumulator (a static one, since it is a global object). Accumulators are already implemented using thread-local variables under the hood. Am I misunderstanding something?
>>>
>>> On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <c...@adatao.com> wrote:
>>>
>>>> Hi Neil, first off, I'm generally a sympathetic advocate for making changes to Spark internals to make it easier/better/faster/more awesome.
>>>>
>>>> In this case, I'm (a) not clear about what you're trying to accomplish, and (b) a bit worried about the proposed solution.
>>>>
>>>> On (a): it is stated that you want to pass some Accumulators around.
>>>> Yet the proposed solution is for some "shared" variable that may be set and "mapped out" and possibly "reduced back", but without any accompanying accumulation semantics. And yet it doesn't seem like you want just the broadcast property. Can you clarify the problem statement with some before/after client code examples?
>>>>
>>>> On (b): you're right that adding variables to SparkContext should be done with caution, as it may have unintended consequences beyond just serdes payload size. For example, there is a stated intention of supporting multiple SparkContexts in the future, and this proposed solution could make that a bigger challenge. Indeed, we had a gut-wrenching call to make a while back on a subject related to this (see https://github.com/mesos/spark/pull/779). Furthermore, even in a single-SparkContext application, there may be multiple "clients" (of that application) whose intent to use the proposed "SparkDynamic" would not necessarily be coordinated.
>>>>
>>>> So, considering a ratio of a/b (benefit/cost), it's not clear to me that the benefits are significant enough to warrant the costs. Do I misunderstand that the benefit is to save one explicit parameter (the "context") in the signature/closure code?
>>>>
>>>> --
>>>> Christopher T. Nguyen
>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>> linkedin.com/in/ctnguyen
>>>>
>>>> On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
>>>>
>>>>> Hi all
>>>>>
>>>>> I have been adding some metrics to the ADAM project (https://github.com/bigdatagenomics/adam), which runs on Spark, and have a proposal for an enhancement to Spark that would make this work cleaner and easier.
>>>>>
>>>>> I need to pass some Accumulators around, which will aggregate metrics (timing stats and other metrics) across the cluster. However, it is cumbersome to have to explicitly pass some "context" containing these accumulators around everywhere that might need them. I can use Scala implicits, which help slightly, but I'd still need to modify every method in the call stack to take an implicit variable.
>>>>>
>>>>> So, I'd like to propose that we add the ability to have "dynamic variables" (basically thread-local variables) to Spark. This would avoid having to pass the Accumulators around explicitly.
>>>>>
>>>>> My proposed approach is to add a method to the SparkContext class as follows:
>>>>>
>>>>> /**
>>>>>  * Sets the value of a "dynamic variable". This value is made available to jobs
>>>>>  * without having to be passed around explicitly. During execution of a Spark job
>>>>>  * this value can be obtained from the [[SparkDynamic]] object.
>>>>>  */
>>>>> def setDynamicVariableValue(value: Any)
>>>>>
>>>>> Then, when a job is executing, the SparkDynamic object can be accessed to obtain the value of the dynamic variable. The implementation of this object is as follows:
>>>>>
>>>>> object SparkDynamic {
>>>>>
>>>>>   private val dynamicVariable = new DynamicVariable[Any]()
>>>>>
>>>>>   /**
>>>>>    * Gets the value of the "dynamic variable" that has been set in the [[SparkContext]]
>>>>>    */
>>>>>   def getValue: Option[Any] = {
>>>>>     Option(dynamicVariable.value)
>>>>>   }
>>>>>
>>>>>   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
>>>>>     dynamicVariable.withValue(threadValue.orNull)(thunk)
>>>>>   }
>>>>> }
>>>>>
>>>>> The change involves modifying the Task object to serialize the value of the dynamic variable, and modifying the TaskRunner class to deserialize the value and make it available in the thread that is running the task (using the SparkDynamic.withValue method).
>>>>>
>>>>> I have done a quick prototype of this in this commit:
>>>>>
>>>>> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
>>>>>
>>>>> and it seems to work fine in my (limited) testing. It needs more testing, tidy-up and documentation though.
>>>>>
>>>>> One drawback is that the dynamic variable will be serialized for every Task whether it needs it or not. For my use case this might not be too much of a problem, as serializing and deserializing Accumulators looks fairly lightweight -- however, we should certainly warn users against setting a dynamic variable containing lots of data. I thought about using broadcast variables here, but I don't think it's possible to put Accumulators in a broadcast variable (as I understand it, they're intended for purely read-only data).
>>>>>
>>>>> What do people think about this proposal? My use case aside, it seems like it would be a generally useful enhancement to be able to pass certain data around without having to explicitly pass it everywhere.
>>>>>
>>>>> Neil