I realised that my last reply wasn't very clear -- let me try to clarify. The patch for named accumulators looks very useful; however, in Shivaram's example he was able to retrieve the named task metrics (statically) from a TaskMetrics object, as follows:

TaskMetrics.get("f1-time")

However, I don't think this would be possible with the named accumulators -- I believe they'd need to be passed to every function that needs them, which I think would be cumbersome in any application of reasonable complexity. This is what I was trying to solve with my proposal for dynamic variables in Spark. However, the ability to retrieve named accumulators from a thread-local would work just as well for my use case. I'd be happy to implement either solution if there's interest. Alternatively, if I'm missing some other way to accomplish this, please let me know.
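For illustration, a thread-local lookup inside a task might look something like the sketch below. Note that the Accumulators.get helper shown here is hypothetical -- no such public API exists in Spark today -- and all names are illustrative only.

    rdd.map { record =>
      val start = System.nanoTime
      val result = f1(record)
      // Hypothetical: look up the named accumulator from a thread-local
      // registry populated by the task runner, rather than capturing the
      // accumulator in the closure.
      Accumulators.get[Long]("f1-time").foreach(acc => acc += (System.nanoTime - start))
      result
    }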
On a (slight) aside, I now think it would be possible to implement dynamic variables by broadcasting them. I was looking at Reynold's PR [1] to broadcast the RDD object, and I think it would be possible to take a similar approach -- that is, broadcast the serialized form and deserialize it when executing each task.

[1] https://github.com/apache/spark/pull/1498

On Wed, Jul 23, 2014 at 8:30 AM, Neil Ferguson <nfergu...@gmail.com> wrote:

> Hi Patrick.
>
> That looks very useful. The thing that seems to be missing from Shivaram's example is the ability to access TaskMetrics statically (this is the same problem that I am trying to solve with dynamic variables).
>
> You mention defining an accumulator on the RDD. Perhaps I am missing something here, but my understanding was that accumulators are defined in SparkContext and are not part of the RDD. Is that correct?
>
> Neil
>
> On Tue, Jul 22, 2014 at 22:21, Patrick Wendell <pwend...@gmail.com> wrote:
>
>> Shivaram,
>>
>> You should take a look at this patch, which adds support for naming accumulators -- it is likely to get merged in soon. I actually started this patch by supporting named TaskMetrics, similar to what you have there, but then I realized there was too much semantic overlap with accumulators, so I just went that route.
>>
>> For instance, it would be nice if any user-defined metrics were accessible at the driver program.
>>
>> https://github.com/apache/spark/pull/1309
>>
>> In your example, you could just define an accumulator on the RDD and you'd see the incremental update in the web UI automatically.
>>
>> - Patrick
>>
>> On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:
>>
>> > From reading Neil's first e-mail, I think the motivation is to get some metrics in ADAM? I've run into a similar use case with user-defined metrics in long-running tasks, and I think a nice way to solve this would be to have user-defined TaskMetrics.
>> >
>> > To state my problem more clearly, let's say you have two functions you use in a map call and you want to measure how much time each of them takes. For example, suppose you have a code block like the one below and you want to measure how much time f1 takes as a fraction of the task:
>> >
>> > a.map { l =>
>> >   val f = f1(l)
>> >   ... some work here ...
>> > }
>> >
>> > It would be really cool if we could do something like
>> >
>> > a.map { l =>
>> >   val start = System.nanoTime
>> >   val f = f1(l)
>> >   TaskMetrics.get("f1-time").add(System.nanoTime - start)
>> > }
>> >
>> > These task metrics have a different purpose from Accumulators, in the sense that we don't need to track lineage, perform commutative operations, etc. Further, we also have a bunch of code in place to aggregate task metrics across a stage, so it would be great if we could also populate these in the UI and show median/max etc. I think counters [1] in Hadoop served a similar purpose.
>> >
>> > Thanks
>> > Shivaram
>> >
>> > [1] https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
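For concreteness, the kind of per-task, user-defined metrics registry being described could be sketched roughly as follows. This is purely illustrative -- it is not Spark's actual TaskMetrics API, and all names here are made up:

    import scala.collection.mutable

    object UserTaskMetrics {

      final class Metric(name: String, store: mutable.Map[String, Long]) {
        def add(delta: Long): Unit =
          store(name) = store.getOrElse(name, 0L) + delta
      }

      // One map per task thread; the task runner would snapshot this when
      // the task completes and aggregate the snapshots per stage.
      private val local = new ThreadLocal[mutable.Map[String, Long]] {
        override def initialValue(): mutable.Map[String, Long] = mutable.Map.empty
      }

      def get(name: String): Metric = new Metric(name, local.get())

      def snapshot(): Map[String, Long] = local.get().toMap
    }

With something like this in place, the map function in the example above would call UserTaskMetrics.get("f1-time").add(System.nanoTime - start).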
>> > On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
>> >
>> >> Hi Reynold
>> >>
>> >> Thanks for your reply.
>> >>
>> >> Accumulators are, of course, stored in the Accumulators object as thread-local variables. However, the Accumulators object isn't public, so when a Task is executing there's no way to get the set of accumulators for the current thread -- accumulators still have to be passed to every method that needs them.
>> >>
>> >> Additionally, unless an accumulator is explicitly referenced it won't be serialized as part of a Task, and won't make it into the Accumulators object in the first place.
>> >>
>> >> I should also note that what I'm proposing is not specific to Accumulators -- I am proposing that any data can be stored in a thread-local variable. I think there are probably many use cases other than mine.
>> >>
>> >> Neil
>> >>
>> >> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com> wrote:
>> >>
>> >> > Thanks for the thoughtful email, Neil and Christopher.
>> >> >
>> >> > If I understand this correctly, it seems like the dynamic variable is just a variant of the accumulator (a static one, since it is a global object). Accumulators are already implemented using thread-local variables under the hood. Am I misunderstanding something?
>> >> >
>> >> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <c...@adatao.com> wrote:
>> >> >
>> >> > > Hi Neil, first off, I'm generally a sympathetic advocate for making changes to Spark internals to make it easier/better/faster/more awesome.
>> >> > >
>> >> > > In this case, I'm (a) not clear about what you're trying to accomplish, and (b) a bit worried about the proposed solution.
>> >> > >
>> >> > > On (a): it is stated that you want to pass some Accumulators around. Yet the proposed solution is for some "shared" variable that may be set and "mapped out" and possibly "reduced back", but without any accompanying accumulation semantics. And yet it doesn't seem like you want just the broadcast property. Can you clarify the problem statement with some before/after client code examples?
>> >> > >
>> >> > > On (b): you're right that adding variables to SparkContext should be done with caution, as it may have unintended consequences beyond just serdes payload size. For example, there is a stated intention of supporting multiple SparkContexts in the future, and this proposed solution could make that a bigger challenge. Indeed, we had a gut-wrenching call to make a while back on a subject related to this (see https://github.com/mesos/spark/pull/779). Furthermore, even in a single SparkContext application, there may be multiple "clients" (of that application) whose intent to use the proposed "SparkDynamic" would not necessarily be coordinated.
>> >> > > So, considering a ratio of a/b (benefit/cost), it's not clear to me that the benefits are significant enough to warrant the costs. Do I misunderstand that the benefit is to save one explicit parameter (the "context") in the signature/closure code?
>> >> > >
>> >> > > --
>> >> > > Christopher T. Nguyen
>> >> > > Co-founder & CEO, Adatao <http://adatao.com>
>> >> > > linkedin.com/in/ctnguyen
>> >> > >
>> >> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
>> >> > >
>> >> > > > Hi all
>> >> > > >
>> >> > > > I have been adding some metrics to the ADAM project (https://github.com/bigdatagenomics/adam), which runs on Spark, and I have a proposal for an enhancement to Spark that would make this work cleaner and easier.
>> >> > > >
>> >> > > > I need to pass some Accumulators around, which will aggregate metrics (timing stats and other metrics) across the cluster. However, it is cumbersome to have to explicitly pass some "context" containing these accumulators around everywhere that might need them. I can use Scala implicits, which help slightly, but I'd still need to modify every method in the call stack to take an implicit variable.
>> >> > > >
>> >> > > > So, I'd like to propose that we add the ability to have "dynamic variables" (basically thread-local variables) to Spark. This would avoid having to pass the Accumulators around explicitly.
>> >> > > >
>> >> > > > My proposed approach is to add a method to the SparkContext class as follows:
>> >> > > >
>> >> > > > /**
>> >> > > >  * Sets the value of a "dynamic variable". This value is made available to jobs
>> >> > > >  * without having to be passed around explicitly. During execution of a Spark job
>> >> > > >  * this value can be obtained from the [[SparkDynamic]] object.
>> >> > > >  */
>> >> > > > def setDynamicVariableValue(value: Any)
>> >> > > >
>> >> > > > Then, when a job is executing, the SparkDynamic object can be accessed to obtain the value of the dynamic variable. The implementation of this object is as follows:
>> >> > > >
>> >> > > > object SparkDynamic {
>> >> > > >
>> >> > > >   private val dynamicVariable = new DynamicVariable[Any](null)
>> >> > > >
>> >> > > >   /**
>> >> > > >    * Gets the value of the "dynamic variable" that has been set in the [[SparkContext]]
>> >> > > >    */
>> >> > > >   def getValue: Option[Any] = {
>> >> > > >     Option(dynamicVariable.value)
>> >> > > >   }
>> >> > > >
>> >> > > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
>> >> > > >     dynamicVariable.withValue(threadValue.orNull)(thunk)
>> >> > > >   }
>> >> > > > }
>> >> > > >
>> >> > > > The change involves modifying the Task object to serialize the value of the dynamic variable, and modifying the TaskRunner class to deserialize the value and make it available in the thread that is running the task (using the SparkDynamic.withValue method).
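For illustration, the serialize-on-the-driver / deserialize-on-the-executor step described above might look roughly like the sketch below. This is not the actual prototype: the names are illustrative, plain Java serialization stands in for Spark's closure serializer, and it assumes the SparkDynamic object defined above.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    // Driver side: serialize the dynamic variable's value so it can be
    // shipped alongside each task.
    def serializeDynamicValue(value: AnyRef): Array[Byte] = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(value)
      out.close()
      bytes.toByteArray
    }

    // Executor side: deserialize the value and make it visible to the task
    // thread for the duration of the task via SparkDynamic.withValue.
    def runWithDynamicValue[T](serialized: Array[Byte])(taskBody: => T): T = {
      val in = new ObjectInputStream(new ByteArrayInputStream(serialized))
      val value = in.readObject()
      SparkDynamic.withValue(Some(value))(taskBody)
    }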
>> >> > > > I have done a quick prototype of this in this commit: https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6 and it seems to work fine in my (limited) testing. It needs more testing, tidy-up and documentation, though.
>> >> > > >
>> >> > > > One drawback is that the dynamic variable will be serialized for every Task, whether it needs it or not. For my use case this might not be too much of a problem, as serializing and deserializing Accumulators looks fairly lightweight -- however, we should certainly warn users against setting a dynamic variable containing lots of data. I thought about using broadcast variables here, but I don't think it's possible to put Accumulators in a broadcast variable (as I understand it, they're intended for purely read-only data).
>> >> > > >
>> >> > > > What do people think about this proposal? My use case aside, it seems like it would be a generally useful enhancement to be able to pass certain data around without having to explicitly pass it everywhere.
>> >> > > >
>> >> > > > Neil
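To make the intended usage concrete, an end-to-end sketch of the proposal might look something like the following. Note that setDynamicVariableValue and SparkDynamic are the proposed additions described above (they are not part of released Spark), and the local-mode setup and accumulator map here are illustrative only.

    import org.apache.spark.{Accumulator, SparkContext}

    val sc = new SparkContext("local[2]", "dynamic-variable-sketch")
    val f1Time: Accumulator[Long] = sc.accumulator(0L)

    // Set once on the driver; the proposal would ship this value with each task.
    sc.setDynamicVariableValue(Map("f1-time" -> f1Time))

    sc.parallelize(1 to 100).map { x =>
      // Inside the task: fetch the accumulators from the thread-local,
      // rather than capturing them in the closure.
      val accs = SparkDynamic.getValue.map(_.asInstanceOf[Map[String, Accumulator[Long]]])
      val start = System.nanoTime
      val result = x * 2  // stand-in for the real work, e.g. f1(x)
      accs.foreach { m => m("f1-time") += System.nanoTime - start }
      result
    }.count()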