wbo4958 commented on code in PR #49547:
URL: https://github.com/apache/spark/pull/49547#discussion_r1920167547
########## python/pyspark/ml/tests/test_evaluation.py: ##########
@@ -14,18 +14,368 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import tempfile
 import unittest
 import numpy as np
-from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator
+from pyspark.ml.evaluation import (
+    ClusteringEvaluator,
+    RegressionEvaluator,
+    BinaryClassificationEvaluator,
+    MulticlassClassificationEvaluator,
+    MultilabelClassificationEvaluator,
+    RankingEvaluator,
+)
 from pyspark.ml.linalg import Vectors
-from pyspark.sql import Row
-from pyspark.testing.mlutils import SparkSessionTestCase
+from pyspark.sql import Row, SparkSession
+
+
+class EvaluatorTestsMixin:
+    def test_ranking_evaluator(self):
+        scoreAndLabels = [
+            ([1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0], [1.0, 2.0, 3.0, 4.0, 5.0]),
+            ([4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0], [1.0, 2.0, 3.0]),
+            ([1.0, 2.0, 3.0, 4.0, 5.0], []),
+        ]
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["prediction", "label"])
+
+        # Initialize RankingEvaluator
+        evaluator = RankingEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (mean average precision)
+        mean_average_precision = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(mean_average_precision, 0.3550, atol=1e-4))
+
+        # Evaluate the dataset using precisionAtK for k=2
+        precision_at_k = evaluator.evaluate(
+            dataset, {evaluator.metricName: "precisionAtK", evaluator.k: 2}
+        )
+        self.assertTrue(np.allclose(precision_at_k, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            ranke_path = tmp_dir + "/ranke"
+            evaluator.write().overwrite().save(ranke_path)
+            # Load the saved evaluator
+            evaluator2 = RankingEvaluator.load(ranke_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_multilabel_classification_evaluator(self):
+        dataset = self.spark.createDataFrame(
+            [
+                ([0.0, 1.0], [0.0, 2.0]),
+                ([0.0, 2.0], [0.0, 1.0]),
+                ([], [0.0]),
+                ([2.0], [2.0]),
+                ([2.0, 0.0], [2.0, 0.0]),
+                ([0.0, 1.0, 2.0], [0.0, 1.0]),
+                ([1.0], [1.0, 2.0]),
+            ],
+            ["prediction", "label"],
+        )
+
+        evaluator = MultilabelClassificationEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (f1 measure by default)
+        f1_score = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(f1_score, 0.6380, atol=1e-4))
+        # Evaluate the dataset using accuracy
+        accuracy = evaluator.evaluate(dataset, {evaluator.metricName: "accuracy"})
+        self.assertTrue(np.allclose(accuracy, 0.5476, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            mlce_path = tmp_dir + "/mlce"
+            evaluator.write().overwrite().save(mlce_path)
+            # Load the saved evaluator
+            evaluator2 = MultilabelClassificationEvaluator.load(mlce_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_multiclass_classification_evaluator(self):
+        dataset = self.spark.createDataFrame(
+            [
+                (0.0, 0.0),
+                (0.0, 1.0),
+                (0.0, 0.0),
+                (1.0, 0.0),
+                (1.0, 1.0),
+                (1.0, 1.0),
+                (1.0, 1.0),
+                (2.0, 2.0),
+                (2.0, 0.0),
+            ],
+            ["prediction", "label"],
+        )
+
+        evaluator = MulticlassClassificationEvaluator().setPredictionCol("prediction")
+
+        f1_score = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(f1_score, 0.6613, atol=1e-4))
+
+        # Evaluate the dataset using accuracy
+        accuracy = evaluator.evaluate(dataset, {evaluator.metricName: "accuracy"})
+        self.assertTrue(np.allclose(accuracy, 0.6666, atol=1e-4))
+
+        # Evaluate the true positive rate for label 1.0
+        true_positive_rate_label_1 = evaluator.evaluate(
+            dataset, {evaluator.metricName: "truePositiveRateByLabel", evaluator.metricLabel: 1.0}
+        )
+        self.assertEqual(true_positive_rate_label_1, 0.75)
+
+        # Set the metric to Hamming loss
+        evaluator.setMetricName("hammingLoss")
+
+        # Evaluate the dataset using Hamming loss
+        hamming_loss = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(hamming_loss, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            mce_path = tmp_dir + "/mce"
+            evaluator.write().overwrite().save(mce_path)
+            # Load the saved evaluator
+            evaluator2 = MulticlassClassificationEvaluator.load(mce_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+        # Create a DataFrame with weights
+        dataset_with_weight = self.spark.createDataFrame(
+            [
+                (0.0, 0.0, 1.0),
+                (0.0, 1.0, 1.0),
+                (0.0, 0.0, 1.0),
+                (1.0, 0.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (2.0, 2.0, 1.0),
+                (2.0, 0.0, 1.0),
+            ],
+            ["prediction", "label", "weight"],
+        )
+
+        # Initialize MulticlassClassificationEvaluator with weight column
+        evaluator = MulticlassClassificationEvaluator(
+            predictionCol="prediction", weightCol="weight"
+        )
+
+        # Evaluate the dataset with weights using default metric (f1 score)
+        weighted_f1_score = evaluator.evaluate(dataset_with_weight)
+        self.assertTrue(np.allclose(weighted_f1_score, 0.6613, atol=1e-4))
+
+        # Evaluate the dataset with weights using accuracy
+        weighted_accuracy = evaluator.evaluate(
+            dataset_with_weight, {evaluator.metricName: "accuracy"}
+        )
+        self.assertTrue(np.allclose(weighted_accuracy, 0.6666, atol=1e-4))
+
+        # Create a DataFrame with probabilities
+        dataset_with_probabilities = self.spark.createDataFrame(
+            [
+                (1.0, 1.0, 1.0, [0.1, 0.8, 0.1]),
+                (0.0, 2.0, 1.0, [0.9, 0.05, 0.05]),
+                (0.0, 0.0, 1.0, [0.8, 0.2, 0.0]),
+                (1.0, 1.0, 1.0, [0.3, 0.65, 0.05]),
+            ],
+            ["prediction", "label", "weight", "probability"],
+        )
+        # Initialize MulticlassClassificationEvaluator with probability column
+        evaluator = MulticlassClassificationEvaluator(
+            predictionCol="prediction", probabilityCol="probability"
+        )
+        # Set the metric to log loss
+        evaluator.setMetricName("logLoss")
+        # Evaluate the dataset using log loss
+        log_loss = evaluator.evaluate(dataset_with_probabilities)
+        self.assertTrue(np.allclose(log_loss, 0.9682, atol=1e-4))
+
+    def test_binary_classification_evaluator(self):
+        # Define score and labels data
+        scoreAndLabels = map(
+            lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1]),
+            [(0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)],
+        )
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["raw", "label"])
+
+        evaluator = BinaryClassificationEvaluator().setRawPredictionCol("raw")
+        auc_roc = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(auc_roc, 0.7083, atol=1e-4))
+
+        # Evaluate the dataset using the areaUnderPR metric
+        auc_pr = evaluator.evaluate(dataset, {evaluator.metricName: "areaUnderPR"})
+        self.assertTrue(np.allclose(auc_pr, 0.8339, atol=1e-4))
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            bce_path = tmp_dir + "/bce"
+            evaluator.write().overwrite().save(bce_path)
+            # Load the saved evaluator
+            evaluator2 = BinaryClassificationEvaluator.load(bce_path)
+            self.assertEqual(evaluator2.getRawPredictionCol(), "raw")
+
+        # Define score, labels, and weights data
+        scoreAndLabelsAndWeight = map(
+            lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1], x[2]),
+            [
+                (0.1, 0.0, 1.0),
+                (0.1, 1.0, 0.9),
+                (0.4, 0.0, 0.7),
+                (0.6, 0.0, 0.9),
+                (0.6, 1.0, 1.0),
+                (0.6, 1.0, 0.3),
+                (0.8, 1.0, 1.0),
+            ],
+        )
+        # Create a DataFrame with weights
+        dataset_with_weight = self.spark.createDataFrame(
+            scoreAndLabelsAndWeight, ["raw", "label", "weight"]
+        )
+
+        evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw", weightCol="weight")
+
+        # Evaluate the dataset with weights using the default metric (areaUnderROC)
+        auc_roc_weighted = evaluator.evaluate(dataset_with_weight)
+        self.assertTrue(np.allclose(auc_roc_weighted, 0.7025, atol=1e-4))
+
+        # Evaluate the dataset with weights using the areaUnderPR metric
+        auc_pr_weighted = evaluator.evaluate(
+            dataset_with_weight, {evaluator.metricName: "areaUnderPR"}
+        )
+        self.assertTrue(np.allclose(auc_pr_weighted, 0.8221, atol=1e-4))
+
+        # Get the number of bins used to compute areaUnderROC
+        num_bins = evaluator.getNumBins()
+        self.assertTrue(num_bins, 0.1000)

Review Comment:
   my bad, should be "assertEqual"
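For reference, a minimal sketch of the corrected check the comment points at. The expected value 1000 is an assumption here (the default numBins of BinaryClassificationEvaluator), not something stated in the review:

    # Sketch of the corrected assertion inside test_binary_classification_evaluator.
    # assertTrue(num_bins, 0.1000) treats the second argument as the failure message,
    # so it passes for any non-zero num_bins; compare the value explicitly instead.
    # The expected 1000 is assumed (the evaluator's default numBins), not taken from the PR.
    num_bins = evaluator.getNumBins()
    self.assertEqual(num_bins, 1000)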