Hi Francis, From my observation when using Spark SQL, dataframe.limit(n) does not necessarily return the same result each time an app is run.
To be more precise, within one app, the result should be the same for the same n; however, changing n might not result in the same prefix (the result for n = 10 doesn't necessarily start with the result for n = 5). When running different apps, results are usually different for the same n. Since your code calls regData.limit(1000), each run may train on a different subset of rows, which could explain why input validation fails only some of the time. Thanks On Tue, Aug 11, 2015 at 2:56 PM, Francis Lau <francis....@smartsheet.com> wrote: > Has anyone see this issue? I am calling the > LogisticRegressionWithLBFGS.train API and about 7 out of 10 times, I get > an ""Input validation failed" error". The exact same code and dataset works > sometimes but fails at other times. It is odd. I can't seem to find any > info on this. Below is the pyspark code and the error message. I did check > the dataset and all values are zero or greater. There are no blank spaces > or nulls. This code below is pretty much the sample code from the Spark > site. > > Thanks in advance for any help or pointers in how to investigate this > issue. > > -- > *Francis * > > *CODE:* > > from pyspark.mllib.classification import LogisticRegressionWithLBFGS > from pyspark.mllib.regression import LabeledPoint > from numpy import array > > # Load and parse the data > def parsePoint(line): > #values = [float(x) for x in line.split(' ')] > values = [float(x) for x in line.asDict().values()] # need to convert > from Row to Array > return LabeledPoint(values[0], values[1:]) > > # convert SQL to format needed for training model > regData = sqlContext.sql("select statement") > > df = regData.limit(1000) > data = df.rdd > > parsedData = data.map(parsePoint) > > # Build the model > model = LogisticRegressionWithLBFGS.train(parsedData) > > # Evaluating the model on training data > labelsAndPreds = parsedData.map(lambda p: (p.label, > model.predict(p.features))) > trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / > float(parsedData.count()) > > print("Training Error = " + str(trainErr)) > print "Intercept: " + str(model.intercept) > print "Weights: " + 
str(model.weights) > > > > > *ERROR:* > > --------------------------------------------------------------------------- > Py4JJavaError Traceback (most recent call last) > <ipython-input-134-b31b9c04499a> in <module>() > 20 > 21 # Build the model > ---> 22 model = LogisticRegressionWithLBFGS.train(parsedData) > 23 > 24 # Evaluating the model on training data > > /home/ubuntu/databricks/spark/python/pyspark/mllib/classification.py in > train(cls, data, iterations, initialWeights, regParam, regType, intercept, > corrections, tolerance, validateData, numClasses) > 344 else: > 345 initialWeights = [0.0] * > len(data.first().features) * (numClasses - 1) > --> 346 return _regression_train_wrapper(train, > LogisticRegressionModel, data, initialWeights) > 347 > 348 > > /home/ubuntu/databricks/spark/python/pyspark/mllib/regression.py in > _regression_train_wrapper(train_func, modelClass, data, initial_weights) > 186 if (modelClass == LogisticRegressionModel): > 187 weights, intercept, numFeatures, numClasses = train_func( > --> 188 data, _convert_to_vector(initial_weights)) > 189 return modelClass(weights, intercept, numFeatures, > numClasses) > 190 else: > > /home/ubuntu/databricks/spark/python/pyspark/mllib/classification.py in > train(rdd, i) > 334 return > callMLlibFunc("trainLogisticRegressionModelWithLBFGS", rdd, > int(iterations), i, > 335 float(regParam), regType, > bool(intercept), int(corrections), > --> 336 float(tolerance), > bool(validateData), int(numClasses)) > 337 > 338 if initialWeights is None: > > /home/ubuntu/databricks/spark/python/pyspark/mllib/common.py in > callMLlibFunc(name, *args) > 126 sc = SparkContext._active_spark_context > 127 api = getattr(sc._jvm.PythonMLLibAPI(), name) > --> 128 return callJavaFunc(sc, api, *args) > 129 > 130 > > /home/ubuntu/databricks/spark/python/pyspark/mllib/common.py in > callJavaFunc(sc, func, *args) > 119 """ Call Java Function """ > 120 args = [_py2java(sc, a) for a in args] > --> 121 return _java2py(sc, 
func(*args)) > 122 > 123 > > /home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py > in __call__(self, *args) > 536 answer = self.gateway_client.send_command(command) > 537 return_value = get_return_value(answer, > self.gateway_client, > --> 538 self.target_id, self.name) > 539 > 540 for temp_arg in temp_args: > > /home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py > in get_return_value(answer, gateway_client, target_id, name) > 298 raise Py4JJavaError( > 299 'An error occurred while calling {0}{1}{2}.\n'. > --> 300 format(target_id, '.', name), value) > 301 else: > 302 raise Py4JError( > > Py4JJavaError: An error occurred while calling > o5803.trainLogisticRegressionModelWithLBFGS. > : org.apache.spark.SparkException: *Input validation failed.* > at > org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:225) > at > org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRegressionModel(PythonMLLibAPI.scala:81) > at > org.apache.spark.mllib.api.python.PythonMLLibAPI.trainLogisticRegressionModelWithLBFGS(PythonMLLibAPI.scala:270) > at sun.reflect.GeneratedMethodAccessor155.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) > at py4j.Gateway.invoke(Gateway.java:259) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:207) > at java.lang.Thread.run(Thread.java:745) > -- Best Ai