I've opened SPARK-3051 (https://issues.apache.org/jira/browse/SPARK-3051) based on this thread.
Neil

On Thu, Jul 24, 2014 at 10:30 PM, Neil Ferguson <nfergu...@gmail.com> wrote:

That would work well for me! Do you think it would be necessary to specify
which accumulators should be available in the registry, or would we just
broadcast all named accumulators registered in SparkContext and make them
available in the registry?

Anyway, I'm happy to make the necessary changes (unless someone else wants
to).

On Thu, Jul 24, 2014 at 10:17 PM, Patrick Wendell <pwend...@gmail.com> wrote:

What if we have a registry for accumulators, where you can access them
statically by name?
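Something like the following, roughly -- this is only a strawman sketch, and
AccumulatorRegistry (and everything in it) is a made-up name, not existing
Spark API:

import org.apache.spark.Accumulator

// Strawman sketch only. A per-thread map from name to accumulator,
// populated by the task runner before user code starts, so that task code
// can look accumulators up statically instead of having them passed
// through every method in the call stack.
object AccumulatorRegistry {
  private val accumulators =
    new ThreadLocal[Map[String, Accumulator[_]]] {
      override def initialValue: Map[String, Accumulator[_]] = Map.empty
    }

  // Would be called by Spark internals when a task starts on an executor.
  def register(named: Map[String, Accumulator[_]]): Unit =
    accumulators.set(named)

  // Called from user code anywhere inside a running task.
  def get(name: String): Option[Accumulator[_]] =
    accumulators.get.get(name)
}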
- Patrick

On Thu, Jul 24, 2014 at 1:51 PM, Neil Ferguson <nfergu...@gmail.com> wrote:

I realised that my last reply wasn't very clear -- let me try to clarify.

The patch for named accumulators looks very useful; however, in Shivaram's
example he was able to retrieve the named task metrics (statically) from a
TaskMetrics object, as follows:

TaskMetrics.get("f1-time")

However, I don't think this would be possible with the named accumulators
-- I believe they'd need to be passed to every function that needs them,
which I think would be cumbersome in any application of reasonable
complexity.

This is what I was trying to solve with my proposal for dynamic variables
in Spark. However, the ability to retrieve named accumulators from a
thread-local would work just as well for my use case. I'd be happy to
implement either solution if there's interest.

Alternatively, if I'm missing some other way to accomplish this, please let
me know.

On a (slight) aside, I now think it would be possible to implement dynamic
variables by broadcasting them. I was looking at Reynold's PR [1] to
broadcast the RDD object, and I think it would be possible to take a
similar approach -- that is, broadcast the serialized form, and deserialize
when executing each task.

[1] https://github.com/apache/spark/pull/1498

On Wed, Jul 23, 2014 at 8:30 AM, Neil Ferguson <nfergu...@gmail.com> wrote:

Hi Patrick.

That looks very useful. The thing that seems to be missing from Shivaram's
example is the ability to access TaskMetrics statically (this is the same
problem that I am trying to solve with dynamic variables).

You mention defining an accumulator on the RDD. Perhaps I am missing
something here, but my understanding was that accumulators are defined in
SparkContext and are not part of the RDD. Is that correct?

Neil

On Tue, Jul 22, 2014 at 10:21 PM, Patrick Wendell <pwend...@gmail.com> wrote:

Shivaram,

You should take a look at this patch, which adds support for naming
accumulators -- it is likely to get merged in soon. I actually started this
patch by supporting named TaskMetrics similar to what you have there, but
then I realized there was too much semantic overlap with accumulators, so I
just went that route. For instance, it would be nice if any user-defined
metrics were accessible at the driver program.

https://github.com/apache/spark/pull/1309

In your example, you could just define an accumulator here on the RDD and
you'd see the incremental update in the web UI automatically.

- Patrick

On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman
<shiva...@eecs.berkeley.edu> wrote:

From reading Neil's first e-mail, I think the motivation is to get some
metrics in ADAM -- I've run into a similar use case with user-defined
metrics in long-running tasks, and I think a nice way to solve this would
be to have user-defined TaskMetrics.

To state my problem more clearly, let's say you have two functions you use
in a map call and want to measure how much time each of them takes. For
example, suppose you have a code block like the one below and you want to
measure how much time f1 takes as a fraction of the task:

a.map { l =>
  val f = f1(l)
  ... some work here ...
}

It would be really cool if we could do something like:

a.map { l =>
  val start = System.nanoTime
  val f = f1(l)
  TaskMetrics.get("f1-time").add(System.nanoTime - start)
}

These task metrics have a different purpose from Accumulators, in the sense
that we don't need to track lineage, perform commutative operations, etc.
Further, we also have a bunch of code in place to aggregate task metrics
across a stage, so it would be great if we could also populate these in the
UI and show median/max etc. I think counters [1] in Hadoop served a similar
purpose.

Thanks
Shivaram

[1] https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters

On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nfergu...@gmail.com> wrote:

Hi Reynold

Thanks for your reply.

Accumulators are, of course, stored in the Accumulators object as
thread-local variables. However, the Accumulators object isn't public, so
when a Task is executing there's no way to get the set of accumulators for
the current thread -- accumulators still have to be passed to every method
that needs them.

Additionally, unless an accumulator is explicitly referenced it won't be
serialized as part of a Task, and won't make it into the Accumulators
object in the first place.

I should also note that what I'm proposing is not specific to Accumulators
-- I am proposing that any data can be stored in a thread-local variable. I
think there are probably many other use cases besides mine.
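To make the plumbing problem concrete, this is roughly what calling code
has to look like today (the names here are made up for illustration):

import org.apache.spark.Accumulator
import org.apache.spark.rdd.RDD

// Illustrative only: the accumulator has to be threaded through every
// method between where it is created (at the driver) and where it is
// incremented (inside the task).
def processRecords(records: RDD[String],
                   timing: Accumulator[Long]): RDD[String] =
  records.map(rec => doWork(rec, timing))

def doWork(rec: String, timing: Accumulator[Long]): String = {
  val start = System.nanoTime
  val result = rec.reverse // stand-in for real per-record work
  timing += System.nanoTime - start
  result
}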
Neil

On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com> wrote:

Thanks for the thoughtful email, Neil and Christopher.

If I understand this correctly, it seems like the dynamic variable is just
a variant of the accumulator (a static one, since it is a global object).
Accumulators are already implemented using thread-local variables under the
hood. Am I misunderstanding something?
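For reference, the existing bookkeeping looks roughly like this --
simplified and from memory, so see Accumulators.scala for the real thing:

package org.apache.spark

import scala.collection.mutable

// Simplified sketch of the existing private bookkeeping (not the exact
// source): accumulators deserialized with a task register themselves
// per-thread, and their updates are collected when the task completes.
private[spark] object Accumulators {
  val originals = mutable.Map[Long, Accumulable[_, _]]()
  val localAccums = mutable.Map[Thread, mutable.Map[Long, Accumulable[_, _]]]()

  def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
    if (original) {
      originals(a.id) = a
    } else {
      localAccums.getOrElseUpdate(Thread.currentThread, mutable.Map())(a.id) = a
    }
  }
}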
On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <c...@adatao.com> wrote:

Hi Neil, first off, I'm generally a sympathetic advocate for making changes
to Spark internals to make it easier/better/faster/more awesome.

In this case, I'm (a) not clear about what you're trying to accomplish, and
(b) a bit worried about the proposed solution.

On (a): it is stated that you want to pass some Accumulators around. Yet
the proposed solution is for some "shared" variable that may be set and
"mapped out" and possibly "reduced back", but without any accompanying
accumulation semantics. And yet it doesn't seem like you want just the
broadcast property. Can you clarify the problem statement with some
before/after client code examples?

On (b): you're right that adding variables to SparkContext should be done
with caution, as it may have unintended consequences beyond just serdes
payload size. For example, there is a stated intention of supporting
multiple SparkContexts in the future, and this proposed solution could make
that a bigger challenge. Indeed, we had a gut-wrenching call to make a
while back on a related subject (see
https://github.com/mesos/spark/pull/779). Furthermore, even in a
single-SparkContext application, there may be multiple "clients" (of that
application) whose intent to use the proposed "SparkDynamic" would not
necessarily be coordinated.

So, considering a ratio of a/b (benefit/cost), it's not clear to me that
the benefits are significant enough to warrant the costs. Do I
misunderstand that the benefit is to save one explicit parameter (the
"context") in the signature/closure code?

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen

On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nfergu...@gmail.com> wrote:

Hi all

I have been adding some metrics to the ADAM project
(https://github.com/bigdatagenomics/adam), which runs on Spark, and have a
proposal for an enhancement to Spark that would make this work cleaner and
easier.

I need to pass some Accumulators around, which will aggregate metrics
(timing stats and other metrics) across the cluster. However, it is
cumbersome to have to explicitly pass some "context" containing these
accumulators around everywhere that might need them. I can use Scala
implicits, which help slightly, but I'd still need to modify every method
in the call stack to take an implicit variable.

So, I'd like to propose that we add the ability to have "dynamic variables"
(basically thread-local variables) to Spark. This would avoid having to
pass the Accumulators around explicitly.

My proposed approach is to add a method to the SparkContext class as
follows:

/**
 * Sets the value of a "dynamic variable". This value is made available to
 * jobs without having to be passed around explicitly. During execution of
 * a Spark job this value can be obtained from the [[SparkDynamic]] object.
 */
def setDynamicVariableValue(value: Any)

Then, when a job is executing, the SparkDynamic object can be accessed to
obtain the value of the dynamic variable. The implementation of this object
is as follows:

object SparkDynamic {

  private val dynamicVariable = new DynamicVariable[Any]()

  /**
   * Gets the value of the "dynamic variable" that has been set in the
   * [[SparkContext]]
   */
  def getValue: Option[Any] = {
    Option(dynamicVariable.value)
  }

  private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
    dynamicVariable.withValue(threadValue.orNull)(thunk)
  }
}
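Client code might then look something like this (a sketch only -- it
assumes sc is a SparkContext, rdd is an RDD[String], and f1 is some
per-record function we want to time):

import org.apache.spark.Accumulator

val timingAcc = sc.accumulator(0L)

// Make the accumulator ambiently available to every task in the job.
sc.setDynamicVariableValue(Map("f1-time" -> timingAcc))

val out = rdd.map { rec =>
  // Inside the task: fetch the dynamic variable. The value is typed as
  // Any, so the cast is the caller's responsibility.
  val accs = SparkDynamic.getValue
    .map(_.asInstanceOf[Map[String, Accumulator[Long]]])
  val start = System.nanoTime
  val result = f1(rec)
  accs.foreach(_("f1-time") += System.nanoTime - start)
  result
}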
The change involves modifying the Task object to serialize the value of the
dynamic variable, and modifying the TaskRunner class to deserialize the
value and make it available in the thread that is running the task (using
the SparkDynamic.withValue method).

I have done a quick prototype of this in this commit:

https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6

and it seems to work fine in my (limited) testing. It needs more testing,
tidy-up and documentation though.

One drawback is that the dynamic variable will be serialized for every Task
whether it needs it or not. For my use case this might not be too much of a
problem, as serializing and deserializing Accumulators looks fairly
lightweight -- however, we should certainly warn users against setting a
dynamic variable containing lots of data. I thought about using broadcast
variables here, but I don't think it's possible to put Accumulators in a
broadcast variable (as I understand it, broadcast variables are intended
for purely read-only data).

What do people think about this proposal? My use case aside, it seems like
it would be a generally useful enhancement to be able to pass certain data
around without having to explicitly pass it everywhere.

Neil