Would you mind letting us know the number of training examples in the
datasets? Also, what do your features look like? Are they text, categorical,
etc.? You mention that most rows only have a few features, and all rows
together have a few tens of thousands of features, yet your maximum feature
index is 20 million. How are you constructing your feature vectors to get a
size of 20 million? The only realistic way I can see this situation occurring
in practice is with feature hashing (HashingTF).
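
To make the question concrete: the vector size is set by the largest feature
index, not by the number of distinct features that occur. Below is a minimal
sketch in plain Scala (no Spark dependency) of how raw indices could be
remapped to a contiguous range. Rows are modelled as sequences of
(index, value) pairs rather than SparseVectors, and `CompactIndex`, `build`
and `remap` are hypothetical names used only for illustration.

```scala
// Illustrative only: rows are modelled as Seq[(featureIndex, value)] rather
// than Spark SparseVectors, and all names here are hypothetical.
object CompactIndex {
  type Row = Seq[(Int, Double)]

  // Map every feature index that actually occurs to a contiguous id in
  // 0 until k, in ascending order of the original index.
  def build(rows: Seq[Row]): Map[Int, Int] =
    rows.flatMap(_.map(_._1)).distinct.sorted.zipWithIndex.toMap

  // Rewrite one row using the mapping; indices not in the mapping are dropped.
  def remap(row: Row, mapping: Map[Int, Int]): Row =
    row.collect { case (i, v) if mapping.contains(i) => (mapping(i), v) }
}

val rows: Seq[CompactIndex.Row] = Seq(
  Seq(3 -> 1.0, 19999999 -> 2.0),
  Seq(3 -> 0.5, 42 -> 1.5)
)
val mapping = CompactIndex.build(rows) // three distinct features occur
val compact = rows.map(CompactIndex.remap(_, mapping))
```

With a mapping like this the vector size would be the number of distinct
features rather than the largest raw index. The same mapping would have to be
saved and applied again at scoring time, which is the extra bookkeeping you
mentioned wanting to avoid.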

MultivariateOnlineSummarizer uses dense arrays, but it should be possible
to make it support sparse data. In theory, though, the result will tend to
be dense anyway, unless very many entries in the input feature vector never
occur and are zero throughout the data set (which seems to be the case with
your data?). So I doubt that using sparse vectors for the summarizer would
improve performance in general.
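
As a rough back-of-envelope illustration of why the dense arrays matter: each
array of doubles costs 8 bytes per entry regardless of how many entries are
non-zero. The sketch below is plain Scala with hypothetical names. The number
of arrays per aggregator is a parameter here, and the value 5 is an assumed
figure for illustration, not one taken from the Spark source.

```scala
// Back-of-envelope estimate only; names are hypothetical.
object DenseCost {
  val BytesPerDouble = 8L

  // Bytes needed for one dense Array[Double] of the given length.
  def arrayBytes(vectorSize: Long): Long = vectorSize * BytesPerDouble

  // Bytes for one aggregator holding `numArrays` dense arrays.
  def aggregatorBytes(vectorSize: Long, numArrays: Int): Long =
    arrayBytes(vectorSize) * numArrays
}

// One dense array for a vector of size 20 million.
val oneArray = DenseCost.arrayBytes(20000000L)
// An aggregator with an assumed count of 5 such arrays.
val fiveArrays = DenseCost.aggregatorBytes(20000000L, 5)
```

Since each partition produces its own aggregator that has to be merged
during the aggregation, the total volume moved grows with the number of
partitions, which would be consistent with the improvement seen from
coalescing.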

LR doesn't accept a sparse weight vector, as it uses dense vectors for
coefficients and gradients currently. When using L1 regularization, it
could support sparse weight vectors, but the current implementation doesn't
do that yet.
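
To illustrate what a sparse weight vector could look like when L1
regularization drives most coefficients to exactly zero, here is a minimal
sketch in plain Scala. It is not how Spark's LogisticRegression stores
coefficients (those are dense, as noted above); `SparseWeights`, `fromDense`
and `dot` are hypothetical names.

```scala
// Illustrative only; not Spark's implementation.
object SparseWeights {
  // Keep only the non-zero coefficients as parallel (indices, values) arrays.
  def fromDense(weights: Array[Double]): (Array[Int], Array[Double]) = {
    val nonZero = weights.zipWithIndex.filter { case (w, _) => w != 0.0 }
    (nonZero.map(_._2), nonZero.map(_._1))
  }

  // Dot product of sparse weights with a sparse row of (index, value) pairs.
  def dot(indices: Array[Int], values: Array[Double],
          row: Seq[(Int, Double)]): Double = {
    val lookup = indices.zip(values).toMap
    row.map { case (i, x) => lookup.getOrElse(i, 0.0) * x }.sum
  }
}

val dense = Array(0.0, 0.5, 0.0, 0.0, -2.0, 0.0)
val (idx, vals) = SparseWeights.fromDense(dense)
val margin = SparseWeights.dot(idx, vals, Seq(1 -> 2.0, 3 -> 7.0, 4 -> 1.0))
```

Only the two non-zero coefficients are stored, and the feature at index 3
contributes nothing to the margin because its weight is zero.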

On Thu, 10 Mar 2016 at 23:45 Daniel Siegmann <daniel.siegm...@teamaol.com>
wrote:

> Hi Nick,
>
> Thanks for the feedback and the pointers. Coalescing to fewer partitions
> improved the situation dramatically. As you suggested, communication
> overhead is dominating the overall runtime.
>
> The training run I mentioned originally had 900 partitions. Each tree
> aggregation has two stages, one for the original partitions, and then one
> with the aggregation into a smaller number (at 900 partitions the second
> stage was 30). The first tree aggregation job (the longer one) uses the
> MultivariateOnlineSummarizer you mentioned, while the subsequent
> aggregation jobs use LogisticAggregator (similar problem, though smaller).
>
> I've run some tests with fewer partitions on a very similar data set. 400
> partitions took 8 hours, 100 partitions took 4 hours, and 10 partitions
> took 1.4 hours. I put some screenshots from the Spark UI here:
> http://imgur.com/a/trRJU
>
> Still, these numbers seem oddly high. With 10 partitions it's shuffling
> only some 200 MB per job, but the median "Getting Result Time" is 2.1
> minutes. I would have expected it to take *seconds* to transfer that data.
>
> Anyway, the MultivariateOnlineSummarizer creates several arrays of
> doubles equal to the size of the vector - arrays of course are inherently
> dense. While this is only one iteration it is the longest, taking a
> significant portion of the time by itself. LogisticAggregator meanwhile
> has fewer arrays, but if you try to pass coefficients as anything other
> than a dense vector it actually throws an error! Any idea why? Anyone know
> a reason these aggregators *must* store their data densely, or is it just an
> implementation choice? Perhaps refactoring these classes to store data
> sparsely would fix the issue.
>
> On Wed, Mar 9, 2016 at 7:57 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> Hi Daniel
>>
>> The bottleneck in Spark ML is most likely (a) the fact that the weight
>> vector itself is dense, and (b) the related communication via the driver. A
>> tree aggregation mechanism is used for computing gradient sums (see
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala#L1080
>>  and
>> https://databricks.com/blog/2014/09/22/spark-1-1-mllib-performance-improvements.html),
>> which helps efficiency, but ultimately the driver must collect that
>> gradient vector and re-broadcast the updated weight vector on every
>> iteration.
>>
>> From a quick glance, MultivariateOnlineSummarizer doesn't seem optimized
>> for sparse data either (though that is only one pass over the dataset, so
>> it is doubtful it adds too much overhead).
>>
>> It would be helpful to understand some further details:
>> 1. Some more exact timing numbers - if you could provide screenshots /
>> output from the UI to indicate the stages and call sites where the time is
>> being spent that would be really useful
>> 2. Is this part of a pipeline, and if so, what is the contribution of
>> other parts of that pipeline to overall runtime?
>> 3. Some stats on input / output data sizes from the critical stages
>> (again from the UI)
>> 4. The dataset size (# examples, avg sparsity % per example, etc)
>> 5. Related to (4), the number of partitions of your dataset
>> 6. Cluster details (# nodes and spec), as well as Spark version
>>
>> If you have a lot of partitions, you could find performance will be
>> better with fewer partitions because the communication overhead will tend
>> to dominate the overall runtime.
>>
>> Still, 10 hours and >100GB of driver memory seems extreme for a 20
>> million size dense weight vector (which should only be a few 100MB memory),
>> so perhaps something else is going on.
>>
>> Nick
>>
>> On Tue, 8 Mar 2016 at 22:55 Daniel Siegmann <daniel.siegm...@teamaol.com>
>> wrote:
>>
>>> Just for the heck of it I tried the old MLlib implementation, but it had
>>> the same scalability problem.
>>>
>>> Anyone familiar with the logistic regression implementation who could
>>> weigh in?
>>>
>>> On Mon, Mar 7, 2016 at 5:35 PM, Michał Zieliński <
>>> zielinski.mich...@gmail.com> wrote:
>>>
>>>> We're using SparseVector columns in a DataFrame, so they are definitely
>>>> supported. But maybe for LR some implicit magic is happening inside.
>>>>
>>>> On 7 March 2016 at 23:04, Devin Jones <devin.jo...@columbia.edu> wrote:
>>>>
>>>>> I could be wrong, but it's possible that toDF populates a DataFrame,
>>>>> which I understand does not support SparseVectors at the moment.
>>>>>
>>>>> If you use the MLlib logistic regression implementation (not ml) you
>>>>> can pass the RDD[LabeledPoint] data type directly to the learner.
>>>>>
>>>>>
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
>>>>>
>>>>> Only downside is that you can't use the pipeline framework from spark
>>>>> ml.
>>>>>
>>>>> Cheers,
>>>>> Devin
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Mar 7, 2016 at 4:54 PM, Daniel Siegmann <
>>>>> daniel.siegm...@teamaol.com> wrote:
>>>>>
>>>>>> Yes, it is a SparseVector. Most rows only have a few features, and
>>>>>> all the rows together only have tens of thousands of features, but the
>>>>>> vector size is ~ 20 million because that is the largest feature.
>>>>>>
>>>>>> On Mon, Mar 7, 2016 at 4:31 PM, Devin Jones <devin.jo...@columbia.edu
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Which data structure are you using to train the model? If you
>>>>>>> haven't tried yet, you should consider the SparseVector
>>>>>>>
>>>>>>>
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 7, 2016 at 4:03 PM, Daniel Siegmann <
>>>>>>> daniel.siegm...@teamaol.com> wrote:
>>>>>>>
>>>>>>>> I recently tried to train a model using
>>>>>>>> org.apache.spark.ml.classification.LogisticRegression on a data
>>>>>>>> set where the feature vector size was around 20 million. It did
>>>>>>>> *not* go well. It took around 10 hours to train on a substantial
>>>>>>>> cluster. Additionally, it pulled a lot of data back to the driver - I
>>>>>>>> eventually set --conf spark.driver.memory=128g --conf
>>>>>>>> spark.driver.maxResultSize=112g when submitting.
>>>>>>>>
>>>>>>>> Attempting the same application on the same cluster with the
>>>>>>>> feature vector size reduced to 100k took only ~9 minutes. Clearly
>>>>>>>> there is an issue with scaling to large numbers of features. I'm not
>>>>>>>> doing anything fancy in my app; here's the relevant code:
>>>>>>>>
>>>>>>>> import org.apache.spark.ml.classification.LogisticRegression
>>>>>>>>
>>>>>>>> val lr = new LogisticRegression().setRegParam(1)
>>>>>>>> val model = lr.fit(trainingSet.toDF())
>>>>>>>>
>>>>>>>> In comparison, a coworker trained a logistic regression model on
>>>>>>>> her *laptop* using the Java library liblinear in just a few
>>>>>>>> minutes. That's with the ~20 million-sized feature vectors. This
>>>>>>>> suggests to me there is some issue with Spark ML's implementation of
>>>>>>>> logistic regression which is limiting its scalability.
>>>>>>>>
>>>>>>>> Note that my feature vectors are *very* sparse. The maximum
>>>>>>>> feature is around 20 million, but I think there are only tens of
>>>>>>>> thousands of features.
>>>>>>>>
>>>>>>>> Has anyone run into this? Any idea where the bottleneck is or how
>>>>>>>> this problem might be solved?
>>>>>>>>
>>>>>>>> One solution of course is to implement some dimensionality
>>>>>>>> reduction. I'd really like to avoid this, as it's just another thing to
>>>>>>>> deal with - not so hard to put it into the trainer, but then anything
>>>>>>>> doing scoring will need the same logic. Unless Spark ML supports this
>>>>>>>> out of the box? An easy way to save / load a model along with the
>>>>>>>> dimensionality reduction logic, so that when transform is called on
>>>>>>>> the model it will handle the dimensionality reduction transparently?
>>>>>>>>
>>>>>>>> Any advice would be appreciated.
>>>>>>>>
>>>>>>>> ~Daniel Siegmann
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
