Hi, I am having an issue that looks like a potentially serious bug in Spark 2.4.3, as it impacts data accuracy. I have searched the Spark JIRA and mailing lists as best I can and cannot find any reference to anyone else having this issue. I am not sure whether this would be suitable for raising as a bug in the Spark JIRA, so I thought I should request help here.
The simplest summary of my suspected bug is: using pyspark with Spark 2.4.3, a MultilayerPerceptron model gives inconsistent outputs if a large amount of data is fed into it and at least one of the model outputs is fed to a Python UDF. After doing some debugging I haven't been able to get to the bottom of this or recreate it 100% reliably (it does happen very frequently), but I have narrowed the problem down somewhat and produced some stripped-down code that demonstrates it. Some observations I have made while doing this are:

- I can recreate the problem with a very simple MultilayerPerceptron with no hidden layers (just 14 features and 2 outputs); I also see it with a more complex MultilayerPerceptron model.
- I cannot recreate the problem unless the model output is fed to a Python UDF. Removing the UDF gives good outputs for the model, while having it means the model outputs are inconsistent (note that it is not just the Python UDF outputs that are inconsistent).
- I cannot recreate the problem on minuscule amounts of data or when my data is partitioned heavily. 100,000 rows of input with 2 partitions sees the issue happen most of the time.
- Some of the bad outputs I get could be explained if certain features were zero when they came into the model (they are not zero in my actual feature data; a rough check for this is sketched further down).
- I can recreate the problem on several different environments (with the same setup), so I don't think it is an issue with my hardware.

My environment is CentOS 7.6 with Python 3.6.3 and Spark 2.4.3. I do not have the native libraries for mllib installed. I'm aware later releases of Spark are available, so please let me know if this is a problem (I would have difficulty getting a later release installed on my environment, otherwise I would test with that myself).

The code sample below triggers the problem for me the vast majority of the time when run from a pyspark shell. It generates a dataframe containing 100,000 identical rows, transforms it with a MultilayerPerceptron model, and feeds one of the model output columns to a simple Python UDF to generate an additional column. The resulting dataframe then has its distinct rows selected; since all the inputs are identical I would expect to get 1 row back, but instead I get many unique rows, with the number returned varying each time I run the code. To run the code you will need the model files locally. I have attached the model as a zip archive to this message (I hope); unzipping it to /tmp should be all you need to do.

Please let me know if I have done anything wrong in this report. I haven't posted to a mailing list like this before so am unsure of the format and expectations when raising a message.
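As an aside, regarding the zeroed-features observation above: a rough way to inspect the assembled feature vectors might be something like the sketch below. It is only a sketch: it assumes the features_vector DataFrame built in the reproduction code further down, it still goes through Python via the RDD API (it just avoids the SQL Python UDF path), and on the sample data the vectors are trivially all ones, so a check like this is really only meaningful on real feature data.

# Sketch only: count rows whose assembled feature vector contains a zero,
# using the RDD API so that no SQL Python UDF is involved in the check.
# Assumes the features_vector DataFrame from the reproduction code below.
zero_feature_rows = (
    features_vector.rdd
    .filter(lambda row: any(v == 0.0 for v in row['scaledFeatures'].toArray()))
    .count()
)
print("rows with at least one zero feature:", zero_feature_rows)  # expect 0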
model.zip <http://apache-spark-user-list.1001560.n3.nabble.com/file/t10909/model.zip>

import pyspark
from pyspark.sql import functions as func
from math import log10
from pyspark.ml import *
from pyspark.ml.classification import MultilayerPerceptronClassifier, MultilayerPerceptronClassificationModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import Window
from pyspark.sql.types import FloatType

############# Spark context setup (run from a pyspark shell, hence the sc.stop())
sc.stop()
conf = pyspark.SparkConf() \
    .setMaster('local') \
    .setAppName('bug-Testing2') \
    .set('spark.executor.memory', '1G') \
    .set('spark.executor.cores', '1') \
    .set('spark.executor.instances', '1') \
    .set('spark.sql.shuffle.partitions', '1') \
    .set('spark.default.parallelism', '2')
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)

############# Build 100,000 identical rows of 14 features and assemble them into a vector column
data_array = []
for a1 in range(0, 100000, 1):
    data_array.append((1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1))
df = spark.createDataFrame(data_array)

mlp_model = MultilayerPerceptronClassificationModel.load("file:///tmp/model")

features_vector = VectorAssembler(
    inputCols=['_1', '_2', '_3', '_4', '_5', '_6', '_7',
               '_8', '_9', '_10', '_11', '_12', '_13', '_14'],
    outputCol="scaledFeatures"
).transform(df).select(['scaledFeatures'])
features_vector.cache()

############# Score with the model and feed one model output column through a Python UDF
def __return(vec, position):
    return float(vec[position])

__return_udf = func.udf(__return, FloatType())

transform_result = mlp_model.transform(features_vector)
final_results = transform_result.withColumn('python_score', __return_udf('probability', func.lit(1)))

# All input rows are identical, so this should return exactly one distinct row
final_results.select('python_score', 'rawPrediction', 'probability').distinct().collect()
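If it helps narrow things down, a rough variation of the final step is sketched below: it compares the distinct-row counts without the Python UDF column, with it, and with the input repartitioned much more heavily (the 200 partitions figure is only an illustrative value; per the observations above, the problem does not seem to reproduce with heavily partitioned data). It assumes the transform_result, features_vector, mlp_model and __return_udf objects from the reproduction code above.

# Sketch only: compare distinct-row counts under the three conditions
# described above (no UDF, with UDF, with UDF on heavily repartitioned input).

# 1. Without the Python UDF column - all inputs are identical, so expect 1.
print(transform_result.select('rawPrediction', 'probability').distinct().count())

# 2. With the Python UDF column - the problematic case; here the count varies between runs.
with_udf = transform_result.withColumn('python_score', __return_udf('probability', func.lit(1)))
print(with_udf.select('python_score', 'rawPrediction', 'probability').distinct().count())

# 3. With the Python UDF column but heavily repartitioned input
#    (200 is an arbitrary illustrative partition count).
repart_result = mlp_model.transform(features_vector.repartition(200))
with_udf_repart = repart_result.withColumn('python_score', __return_udf('probability', func.lit(1)))
print(with_udf_repart.select('python_score', 'rawPrediction', 'probability').distinct().count())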