Off the top of my head, I'm not sure, but it looks like virtually all of the extra time between each stage is accounted for by T_{io} in your plot, which I'm guessing is time spent communicating results over the network? Is your driver running on the master, or is it on a different node? If you look at the code for treeAggregate, the last stage uses a .reduce() for the final combination, which happens on the driver. In this case, the size of the gradients is O(1 GB), so if you've got to go over a slow link for that last portion, this could really make a difference.
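In case it helps to see the pattern end to end, below is a minimal, self-contained sketch (Scala, toy sizes, local master; the object name TreeAggregateSketch and its variables are made up for illustration) of a dense-vector treeAggregate like the one MLlib's GradientDescent performs. The intermediate tree levels run as shuffles between executors, but the results of the final stage's tasks are fetched back to the driver and combined there with a .reduce() - that last hop is where a large dense gradient has to cross the driver's link.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Toy stand-in for the gradient aggregation under discussion: each record
// contributes a dense vector, partials are summed with treeAggregate, and
// the final combine runs as a .reduce() whose inputs are fetched to the driver.
object TreeAggregateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[4]", "tree-aggregate-sketch")
    val dim = 1000                        // toy dimension (30M in the thread's problem)
    val data: RDD[Array[Double]] =        // stand-in for the sampled points
      sc.parallelize(Seq.fill(256)(Array.fill(dim)(1.0)), numSlices = 16)

    val gradient = data.treeAggregate(new Array[Double](dim))(
      seqOp  = (acc, g) => { var i = 0; while (i < dim) { acc(i) += g(i); i += 1 }; acc },
      combOp = (a, b)   => { var i = 0; while (i < dim) { a(i) += b(i); i += 1 }; a },
      depth  = 4                          // deeper tree; the thread's runs used a 4-level aggregation
    )
    println(s"sum of aggregated gradient = ${gradient.sum}")
    sc.stop()
  }
}

With a 30M-dimensional dense accumulator, each of those final task results is roughly 240 MB of doubles, so four of them landing on the driver is consistent with the O(1 GB) figure above.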
On Sat, Sep 26, 2015 at 10:20 AM, Mike Hynes <91m...@gmail.com> wrote:
> Hi Evan,
>
> (I just realized my initial email was a reply to the wrong thread; I'm very sorry about this).
>
> Thanks for your email, and your thoughts on the sampling. That the gradient computations are essentially the cost of a pass through each element of the partition makes sense, especially given the sparsity of the feature vectors.
>
> Would you have any idea why the communication time is so much larger in the final level of the aggregation, however? I can't immediately see why it should take longer to transfer the local gradient vectors in that level, since they are dense in every level. Furthermore, the driver is receiving the results of only 4 tasks, which is relatively small.
>
> Mike
>
> On 9/26/15, Evan R. Sparks <evan.spa...@gmail.com> wrote:
> > Mike,
> >
> > I believe the reason you're seeing near-identical performance on the gradient computations is twofold:
> > 1) Gradient computations for GLM models are computationally pretty cheap from a FLOPs-per-byte-read perspective. They are essentially a BLAS "gemv" call in the dense case, which is well known to be bound by memory bandwidth on modern processors. So, you're basically paying the cost of a scan of the points you've sampled to do the gradient computation.
> > 2) The default sampling mechanism used by the GradientDescent optimizer in MLlib is implemented via RDD.sample, which does reservoir sampling on each partition. This requires a full scan of each partition at every iteration to collect the samples.
> >
> > So - you're going to pay the cost of a scan to do the sampling anyway, and the gradient computation is essentially free at this point (and can be pipelined, etc.).
> >
> > It is quite possible to improve #2 by coming up with a better sampling algorithm. One easy algorithm would be to assume the data is already randomly shuffled (or do that once) and then use the first miniBatchFraction*partitionSize records on the first iteration, the second set on the second iteration, and so on. You could prototype this algorithm pretty easily by converting your data to an RDD[Array[DenseVector]] and doing some bookkeeping at each iteration (a rough sketch of this appears after the thread below).
> >
> > That said - eventually the overheads of the platform catch up to you. As a rule of thumb I estimate about 50ms/iteration as a floor for things like task serialization and other platform overheads. You've got to balance how much computation you want to do vs. the amount of time you want to spend waiting for the platform.
> >
> > - Evan
> >
> > On Sat, Sep 26, 2015 at 9:27 AM, Mike Hynes <91m...@gmail.com> wrote:
> >
> >> Hello Devs,
> >>
> >> This email concerns some timing results for a treeAggregate in computing a (stochastic) gradient over an RDD of labelled points, as is currently done in the MLlib optimization routine for SGD.
> >>
> >> In SGD, the underlying RDD is downsampled by a fraction f \in (0,1], and the subgradients over all the instances in the downsampled RDD are aggregated to the driver as a dense vector. However, we have noticed some unusual behaviour when f < 1: it takes the same amount of time to compute the stochastic gradient for a stochastic minibatch as it does for a full batch (f = 1).
> >>
> >> Attached are two plots of the mean task timing metrics for each level in the aggregation, which has been performed with 4 levels (level 4 is the final level, in which the results are communicated to the driver). 16 nodes are used, and the RDD has 256 partitions. We run in (client) standalone mode. Here, the total time for the tasks (\tau) is shown alongside the execution time (not counting GC), the serialization/deserialization time, the GC time, and the difference between \tau and all other times, assumed to be variable IO/communication/waiting time. The RDD in this case is a labelled point representation of the KDD Bridge to Algebra dataset, with 20M (sparse) instances and a problem dimension of 30M. The sparsity of the instances is very high---each individual instance vector may have only a hundred nonzeros. All metrics have been taken from the JSON Spark event logs.
> >>
> >> The plot gradient_f1.pdf shows the times for a gradient computation with f = 1, and gradient_f-3.pdf shows the same metrics with f = 1e-3. For other f values in {1e-1, 1e-2, ..., 1e-5}, the same effect is observed.
> >>
> >> What I would like to mention about these plots, and ask whether anyone has experience with, is the following:
> >> 1. The times are essentially identical; I would have thought that downsampling the RDD before aggregating the subgradients would at least reduce the execution time required, if not the communication/serialization times.
> >> 2. The serialization time in level 4 is almost entirely from the result serialization to the driver, and not the task deserialization. In each level of the tree aggregation, however, the local (dense) gradients have to be communicated between compute nodes, so I am surprised that it takes so much longer to return the vectors to the driver.
> >>
> >> I initially wondered if the large IO overhead in the last stage had anything to do with client mode vs. cluster mode, since, from what I understand, only a single core is allocated to the driver thread in client mode. However, when running tests in the two modes, I have previously seen no appreciable difference in the running time for other (admittedly smaller) problems. Furthermore, I am still very confused about why the execution time for each task is just as large for the downsampled RDD. It seems unlikely that sampling each partition would be as expensive as the gradient computations, even for sparse feature vectors.
> >>
> >> If anyone has experience working with the sampling in minibatch SGD or has tested the scalability of the treeAggregate operation for vectors, I'd really appreciate your thoughts.
> >>
> >> Thanks,
> >> Mike
> >>
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: dev-h...@spark.apache.org
> >>
> >
>
> --
> Thanks,
> Mike
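For concreteness, here is a rough sketch of the block-sampling prototype Evan outlines above, written against the public RDD API. It is an illustration under stated assumptions rather than MLlib code: the names BlockMiniBatchSampler and miniBatch are invented, and it keeps arrays of LabeledPoint rather than the RDD[Array[DenseVector]] Evan mentions, since the data in this thread is labelled points. The idea is to pay the shuffling cost once and then read a contiguous block of each cached partition per iteration, instead of re-scanning every partition with RDD.sample.

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical prototype: randomize once, cache each partition as an array,
// and on iteration k read only the k-th contiguous block of roughly
// miniBatchFraction * partitionSize records.
class BlockMiniBatchSampler(data: RDD[LabeledPoint], miniBatchFraction: Double) {

  // One-time cost: randomize within each partition and materialize it as an
  // Array so that a slice can be taken without a full pass over the partition.
  // (A global repartition-and-shuffle, done once, would be closer to the
  // "already randomly shuffled" assumption Evan describes.)
  private val blockedData: RDD[Array[LabeledPoint]] =
    data.mapPartitions(it => Iterator(scala.util.Random.shuffle(it.toBuffer).toArray))
        .cache()

  /** Records for iteration k: the k-th block of each partition, wrapping around. */
  def miniBatch(k: Int): RDD[LabeledPoint] = {
    val frac = miniBatchFraction  // copy to a local val so the closure does not capture `this`
    blockedData.flatMap { part =>
      val blockSize = math.max(1, (frac * part.length).toInt)
      val numBlocks = math.max(1, part.length / blockSize)
      val start = (k % numBlocks) * blockSize
      part.slice(start, start + blockSize)
    }
  }
}

Each call to miniBatch(k) then touches only about miniBatchFraction of every cached partition, so the per-iteration cost should be dominated by the gradient work and the platform overheads (the ~50ms/iteration floor Evan mentions) rather than by a full scan for sampling.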