I can't reproduce it (on Databricks / Spark 2.4), but as you say, it sounds really specific to some way of executing it. I can't imagine off the top of my head why that would be: with identical inputs, the model should produce identical outputs every time. I don't recall a bug being fixed in that area, but you might nevertheless try 2.4.6 or 3.0.0 just to see. As a workaround, maybe avoid the Python UDF and just select "probability[1]" with Spark SQL - I think that might work.
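Untested sketch of that workaround, reusing the names from your script. It assumes Spark 3.0+, since pyspark.ml.functions.vector_to_array is new in 3.0 (on 2.4 I'm not sure the ML vector type supports direct indexing in SQL):

from pyspark.ml.functions import vector_to_array  # Spark 3.0+

transform_result = mlp_model.transform(features_vector)
no_udf_results = transform_result.withColumn(
    'python_score',
    # Convert the vector to a plain array column and index into it,
    # so no Python UDF is involved at all.
    vector_to_array('probability')[1].cast('float')
)
no_udf_results.select('python_score', 'rawPrediction', 'probability').distinct().collect()

If that returns a single row while the UDF version doesn't, it would also help confirm the Python UDF path is where things go wrong. Given your observation that heavy partitioning avoids the problem, repartitioning before transforming (e.g. features_vector.repartition(200)) might be another stopgap.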
On Fri, Jul 17, 2020 at 4:24 AM Ben Smith <benjamin.smi...@baesystems.com> wrote:
>
> Hi,
>
> I am having an issue that looks like a potentially serious bug with Spark
> 2.4.3, as it impacts data accuracy. I have searched the Spark Jira and
> mailing lists as best I can and cannot find any reference to anyone else
> having this issue. I am not sure whether this is suitable for raising as a
> bug in the Spark Jira, so I thought I should request help here.
>
> The simplest summary of my suspected bug is: using pyspark with Spark
> 2.4.3, a MultilayerPerceptron model gives inconsistent outputs if a large
> amount of data is fed into it and at least one of the model outputs is fed
> to a Python UDF.
>
> After doing some debugging I haven't been able to get to the bottom of
> this or recreate it 100% reliably (it does happen very frequently), but I
> have narrowed the problem down somewhat and produced some stripped-down
> code that demonstrates it. Some observations I have made while doing this
> are:
> - I can recreate the problem with a very simple MultilayerPerceptron with
> no hidden layers (just 14 features and 2 outputs); I also see it with a
> more complex MultilayerPerceptron model.
> - I cannot recreate the problem unless a model output is fed to a Python
> UDF. Removing the UDF gives good model outputs; with it, the model outputs
> are inconsistent (note that it is not just the Python UDF outputs that are
> inconsistent).
> - I cannot recreate the problem on minuscule amounts of data or when my
> data is partitioned heavily. 100,000 rows of input with 2 partitions sees
> the issue happen most of the time.
> - Some of the bad outputs I get could be explained by certain features
> being zero when they came into the model (when they are not zero in my
> actual feature data).
> - I can recreate the problem on several different environments (with the
> same setup), so I don't think it's an issue with my hardware.
>
> My environment is CentOS 7.6 with Python 3.6.3 and Spark 2.4.3. I do not
> have the native libraries for mllib installed. I'm aware later releases of
> Spark are available, so please let me know if this is a problem (I would
> have difficulty getting a later release installed on my environment,
> otherwise I would test with that myself).
>
> The code sample below triggers the problem for me the vast majority of the
> time when run from a pyspark shell. It generates a dataframe containing
> 100,000 identical rows, transforms it with a MultilayerPerceptron model,
> and feeds one of the model output columns to a simple Python UDF to
> generate an additional column. The resulting dataframe has its distinct
> rows selected, and since all the inputs are identical I would expect to get
> 1 row back; instead I get many unique rows, with the number returned
> varying each time I run the code. To run the code you will need the model
> files locally. I have attached the model as a zip archive to this message
> (I hope); unzipping it to /tmp should be all you need to do.
>
> Please let me know if I have done anything wrong in this report. I haven't
> posted to a mailing list like this before, so I am unsure of the format
> and expectations when raising a message.
>
> model.zip
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t10909/model.zip>
>
> import pyspark
> from pyspark.sql import functions as func
> from pyspark.sql.types import FloatType
> from pyspark.ml.classification import MultilayerPerceptronClassificationModel
> from pyspark.ml.feature import VectorAssembler
>
> #############
> # Restart the shell's context with a small, deterministic local configuration.
> sc.stop()
>
> conf = pyspark.SparkConf().setMaster(
>     'local'
> ).setAppName(
>     'bug-Testing2'
> ).set(
>     'spark.executor.memory', '1G'
> ).set(
>     'spark.executor.cores', '1'
> ).set(
>     'spark.executor.instances', '1'
> ).set(
>     'spark.sql.shuffle.partitions', '1'
> ).set(
>     'spark.default.parallelism', '2'
> )
>
> sc = pyspark.SparkContext(conf=conf)
> spark = pyspark.sql.SparkSession(sc)
> #############
>
> # 100,000 identical rows of 14 features.
> data_array = []
> for a1 in range(100000):
>     data_array.append((1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1))
>
> df = spark.createDataFrame(data_array)
>
> mlp_model = MultilayerPerceptronClassificationModel.load("file:///tmp/model")
>
> # Assemble the 14 columns into a single feature vector.
> features_vector = VectorAssembler(
>     inputCols=['_1', '_2', '_3', '_4', '_5', '_6', '_7',
>                '_8', '_9', '_10', '_11', '_12', '_13', '_14'],
>     outputCol="scaledFeatures"
> ).transform(df).select(['scaledFeatures'])
>
> features_vector.cache()
>
> # Python UDF that extracts one element of the probability vector.
> def __return(vec, position):
>     return float(vec[position])
>
> __return_udf = func.udf(__return, FloatType())
>
> transform_result = mlp_model.transform(features_vector)
>
> final_results = transform_result.withColumn(
>     'python_score',
>     __return_udf('probability', func.lit(1))
> )
>
> # All inputs are identical, so this should return exactly one row.
> final_results.select('python_score', 'rawPrediction', 'probability').distinct().collect()
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> ---------------------------------------------------------------------