That is an interesting point; I run the driver as a background process on the master node so that I can still pipe the stdout/stderr file streams to the (network) filesystem. I should mention that the master is connected to the slaves with a 10 Gb link on the same managed switch that the slaves use.
On 9/26/15, Evan R. Sparks <evan.spa...@gmail.com> wrote:
> Off the top of my head I'm not sure, but it looks like virtually all of the extra time between each stage is accounted for by T_{io} in your plot, which I'm guessing is time spent communicating results over the network? Is your driver running on the master, or is it on a different node? If you look at the code for treeAggregate, the last stage uses a .reduce() for the final combination, which happens on the driver. In this case the size of the gradients is O(1 GB), so if you've got to go over a slow link for that last portion, it could really make a difference.
>
> On Sat, Sep 26, 2015 at 10:20 AM, Mike Hynes <91m...@gmail.com> wrote:
>> Hi Evan,
>>
>> (I just realized my initial email was a reply to the wrong thread; I'm very sorry about this.)
>>
>> Thanks for your email, and for your thoughts on the sampling. That the gradient computations are essentially the cost of a pass through each element of the partition makes sense, especially given the sparsity of the feature vectors.
>>
>> Would you have any idea, however, why the communication time is so much larger in the final level of the aggregation? I can't immediately see why it should take longer to transfer the local gradient vectors in that level, since they are dense in every level. Furthermore, the driver is receiving the result of only 4 tasks, which is relatively small.
>>
>> Mike
>>
>> On 9/26/15, Evan R. Sparks <evan.spa...@gmail.com> wrote:
>>> Mike,
>>>
>>> I believe the reason you're seeing near-identical performance on the gradient computations is twofold:
>>>
>>> 1) Gradient computations for GLM models are computationally pretty cheap from a FLOPs/byte-read perspective. They are essentially a BLAS "gemv" call in the dense case, which is well known to be bound by memory bandwidth on modern processors. So you're basically paying the cost of a scan of the points you've sampled to do the gradient computation.
>>>
>>> 2) The default sampling mechanism used by the GradientDescent optimizer in MLlib is implemented via RDD.sample, which does reservoir sampling on each partition. This requires a full scan of each partition at every iteration to collect the samples.
>>>
>>> So you're going to pay the cost of a scan to do the sampling anyway, and the gradient computation is essentially free at this point (and can be pipelined, etc.).
>>>
>>> It is quite possible to improve #2 by coming up with a better sampling algorithm. One easy algorithm would be to assume the data is already randomly shuffled (or do that once), then use the first miniBatchFraction*partitionSize records on the first iteration, the second set on the second iteration, and so on. You could prototype this pretty easily by converting your data to an RDD[Array[DenseVector]] and doing some bookkeeping at each iteration (see the sketch just after this message).
>>>
>>> That said, eventually the overheads of the platform catch up to you. As a rule of thumb, I estimate about 50 ms/iteration as a floor for things like task serialization and other platform overheads. You've got to balance how much computation you want to do vs. the amount of time you want to spend waiting for the platform.
>>>
>>> - Evan
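For concreteness, a minimal sketch of the bookkeeping Evan describes, assuming the RDD has already been randomly shuffled once. The object and method names (SequentialMiniBatch, preparePartitions, sliceForIteration) are hypothetical, and LabeledPoint is used in place of raw DenseVectors; this is a rough prototype of the idea, not MLlib code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.regression.LabeledPoint

    // Hypothetical prototype of the sequential minibatch idea: materialize each
    // (pre-shuffled) partition as an array once, then take a rotating contiguous
    // block of it per iteration instead of re-sampling the whole partition.
    object SequentialMiniBatch {

      // One-time preparation: one Array per partition, cached so that later
      // iterations can slice into it without rescanning every record.
      def preparePartitions(data: RDD[LabeledPoint]): RDD[Array[LabeledPoint]] =
        data.mapPartitions(it => Iterator.single(it.toArray)).cache()

      // Select the `iter`-th block of roughly miniBatchFraction * partitionSize
      // records from every partition, wrapping around once all blocks are used.
      def sliceForIteration(prepared: RDD[Array[LabeledPoint]],
                            miniBatchFraction: Double,
                            iter: Int): RDD[LabeledPoint] =
        prepared.flatMap { part =>
          val batchSize = math.max(1, (miniBatchFraction * part.length).toInt)
          val numBlocks = math.max(1, part.length / batchSize)
          val start     = (iter % numBlocks) * batchSize
          part.slice(start, math.min(start + batchSize, part.length)).iterator
        }
    }

Each iteration would then compute its gradient over sliceForIteration(prepared, miniBatchFraction, i) rather than over data.sample(...), trading the per-iteration scan for a one-time materialization of the partitions.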
>>> On Sat, Sep 26, 2015 at 9:27 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>> Hello Devs,
>>>>
>>>> This email concerns some timing results for a treeAggregate in computing a (stochastic) gradient over an RDD of labelled points, as is currently done in the MLlib optimization routine for SGD.
>>>>
>>>> In SGD, the underlying RDD is downsampled by a fraction f \in (0,1], and the subgradients over all the instances in the downsampled RDD are aggregated to the driver as a dense vector (a rough sketch of this setup is given at the end of the thread). However, we have noticed some unusual behaviour when f < 1: it takes the same amount of time to compute the stochastic gradient for a stochastic minibatch as it does for a full batch (f = 1).
>>>>
>>>> Attached are two plots of the mean task timing metrics for each level in the aggregation, which has been performed with 4 levels (level 4 is the final level, in which the results are communicated to the driver). 16 nodes are used, and the RDD has 256 partitions. We run in (client) standalone mode. The total task time (\tau) is shown alongside the execution time (not counting GC), the serialization/deserialization times, the GC time, and the difference between \tau and all of the other times, which is assumed to be variable IO/communication/waiting time. The RDD in this case is a labelled-point representation of the KDD Bridge to Algebra dataset, with 20M (sparse) instances and a problem dimension of 30M. The sparsity of the instances is very high: each individual instance vector may have only a hundred nonzeros. All metrics have been taken from the JSON Spark event logs.
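As an aside on those metrics: a minimal sketch of how such per-task timings can be pulled out of a Spark event log (one JSON object per line), assuming the Spark 1.x JsonProtocol field names ("Executor Run Time", "JVM GC Time", "Executor Deserialize Time", "Result Serialization Time"; these may differ in other versions) and using json4s, which ships with Spark. TaskTimings and fromEventLog are hypothetical names:

    import scala.io.Source
    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    // Extract per-task timing metrics from a Spark event log.
    // Assumes every SparkListenerTaskEnd event carries "Task Metrics"
    // (i.e. the tasks completed successfully).
    object TaskTimings {
      implicit val formats: Formats = DefaultFormats

      case class TaskTiming(stageId: Int, runTimeMs: Long, gcTimeMs: Long,
                            deserializeMs: Long, resultSerializeMs: Long)

      def fromEventLog(path: String): Seq[TaskTiming] =
        Source.fromFile(path).getLines().flatMap { line =>
          val json = parse(line)
          (json \ "Event").extractOpt[String] match {
            case Some("SparkListenerTaskEnd") =>
              val m = json \ "Task Metrics"
              Some(TaskTiming(
                stageId           = (json \ "Stage ID").extract[Int],
                runTimeMs         = (m \ "Executor Run Time").extract[Long],
                gcTimeMs          = (m \ "JVM GC Time").extract[Long],
                deserializeMs     = (m \ "Executor Deserialize Time").extract[Long],
                resultSerializeMs = (m \ "Result Serialization Time").extract[Long]))
            case _ => None
          }
        }.toSeq
    }

Grouping the resulting records by stage and averaging gives mean per-level timings of the kind plotted here.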
>>>> The plot gradient_f1.pdf shows the times for a gradient computation with f = 1, and gradient_f-3.pdf shows the same metrics with f = 1e-3. For the other f values in {1e-1, 1e-2, ..., 1e-5}, the same effect is observed.
>>>>
>>>> What I would like to point out about these plots, and ask whether anyone has experience with, is the following:
>>>> 1. The times are essentially identical; I would have thought that downsampling the RDD before aggregating the subgradients would at least reduce the execution time required, if not the communication/serialization times.
>>>> 2. The serialization time in level 4 is almost entirely from the result serialization to the driver, not from the task deserialization. In each level of the tree aggregation, however, the local (dense) gradients have to be communicated between compute nodes, so I am surprised that it takes so much longer to return the vectors to the driver.
>>>>
>>>> I initially wondered whether the large IO overhead in the last stage had anything to do with client mode vs. cluster mode, since, from what I understand, only a single core is allocated to the driver thread in client mode. However, when running tests in the two modes, I have previously seen no appreciable difference in the running time for other (admittedly smaller) problems. Furthermore, I am still very confused about why the execution time for each task is just as large for the downsampled RDD. It seems unlikely that sampling each partition would be as expensive as the gradient computations, even for sparse feature vectors.
>>>>
>>>> If anyone has experience working with the sampling in minibatch SGD or has tested the scalability of the treeAggregate operation for vectors, I'd really appreciate your thoughts.
>>>>
>>>> Thanks,
>>>> Mike
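Finally, for reference, a condensed sketch of the aggregation pattern the thread is discussing: sample a minibatch with RDD.sample, accumulate per-instance subgradients, and sum them with a depth-4 treeAggregate whose final combine runs as a reduce on the driver. This is not the actual MLlib GradientDescent source; the least-squares gradient and the names (MiniBatchGradientSketch, addGradient, miniBatchGradient) are stand-ins for illustration:

    import breeze.linalg.{DenseVector => BDV}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.{DenseVector => SDV, SparseVector}
    import org.apache.spark.mllib.regression.LabeledPoint

    // Condensed sketch of minibatch gradient aggregation via treeAggregate.
    object MiniBatchGradientSketch {

      // Hypothetical stand-in for the loss gradient (plain least squares here),
      // accumulated in place into `grad` using only the nonzero features.
      private def addGradient(w: BDV[Double], p: LabeledPoint, grad: BDV[Double]): BDV[Double] = {
        val (indices, values) = p.features match {
          case s: SparseVector => (s.indices, s.values)
          case d: SDV          => (Array.range(0, d.values.length), d.values)
        }
        var dot = 0.0
        var k = 0
        while (k < indices.length) { dot += w(indices(k)) * values(k); k += 1 }
        val err = dot - p.label
        k = 0
        while (k < indices.length) { grad(indices(k)) += err * values(k); k += 1 }
        grad
      }

      // Downsample by miniBatchFraction, then sum the subgradients with a
      // depth-4 treeAggregate; the last combine happens in a reduce on the driver.
      def miniBatchGradient(data: RDD[LabeledPoint],
                            weights: BDV[Double],
                            miniBatchFraction: Double,
                            seed: Long,
                            depth: Int = 4): BDV[Double] = {
        val bcWeights = data.context.broadcast(weights)
        data.sample(withReplacement = false, fraction = miniBatchFraction, seed = seed)
          .treeAggregate(BDV.zeros[Double](weights.length))(
            seqOp  = (grad, point) => addGradient(bcWeights.value, point, grad),
            combOp = (g1, g2) => g1 += g2, // dense vectors of the full problem dimension
            depth  = depth)
      }
    }

With depth = 4, the per-partition sums pass through intermediate aggregation levels, and the last few dense gradient vectors (each of the full 30M problem dimension) are combined on the driver, which is where the level-4 result serialization and IO costs discussed above come in.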