What if we have a registry for accumulators, where you can access them statically by name?
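Something like the sketch below, say -- the names here are made up, and nothing like this exists today; it's just to illustrate the idea. A real version would also need to handle the executor side, where each task deserializes its own copies of the accumulators it references:

    package org.apache.spark

    import java.util.concurrent.ConcurrentHashMap

    object AccumulatorRegistry {
      // name -> accumulator known to this JVM. On executors this would probably
      // need to be per-thread, since concurrent tasks carry their own
      // deserialized accumulator instances.
      private val accumulators = new ConcurrentHashMap[String, Accumulator[_]]()

      // Called when a named accumulator is created (driver) or deserialized (executor).
      private[spark] def register(name: String, acc: Accumulator[_]): Unit = {
        accumulators.put(name, acc)
      }

      // Static lookup by name, usable from anywhere in task code.
      def get(name: String): Option[Accumulator[_]] = Option(accumulators.get(name))
    }

Task code could then call AccumulatorRegistry.get("f1-time") in much the same way as the TaskMetrics.get("f1-time") call in Shivaram's example below.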
- Patrick

On Thu, Jul 24, 2014 at 1:51 PM, Neil Ferguson <nfergu...@gmail.com> wrote:

> I realised that my last reply wasn't very clear -- let me try to clarify.
>
> The patch for named accumulators looks very useful. However, in Shivaram's example he was able to retrieve the named task metrics (statically) from a TaskMetrics object, as follows:
>
>     TaskMetrics.get("f1-time")
>
> I don't think this would be possible with the named accumulators -- I believe they'd need to be passed to every function that needs them, which I think would be cumbersome in any application of reasonable complexity.
>
> This is what I was trying to solve with my proposal for dynamic variables in Spark. However, the ability to retrieve named accumulators from a thread-local would work just as well for my use case. I'd be happy to implement either solution if there's interest.
>
> Alternatively, if I'm missing some other way to accomplish this, please let me know.
>
> On a (slight) aside, I now think it would be possible to implement dynamic variables by broadcasting them. I was looking at Reynold's PR [1] to broadcast the RDD object, and I think it would be possible to take a similar approach -- that is, broadcast the serialized form and deserialize it when executing each task.
>
> [1] https://github.com/apache/spark/pull/1498
>
> On Wed, Jul 23, 2014 at 8:30 AM, Neil Ferguson <nfergu...@gmail.com> wrote:
>
>> Hi Patrick.
>>
>> That looks very useful. The thing that seems to be missing from Shivaram's example is the ability to access TaskMetrics statically (this is the same problem that I am trying to solve with dynamic variables).
>>
>> You mention defining an accumulator on the RDD. Perhaps I am missing something here, but my understanding was that accumulators are defined in SparkContext and are not part of the RDD. Is that correct?
>>
>> Neil
>>
>> On Tue, Jul 22, 2014 at 22:21, Patrick Wendell <pwend...@gmail.com> wrote:
>>
>>> Shivaram,
>>>
>>> You should take a look at this patch, which adds support for naming accumulators -- this is likely to get merged soon. I actually started this patch by supporting named TaskMetrics similar to what you have there, but then I realized there is too much semantic overlap with accumulators, so I just went the accumulator route.
>>>
>>> For instance, it would be nice if any user-defined metrics were accessible from the driver program.
>>>
>>> https://github.com/apache/spark/pull/1309
>>>
>>> In your example, you could just define an accumulator on the RDD and you'd see the incremental update in the web UI automatically.
>>>
>>> - Patrick
>>>
>>> On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:
>>>
>>> > From reading Neil's first e-mail, I think the motivation is to get some metrics in ADAM? I've run into a similar use case with user-defined metrics in long-running tasks, and I think a nice way to solve this would be to have user-defined TaskMetrics.
>>> >
>>> > To state my problem more clearly, let's say you have two functions you use in a map call and want to measure how much time each of them takes. For example, suppose you have a code block like the one below and you want to measure how much time f1 takes as a fraction of the task:
>>> >
>>> > a.map { l =>
>>> >   val f = f1(l)
>>> >   ... some work here ...
>>> > }
>>> >
>>> > It would be really cool if we could do something like
>>> >
>>> > a.map { l =>
>>> >   val start = System.nanoTime
>>> >   val f = f1(l)
>>> >   TaskMetrics.get("f1-time").add(System.nanoTime - start)
>>> > }
>>> >
>>> > These task metrics have a different purpose from Accumulators in the sense that we don't need to track lineage, perform commutative operations, etc. Further, we also have a bunch of code in place to aggregate task metrics across a stage, so it would be great if we could also populate these in the UI and show median/max etc. I think counters [1] in Hadoop served a similar purpose.
>>> >
>>> > Thanks
>>> > Shivaram
>>> >
>>> > [1] https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
>>> >
>>> > On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
>>> >
>>> >> Hi Reynold
>>> >>
>>> >> Thanks for your reply.
>>> >>
>>> >> Accumulators are, of course, stored in the Accumulators object as thread-local variables. However, the Accumulators object isn't public, so when a Task is executing there's no way to get the set of accumulators for the current thread -- accumulators still have to be passed to every method that needs them.
>>> >>
>>> >> Additionally, unless an accumulator is explicitly referenced it won't be serialized as part of a Task, and won't make it into the Accumulators object in the first place.
>>> >>
>>> >> I should also note that what I'm proposing is not specific to Accumulators -- I am proposing that any data can be stored in a thread-local variable. I think there are probably many other use cases besides mine.
>>> >>
>>> >> Neil
>>> >>
>>> >> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com> wrote:
>>> >>
>>> >> > Thanks for the thoughtful email, Neil and Christopher.
>>> >> >
>>> >> > If I understand this correctly, it seems like the dynamic variable is just a variant of the accumulator (a static one, since it is a global object). Accumulators are already implemented using thread-local variables under the hood. Am I misunderstanding something?
>>> >> >
>>> >> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <c...@adatao.com> wrote:
>>> >> >
>>> >> > > Hi Neil, first off, I'm generally a sympathetic advocate for making changes to Spark internals to make it easier/better/faster/more awesome.
>>> >> > >
>>> >> > > In this case, I'm (a) not clear about what you're trying to accomplish, and (b) a bit worried about the proposed solution.
>>> >> > >
>>> >> > > On (a): it is stated that you want to pass some Accumulators around. Yet the proposed solution is for some "shared" variable that may be set and "mapped out" and possibly "reduced back", but without any accompanying accumulation semantics. And yet it doesn't seem like you only want just the broadcast property. Can you clarify the problem statement with some before/after client code examples?
>>> >> > >
>>> >> > > On (b): you're right that adding variables to SparkContext should be done with caution, as it may have unintended consequences beyond just serde payload size. For example, there is a stated intention of supporting multiple SparkContexts in the future, and this proposed solution could make that a bigger challenge. Indeed, we had a gut-wrenching call to make a while back on a subject related to this (see https://github.com/mesos/spark/pull/779). Furthermore, even in a single-SparkContext application, there may be multiple "clients" (of that application) whose intent to use the proposed "SparkDynamic" would not necessarily be coordinated.
>>> >> > >
>>> >> > > So, considering the ratio a/b (benefit/cost), it's not clear to me that the benefits are significant enough to warrant the costs. Do I misunderstand that the benefit is to save one explicit parameter (the "context") in the signature/closure code?
>>> >> > >
>>> >> > > --
>>> >> > > Christopher T. Nguyen
>>> >> > > Co-founder & CEO, Adatao <http://adatao.com>
>>> >> > > linkedin.com/in/ctnguyen
>>> >> > >
>>> >> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nfergu...@gmail.com> wrote:
>>> >> > >
>>> >> > > > Hi all
>>> >> > > >
>>> >> > > > I have been adding some metrics to the ADAM project (https://github.com/bigdatagenomics/adam), which runs on Spark, and have a proposal for an enhancement to Spark that would make this work cleaner and easier.
>>> >> > > >
>>> >> > > > I need to pass some Accumulators around, which will aggregate metrics (timing stats and other metrics) across the cluster. However, it is cumbersome to have to explicitly pass some "context" containing these accumulators around everywhere that might need them. I can use Scala implicits, which help slightly, but I'd still need to modify every method in the call stack to take an implicit variable.
>>> >> > > >
>>> >> > > > So, I'd like to propose that we add the ability to have "dynamic variables" (basically thread-local variables) to Spark. This would avoid having to pass the Accumulators around explicitly.
>>> >> > > >
>>> >> > > > My proposed approach is to add a method to the SparkContext class as follows:
>>> >> > > >
>>> >> > > > /**
>>> >> > > >  * Sets the value of a "dynamic variable". This value is made available to jobs
>>> >> > > >  * without having to be passed around explicitly. During execution of a Spark job
>>> >> > > >  * this value can be obtained from the [[SparkDynamic]] object.
>>> >> > > >  */
>>> >> > > > def setDynamicVariableValue(value: Any)
>>> >> > > >
>>> >> > > > Then, when a job is executing, the SparkDynamic can be accessed to obtain the value of the dynamic variable. The implementation of this object is as follows:
>>> >> > > >
>>> >> > > > object SparkDynamic {
>>> >> > > >
>>> >> > > >   private val dynamicVariable = new DynamicVariable[Any]()
>>> >> > > >
>>> >> > > >   /**
>>> >> > > >    * Gets the value of the "dynamic variable" that has been set in the [[SparkContext]]
>>> >> > > >    */
>>> >> > > >   def getValue: Option[Any] = {
>>> >> > > >     Option(dynamicVariable.value)
>>> >> > > >   }
>>> >> > > >
>>> >> > > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
>>> >> > > >     dynamicVariable.withValue(threadValue.orNull)(thunk)
>>> >> > > >   }
>>> >> > > > }
>>> >> > > >
>>> >> > > > The change involves modifying the Task object to serialize the value of the dynamic variable, and modifying the TaskRunner class to deserialize the value and make it available in the thread that is running the task (using the SparkDynamic.withValue method).
>>> >> > > >
>>> >> > > > I have done a quick prototype of this in this commit: https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6 and it seems to work fine in my (limited) testing. It needs more testing, tidy-up and documentation, though.
>>> >> > > >
>>> >> > > > One drawback is that the dynamic variable will be serialized for every Task, whether it needs it or not. For my use case this might not be too much of a problem, as serializing and deserializing Accumulators looks fairly lightweight -- however, we should certainly warn users against setting a dynamic variable containing lots of data. I thought about using broadcast variables here, but I don't think it's possible to put Accumulators in a broadcast variable (as I understand it, they're intended for purely read-only data).
>>> >> > > >
>>> >> > > > What do people think about this proposal? My use case aside, it seems like it would be a generally useful enhancement to be able to pass certain data around without having to explicitly pass it everywhere.
>>> >> > > >
>>> >> > > > Neil
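
For what it's worth, here's roughly how I'd picture Neil's proposal being used together with the named-accumulator patch above. This is just a sketch against his prototype commit (and it borrows the a / f1 names from Shivaram's example) -- I haven't actually run it:

    // Driver side: create a named accumulator and stash it in the dynamic variable.
    val f1Time = sc.accumulator(0L, "f1-time")
    sc.setDynamicVariableValue(Map("f1-time" -> f1Time))

    // Task side: fetch the accumulators from SparkDynamic instead of threading
    // them through every method signature on the way down.
    a.map { l =>
      val metrics = SparkDynamic.getValue
        .map(_.asInstanceOf[Map[String, Accumulator[Long]]])
      val start = System.nanoTime
      val f = f1(l)
      metrics.foreach(_("f1-time") += System.nanoTime - start)
      f
    }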