That is an interesting point; I run the driver as a background process
on the master node so that I can still redirect its stdout/stderr
streams to the (network) filesystem.
I should mention that the master is connected to the slaves by a 10
Gb/s link on the same managed switch that the slaves use.

On 9/26/15, Evan R. Sparks <evan.spa...@gmail.com> wrote:
> Off the top of my head, I'm not sure, but it looks like virtually all the
> extra time between each stage is accounted for by T_{io} in your plot,
> which I'm guessing is time spent communicating results over the network?
> Is your driver running on the master, or is it on a different node? If
> you look at the code for treeAggregate, the last stage uses a .reduce()
> for the final combination, which happens on the driver. In this case, the
> size of the gradients is O(1 GB), so if you've got to go over a slow link
> for the last portion, this could really make a difference.
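
For what it's worth, the sizes line up with that estimate: each partial
gradient is a dense vector of dimension 30M stored as doubles, so

    30e6 * 8 bytes ~= 240 MB per partial gradient,

and the four level-4 task results together come to roughly 1 GB that has
to cross the driver's link.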
>
> On Sat, Sep 26, 2015 at 10:20 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Hi Evan,
>>
>> (I just realized my initial email was a reply to the wrong thread; I'm
>> very sorry about this).
>>
>> Thanks for your email, and your thoughts on the sampling. That the
>> gradient computations are essentially the cost of a pass through each
>> element of the partition makes sense, especially given the sparsity of
>> the feature vectors.
>>
>> Would you have any idea why the communication time is so much larger
>> in the final level of the aggregation, however? I can't immediately
>> see why it should take longer to transfer the local gradient vectors
>> in that level, since they are dense in every level. Furthermore, the
>> driver is receiving the result of only 4 tasks, which is relatively
>> small.
>>
>> Mike
>>
>>
>> On 9/26/15, Evan R. Sparks <evan.spa...@gmail.com> wrote:
>> > Mike,
>> >
>> > I believe the reason you're seeing near-identical performance on the
>> > gradient computations is twofold:
>> > 1) Gradient computations for GLM models are computationally pretty
>> > cheap from a FLOPs/byte read perspective. They are essentially a BLAS
>> > "gemv" call in the dense case, which is well known to be bound by
>> > memory bandwidth on modern processors. So, you're basically paying the
>> > cost of a scan of the points you've sampled to do the gradient
>> > computation.
>> > 2) The default sampling mechanism used by the GradientDescent optimizer
>> > in MLlib is implemented via RDD.sample, which does reservoir sampling
>> > on each partition. This requires a full scan of each partition at every
>> > iteration to collect the samples.
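
To make that cost model concrete, here is an illustrative per-partition
view (a sketch with made-up names, not the actual MLlib sampler, but it
has the same full-scan property):

    import scala.util.Random

    // Every record in the partition is examined to decide whether it is
    // kept, so each iteration pays a full scan of the partition whether
    // the fraction is 1 or 1e-3; the kept records then feed the
    // (memory-bound) dense gradient accumulation.
    def sampleAndAccumulate[T](partition: Iterator[T],
                               fraction: Double,
                               seed: Long)
                              (gradOf: T => Array[Double],
                               dim: Int): Array[Double] = {
      val rng  = new Random(seed)
      val grad = new Array[Double](dim)
      partition.foreach { record =>            // full scan, every iteration
        if (rng.nextDouble() < fraction) {     // keep ~fraction of records
          val g = gradOf(record)
          var i = 0
          while (i < dim) { grad(i) += g(i); i += 1 }  // axpy-style update
        }
      }
      grad
    }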
>> >
>> > So - you're going to pay the cost of a scan to do the sampling anyway,
>> > and the gradient computation is essentially free at this point (and
>> > can be pipelined, etc.).
>> >
>> > It is quite possible to improve #2 by coming up with a better sampling
>> > algorithm. One easy algorithm would be to assume the data is already
>> > randomly shuffled (or do that once) and then use the first
>> > miniBatchFraction*partitionSize records on the first iteration, the
>> > second set on the second iteration, and so on. You could prototype
>> > this algorithm pretty easily by converting your data to an
>> > RDD[Array[DenseVector]] and doing some bookkeeping at each iteration.
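
A rough prototype of that bookkeeping might look like the following (the
names and the wrap-around policy are mine, just to make the idea concrete;
data would be built once, e.g. by shuffling the vector RDD and then
calling glom() and cache() on it):

    import org.apache.spark.rdd.RDD
    import breeze.linalg.DenseVector

    // data holds one Array per partition, assumed already randomly
    // shuffled; iteration i reads only the i-th contiguous block of
    // miniBatchFraction * partitionSize records from each partition.
    def miniBatchAt(data: RDD[Array[DenseVector[Double]]],
                    miniBatchFraction: Double,
                    iteration: Int): RDD[DenseVector[Double]] =
      data.flatMap { rows =>
        if (rows.isEmpty) Array.empty[DenseVector[Double]]
        else {
          val batchSize = math.max(1, (miniBatchFraction * rows.length).toInt)
          val start     = (iteration * batchSize) % rows.length  // wrap around
          rows.slice(start, math.min(start + batchSize, rows.length))
        }
      }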
>> >
>> > That said - eventually the overheads of the platform catch up to you.
>> > As a rule of thumb, I estimate about 50 ms/iteration as a floor for
>> > things like task serialization and other platform overheads. You've
>> > got to balance how much computation you want to do vs. the amount of
>> > time you want to spend waiting for the platform.
>> >
>> > - Evan
>> >
>> > On Sat, Sep 26, 2015 at 9:27 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> Hello Devs,
>> >>
>> >> This email concerns some timing results for a treeAggregate in
>> >> computing a (stochastic) gradient over an RDD of labelled points, as
>> >> is currently done in the MLlib optimization routine for SGD.
>> >>
>> >> In SGD, the underlying RDD is downsampled by a fraction f \in (0,1],
>> >> and the subgradients over all the instances in the downsampled RDD
>> >> are aggregated to the driver as a dense vector. However, we have
>> >> noticed some unusual behaviour: computing the gradient for a
>> >> stochastic minibatch (f < 1) takes the same amount of time as
>> >> computing it for a full batch (f = 1).
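
Schematically, the step being timed is the following (a simplified
sketch, not the literal MLlib code; gradOf stands in for the per-instance
gradient computation):

    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD
    import breeze.linalg.DenseVector

    // One (mini-)batch gradient: downsample by f, then treeAggregate the
    // dense partial gradients back to the driver.
    def miniBatchGradient(points: RDD[LabeledPoint], f: Double, dim: Int, iter: Int)
                         (gradOf: LabeledPoint => DenseVector[Double]): DenseVector[Double] =
      points.sample(withReplacement = false, fraction = f, seed = 42 + iter)
            .treeAggregate(DenseVector.zeros[Double](dim))(
              (acc, p) => acc += gradOf(p),  // per task: local accumulation
              (a, b)   => a += b,            // combine partials across executors
              depth = 4)                     // last level reduces on the driver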
>> >>
>> >> Attached are two plots of the mean task timing metrics for each level
>> >> in the aggregation, which has been performed with 4 levels (level 4
>> >> is the final level, in which the results are communicated to the
>> >> driver). 16 nodes are used, and the RDD has 256 partitions. We run
>> >> the standalone cluster manager with the driver in client mode. Here,
>> >> the total task time (\tau) is shown alongside the execution time (not
>> >> counting GC), the serialization/deserialization time, the GC time,
>> >> and the difference between \tau and all other times, which is assumed
>> >> to be variable IO/communication/waiting time. The RDD in this case is
>> >> a labelled point representation of the KDD Bridge to Algebra dataset,
>> >> with 20M (sparse) instances and a problem dimension of 30M. The
>> >> instances are very sparse---each individual instance vector may have
>> >> only a hundred nonzeros. All metrics have been taken from the JSON
>> >> Spark event logs.
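
(In other words, the plotted T_{io} is simply the residual
T_{io} = \tau - (T_{exec} + T_{ser/deser} + T_{GC}).)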
>> >>
>> >> The plot gradient_f1.pdf shows the times for a gradient computation
>> >> with f = 1, and gradient_f-3.pdf shows the same metrics with
>> >> f = 1e-3. The same effect is observed for the other values of f in
>> >> {1e-1, 1e-2, ..., 1e-5}.
>> >>
>> >> What I would like to point out about these plots, and ask whether
>> >> anyone has experience with, is the following:
>> >> 1. The times are essentially identical; I would have thought that
>> >> downsampling the RDD before aggregating the subgradients would at
>> >> least reduce the execution time required, if not the
>> >> communication/serialization times.
>> >> 2. The serialization time in level 4 is almost entirely from the
>> >> result serialization to the driver, not from task deserialization.
>> >> In every level of the tree aggregation, however, the local (dense)
>> >> gradients have to be communicated between compute nodes, so I am
>> >> surprised that returning the vectors to the driver takes so much
>> >> longer.
>> >>
>> >> I initially wondered if the large IO overhead in the last stage had
>> >> anything to do with client mode vs cluster mode, since, from what I
>> >> understand, only a single core is allocated to the driver thread in
>> >> client mode. However, when running tests in the two modes, I have
>> >> previously seen no appreciable difference in the running time for
>> >> other (admittedly smaller) problems. Furthermore, I am still very
>> >> confused about why the execution time for each task is just as large
>> >> for the downsampled RDD. It seems unlikely that sampling each
>> >> partition would be as expensive as the gradient computations, even for
>> >> sparse feature vectors.
>> >>
>> >> If anyone has experience working with the sampling in minibatch SGD or
>> >> has tested the scalability of the treeAggregation operation for
>> >> vectors, I'd really appreciate your thoughts.
>> >>
>> >> Thanks,
>> >> Mike
>> >>
>> >>
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>


-- 
Thanks,
Mike

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
