stanlocht commented on code in PR #50644:
URL: https://github.com/apache/spark/pull/50644#discussion_r2081946148


##########
python/pyspark/testing/utils.py:
##########
@@ -1060,18 +1092,1108 @@ def assert_rows_equal(
         actual_list = sorted(actual_list, key=lambda x: str(x))
         expected_list = sorted(expected_list, key=lambda x: str(x))
 
-    assert_rows_equal(actual_list, expected_list, maxErrors=maxErrors, 
showOnlyDiff=showOnlyDiff)
+    assert_rows_equal(
+        actual_list,
+        expected_list,
+        maxErrors=maxErrors,
+        showOnlyDiff=showOnlyDiff,
+    )
+
+
+def assertColumnUnique(
+    df: Union[DataFrame, "pandas.DataFrame", "pyspark.pandas.DataFrame"],
+    columns: Union[str, List[str]],
+    message: Optional[str] = None,
+) -> None:
+    """Assert that the specified column(s) in a DataFrame contain unique 
values.
+
+    This function checks if the values in the specified column(s) are unique. 
If not,
+    it raises an AssertionError with details about the duplicate values.
+
+    Supports Spark, Spark Connect, pandas, and pandas-on-Spark DataFrames.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    df : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark)
+        The DataFrame to check for uniqueness.
+    columns : str or list of str
+        The column name(s) to check for uniqueness. Can be a single column 
name or a list
+        of column names. If a list is provided, the combination of values 
across these
+        columns is checked for uniqueness.
+    message : str, optional
+        Custom error message to include if the assertion fails.
+
+    Raises
+    ------
+    AssertionError
+        If the specified column(s) contain duplicate values.
+    PySparkAssertionError
+        If the input DataFrame is not of a supported type or is a streaming 
DataFrame.
+    ValueError
+        If the columns parameter is invalid or if specified columns don't 
exist in the
+        DataFrame.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", 
"value"])
+    >>> assertColumnUnique(df, "id")  # passes, 'id' column has unique values
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "b"), (3, "c")], ["id", 
"value"])
+    >>> assertColumnUnique(df, "id")  # doctest: +IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Column 'id' contains duplicate values.
+
+    Check multiple columns for uniqueness (composite key):
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "a")], ["id", 
"value"])
+    >>> assertColumnUnique(df, ["id", "value"])  # passes, combinations are 
unique
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "a"), (2, "b")], ["id", 
"value"])
+    >>> assertColumnUnique(df, ["id", "value"])  # doctest: 
+IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Columns ['id', 'value'] contains duplicate values.
+
+    """
+    if df is None:
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]",
+                "arg_name": "df",
+                "actual_type": None,
+            },
+        )
+
+    # Validate columns parameter
+    if not columns:
+        raise ValueError("The 'columns' parameter cannot be empty.")
+
+    if isinstance(columns, str):
+        columns = [columns]
+    elif not isinstance(columns, list):
+        raise ValueError("The 'columns' parameter must be a string or a list 
of strings.")
+
+    has_pandas = False
+    try:
+        # If pandas dependencies are available, allow pandas or 
pandas-on-Spark DataFrame
+        import pandas as pd
+
+        has_pandas = True
+    except ImportError:
+        # no pandas, so we won't call pandasutils functions
+        pass
+
+    # Check if pyarrow is available
+    has_arrow = False
+    try:
+        import importlib.util
+
+        has_arrow = importlib.util.find_spec("pyarrow") is not None
+    except ImportError:
+        pass
+
+    # Handle pandas and pandas-on-Spark DataFrames
+    if has_pandas and has_arrow:
+        import pyspark.pandas as ps
+
+        if isinstance(df, pd.DataFrame):
+            # Check if all columns exist in the DataFrame
+            missing_columns = [col for col in columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(
+                    f"The following columns do not exist in the DataFrame: 
{missing_columns}"
+                )
+
+            # Check for duplicates using pandas methods
+            if len(columns) == 1:
+                # For a single column, use duplicated() method
+                duplicates = df[df[columns[0]].duplicated(keep=False)]
+                if not duplicates.empty:
+                    # Get examples of duplicates
+                    duplicate_examples = duplicates.head(5)
+                    examples_str = "\n".join([str(row) for _, row in 
duplicate_examples.iterrows()])
+
+                    # Create error message
+                    error_msg = f"Column '{columns[0]}' contains duplicate 
values.\n"
+                    error_msg += f"Examples of duplicates:\n{examples_str}"
+
+                    if message:
+                        error_msg += f"\n{message}"
+

Review Comment:
   I refactored this to handle the pandas type dataframes with new methods on 
the `PandasOnSparkTestUtils` class, similarly as to how it is done in the 
`assertDataFrameEqual` function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to