asl3 commented on code in PR #50644:
URL: https://github.com/apache/spark/pull/50644#discussion_r2059059636


##########
python/pyspark/testing/utils.py:
##########
@@ -580,6 +598,7 @@ def compare_datatypes_ignore_nullable(dt1: Any, dt2: Any):
 
 if TYPE_CHECKING:
     import pandas
+

Review Comment:
   nit: revert



##########
python/pyspark/testing/utils.py:
##########
@@ -1060,18 +1092,1108 @@ def assert_rows_equal(
         actual_list = sorted(actual_list, key=lambda x: str(x))
         expected_list = sorted(expected_list, key=lambda x: str(x))
 
-    assert_rows_equal(actual_list, expected_list, maxErrors=maxErrors, 
showOnlyDiff=showOnlyDiff)
+    assert_rows_equal(
+        actual_list,
+        expected_list,
+        maxErrors=maxErrors,
+        showOnlyDiff=showOnlyDiff,
+    )
+
+
+def assertColumnUnique(
+    df: Union[DataFrame, "pandas.DataFrame", "pyspark.pandas.DataFrame"],
+    columns: Union[str, List[str]],
+    message: Optional[str] = None,
+) -> None:
+    """Assert that the specified column(s) in a DataFrame contain unique 
values.
+
+    This function checks if the values in the specified column(s) are unique. 
If not,
+    it raises an AssertionError with details about the duplicate values.
+
+    Supports Spark, Spark Connect, pandas, and pandas-on-Spark DataFrames.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    df : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark)
+        The DataFrame to check for uniqueness.
+    columns : str or list of str
+        The column name(s) to check for uniqueness. Can be a single column 
name or a list
+        of column names. If a list is provided, the combination of values 
across these
+        columns is checked for uniqueness.
+    message : str, optional
+        Custom error message to include if the assertion fails.
+
+    Raises
+    ------
+    AssertionError
+        If the specified column(s) contain duplicate values.
+    PySparkAssertionError
+        If the input DataFrame is not of a supported type or is a streaming 
DataFrame.
+    ValueError
+        If the columns parameter is invalid or if specified columns don't 
exist in the
+        DataFrame.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", 
"value"])
+    >>> assertColumnUnique(df, "id")  # passes, 'id' column has unique values
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "b"), (3, "c")], ["id", 
"value"])
+    >>> assertColumnUnique(df, "id")  # doctest: +IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Column 'id' contains duplicate values.
+
+    Check multiple columns for uniqueness (composite key):
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "a")], ["id", 
"value"])
+    >>> assertColumnUnique(df, ["id", "value"])  # passes, combinations are 
unique
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "a"), (2, "b")], ["id", 
"value"])
+    >>> assertColumnUnique(df, ["id", "value"])  # doctest: 
+IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Columns ['id', 'value'] contains duplicate values.
+
+    """
+    if df is None:
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]",
+                "arg_name": "df",
+                "actual_type": None,
+            },
+        )
+
+    # Validate columns parameter
+    if not columns:
+        raise ValueError("The 'columns' parameter cannot be empty.")
+
+    if isinstance(columns, str):
+        columns = [columns]
+    elif not isinstance(columns, list):
+        raise ValueError("The 'columns' parameter must be a string or a list 
of strings.")
+
+    has_pandas = False
+    try:
+        # If pandas dependencies are available, allow pandas or 
pandas-on-Spark DataFrame
+        import pandas as pd
+
+        has_pandas = True
+    except ImportError:
+        # no pandas, so we won't call pandasutils functions
+        pass
+
+    # Check if pyarrow is available
+    has_arrow = False
+    try:
+        import importlib.util
+
+        has_arrow = importlib.util.find_spec("pyarrow") is not None
+    except ImportError:
+        pass
+
+    # Handle pandas and pandas-on-Spark DataFrames
+    if has_pandas and has_arrow:
+        import pyspark.pandas as ps
+
+        if isinstance(df, pd.DataFrame):
+            # Check if all columns exist in the DataFrame
+            missing_columns = [col for col in columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(
+                    f"The following columns do not exist in the DataFrame: 
{missing_columns}"
+                )
+
+            # Check for duplicates using pandas methods
+            if len(columns) == 1:
+                # For a single column, use duplicated() method
+                duplicates = df[df[columns[0]].duplicated(keep=False)]
+                if not duplicates.empty:
+                    # Get examples of duplicates
+                    duplicate_examples = duplicates.head(5)
+                    examples_str = "\n".join([str(row) for _, row in 
duplicate_examples.iterrows()])
+
+                    # Create error message
+                    error_msg = f"Column '{columns[0]}' contains duplicate 
values.\n"
+                    error_msg += f"Examples of duplicates:\n{examples_str}"
+
+                    if message:
+                        error_msg += f"\n{message}"
+

Review Comment:
   It seems like this logic is repeated across the `pd.DataFrame`, `ps.DataFrame`, 
and Spark DataFrame branches - can we refactor it into a shared util?



##########
python/pyspark/sql/tests/test_utils.py:
##########
@@ -943,9 +951,10 @@ def test_assert_equal_approx_pandas_on_spark_df(self):
 
     @unittest.skipIf(not have_pandas or not have_pyarrow, "no pandas or 
pyarrow dependency")
     def test_assert_error_pandas_pyspark_df(self):
-        import pyspark.pandas as ps
         import pandas as pd
 

Review Comment:
   nit: remove empty line between imports?



##########
python/pyspark/sql/tests/test_utils.py:
##########
@@ -1015,6 +1024,196 @@ def test_assert_error_non_pyspark_df(self):
             },
         )
 
+    def test_assert_column_unique_single_column(self):

Review Comment:
   Should we also test the other DataFrame types (pd.DataFrame, ps.DataFrame, 
streaming) with your new APIs?



##########
python/pyspark/sql/tests/test_utils.py:
##########
@@ -1834,13 +2033,100 @@ def test_assert_schema_equal_with_decimal_types(self):
         with self.assertRaises(PySparkAssertionError):
             assertSchemaEqual(s1, s2)
 
+    def test_assert_referential_integrity_valid(self):
+        # Create a "customers" DataFrame with customer IDs
+        customers = self.spark.createDataFrame(
+            [(1, "Alice"), (2, "Bob"), (3, "Charlie")], ["id", "name"]
+        )
+
+        # Create an "orders" DataFrame with customer IDs as foreign keys
+        orders = self.spark.createDataFrame(
+            [(101, 1), (102, 2), (103, 3), (104, None)], ["order_id", 
"customer_id"]
+        )
+
+        # This should pass because all non-null customer_ids in orders exist 
in customers.id
+        from pyspark.testing.utils import assertReferentialIntegrity
+
+        assertReferentialIntegrity(orders, "customer_id", customers, "id")
+
+    def test_assert_referential_integrity_invalid(self):
+        # Create a "customers" DataFrame with customer IDs
+        customers = self.spark.createDataFrame(
+            [(1, "Alice"), (2, "Bob"), (3, "Charlie")], ["id", "name"]
+        )
+
+        # Create an orders DataFrame with an invalid customer ID
+        orders_invalid = self.spark.createDataFrame(
+            [(101, 1), (102, 2), (103, 4)], ["order_id", "customer_id"]
+        )
+
+        # This should fail because customer_id 4 doesn't exist in customers.id
+        from pyspark.testing.utils import assertReferentialIntegrity
+
+        with self.assertRaises(AssertionError) as cm:
+            assertReferentialIntegrity(orders_invalid, "customer_id", 
customers, "id")
+
+        # Check that the error message contains information about the missing 
values
+        error_message = str(cm.exception)
+        self.assertIn("customer_id", error_message)
+        self.assertIn("id", error_message)
+        self.assertIn("4", error_message)
+        self.assertIn("Missing values", error_message)
+
+        # Verify that the total count of missing values is correct
+        self.assertIn("Total missing values: 1", error_message)
+
+    def test_assert_referential_integrity_null_values(self):
+        # Create a "customers" DataFrame with customer IDs
+        customers = self.spark.createDataFrame(
+            [(1, "Alice"), (2, "Bob"), (3, "Charlie")], ["id", "name"]
+        )
+
+        # Create an orders DataFrame with null values in the foreign key
+        orders_with_nulls = self.spark.createDataFrame(
+            [(101, 1), (102, 2), (103, None), (104, None)], ["order_id", 
"customer_id"]
+        )
+
+        # This should pass because null values in the foreign key are ignored
+        from pyspark.testing.utils import assertReferentialIntegrity
+
+        assertReferentialIntegrity(orders_with_nulls, "customer_id", 
customers, "id")
+
+    def test_assert_referential_integrity_multiple_invalid(self):
+        # Create a "customers" DataFrame with customer IDs
+        customers = self.spark.createDataFrame(
+            [(1, "Alice"), (2, "Bob"), (3, "Charlie")], ["id", "name"]
+        )
+
+        # Create an orders DataFrame with multiple invalid customer IDs
+        orders_multiple_invalid = self.spark.createDataFrame(
+            [(101, 1), (102, 4), (103, 5), (104, 6)], ["order_id", 
"customer_id"]
+        )
+
+        # This should fail because multiple customer_ids don't exist in 
customers.id
+        from pyspark.testing.utils import assertReferentialIntegrity
+
+        with self.assertRaises(AssertionError) as cm:
+            assertReferentialIntegrity(orders_multiple_invalid, "customer_id", 
customers, "id")
+
+        # Check that the error message contains information about the missing 
values
+        error_message = str(cm.exception)
+        self.assertIn("customer_id", error_message)
+        self.assertIn("id", error_message)
+        # Check that at least one of the invalid values is mentioned
+        self.assertTrue(any(str(val) in error_message for val in [4, 5, 6]))
+
+        # Verify that the total count of missing values is correct (should be 
3: values 4, 5, and 6)
+        self.assertIn("Total missing values: 3", error_message)
+
 
 class UtilsTests(ReusedSQLTestCase, UtilsTestsMixin):
     pass
 
 
 if __name__ == "__main__":
     import unittest
+

Review Comment:
   nit: remove



##########
python/pyspark/testing/utils.py:
##########
@@ -1060,18 +1092,1108 @@ def assert_rows_equal(
         actual_list = sorted(actual_list, key=lambda x: str(x))
         expected_list = sorted(expected_list, key=lambda x: str(x))
 
-    assert_rows_equal(actual_list, expected_list, maxErrors=maxErrors, 
showOnlyDiff=showOnlyDiff)
+    assert_rows_equal(
+        actual_list,
+        expected_list,
+        maxErrors=maxErrors,
+        showOnlyDiff=showOnlyDiff,
+    )
+
+
+def assertColumnUnique(
+    df: Union[DataFrame, "pandas.DataFrame", "pyspark.pandas.DataFrame"],
+    columns: Union[str, List[str]],
+    message: Optional[str] = None,
+) -> None:
+    """Assert that the specified column(s) in a DataFrame contain unique 
values.
+
+    This function checks if the values in the specified column(s) are unique. 
If not,
+    it raises an AssertionError with details about the duplicate values.
+
+    Supports Spark, Spark Connect, pandas, and pandas-on-Spark DataFrames.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    df : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark)
+        The DataFrame to check for uniqueness.
+    columns : str or list of str
+        The column name(s) to check for uniqueness. Can be a single column 
name or a list
+        of column names. If a list is provided, the combination of values 
across these
+        columns is checked for uniqueness.
+    message : str, optional
+        Custom error message to include if the assertion fails.
+
+    Raises
+    ------
+    AssertionError
+        If the specified column(s) contain duplicate values.
+    PySparkAssertionError
+        If the input DataFrame is not of a supported type or is a streaming 
DataFrame.
+    ValueError
+        If the columns parameter is invalid or if specified columns don't 
exist in the
+        DataFrame.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", 
"value"])
+    >>> assertColumnUnique(df, "id")  # passes, 'id' column has unique values
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "b"), (3, "c")], ["id", 
"value"])
+    >>> assertColumnUnique(df, "id")  # doctest: +IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Column 'id' contains duplicate values.
+
+    Check multiple columns for uniqueness (composite key):
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "a")], ["id", 
"value"])
+    >>> assertColumnUnique(df, ["id", "value"])  # passes, combinations are 
unique
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "a"), (2, "b")], ["id", 
"value"])
+    >>> assertColumnUnique(df, ["id", "value"])  # doctest: 
+IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Columns ['id', 'value'] contains duplicate values.
+
+    """
+    if df is None:
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]",
+                "arg_name": "df",
+                "actual_type": None,
+            },
+        )
+
+    # Validate columns parameter
+    if not columns:
+        raise ValueError("The 'columns' parameter cannot be empty.")
+
+    if isinstance(columns, str):
+        columns = [columns]
+    elif not isinstance(columns, list):
+        raise ValueError("The 'columns' parameter must be a string or a list 
of strings.")
+
+    has_pandas = False
+    try:
+        # If pandas dependencies are available, allow pandas or 
pandas-on-Spark DataFrame
+        import pandas as pd
+
+        has_pandas = True
+    except ImportError:
+        # no pandas, so we won't call pandasutils functions
+        pass
+
+    # Check if pyarrow is available
+    has_arrow = False
+    try:
+        import importlib.util
+
+        has_arrow = importlib.util.find_spec("pyarrow") is not None
+    except ImportError:
+        pass
+
+    # Handle pandas and pandas-on-Spark DataFrames
+    if has_pandas and has_arrow:
+        import pyspark.pandas as ps
+
+        if isinstance(df, pd.DataFrame):
+            # Check if all columns exist in the DataFrame
+            missing_columns = [col for col in columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(
+                    f"The following columns do not exist in the DataFrame: 
{missing_columns}"
+                )
+
+            # Check for duplicates using pandas methods
+            if len(columns) == 1:
+                # For a single column, use duplicated() method
+                duplicates = df[df[columns[0]].duplicated(keep=False)]
+                if not duplicates.empty:
+                    # Get examples of duplicates
+                    duplicate_examples = duplicates.head(5)
+                    examples_str = "\n".join([str(row) for _, row in 
duplicate_examples.iterrows()])
+
+                    # Create error message
+                    error_msg = f"Column '{columns[0]}' contains duplicate 
values.\n"
+                    error_msg += f"Examples of duplicates:\n{examples_str}"
+
+                    if message:
+                        error_msg += f"\n{message}"
+

Review Comment:
   same comment for the other new testing APIs



##########
python/pyspark/testing/utils.py:
##########
@@ -1060,18 +1092,1108 @@ def assert_rows_equal(
         actual_list = sorted(actual_list, key=lambda x: str(x))
         expected_list = sorted(expected_list, key=lambda x: str(x))
 
-    assert_rows_equal(actual_list, expected_list, maxErrors=maxErrors, 
showOnlyDiff=showOnlyDiff)
+    assert_rows_equal(
+        actual_list,
+        expected_list,
+        maxErrors=maxErrors,
+        showOnlyDiff=showOnlyDiff,
+    )
+
+
+def assertColumnUnique(
+    df: Union[DataFrame, "pandas.DataFrame", "pyspark.pandas.DataFrame"],
+    columns: Union[str, List[str]],
+    message: Optional[str] = None,
+) -> None:
+    """Assert that the specified column(s) in a DataFrame contain unique 
values.
+
+    This function checks if the values in the specified column(s) are unique. 
If not,
+    it raises an AssertionError with details about the duplicate values.
+
+    Supports Spark, Spark Connect, pandas, and pandas-on-Spark DataFrames.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    df : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark)
+        The DataFrame to check for uniqueness.
+    columns : str or list of str
+        The column name(s) to check for uniqueness. Can be a single column 
name or a list
+        of column names. If a list is provided, the combination of values 
across these
+        columns is checked for uniqueness.
+    message : str, optional
+        Custom error message to include if the assertion fails.
+
+    Raises
+    ------
+    AssertionError
+        If the specified column(s) contain duplicate values.
+    PySparkAssertionError
+        If the input DataFrame is not of a supported type or is a streaming 
DataFrame.
+    ValueError
+        If the columns parameter is invalid or if specified columns don't 
exist in the
+        DataFrame.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", 
"value"])
+    >>> assertColumnUnique(df, "id")  # passes, 'id' column has unique values
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "b"), (3, "c")], ["id", 
"value"])
+    >>> assertColumnUnique(df, "id")  # doctest: +IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Column 'id' contains duplicate values.
+
+    Check multiple columns for uniqueness (composite key):
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "a")], ["id", 
"value"])
+    >>> assertColumnUnique(df, ["id", "value"])  # passes, combinations are 
unique
+
+    >>> df = spark.createDataFrame([(1, "a"), (1, "a"), (2, "b")], ["id", 
"value"])
+    >>> assertColumnUnique(df, ["id", "value"])  # doctest: 
+IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Columns ['id', 'value'] contains duplicate values.
+
+    """
+    if df is None:
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]",
+                "arg_name": "df",
+                "actual_type": None,
+            },
+        )
+
+    # Validate columns parameter
+    if not columns:
+        raise ValueError("The 'columns' parameter cannot be empty.")
+
+    if isinstance(columns, str):
+        columns = [columns]
+    elif not isinstance(columns, list):
+        raise ValueError("The 'columns' parameter must be a string or a list 
of strings.")
+
+    has_pandas = False
+    try:
+        # If pandas dependencies are available, allow pandas or 
pandas-on-Spark DataFrame
+        import pandas as pd
+
+        has_pandas = True
+    except ImportError:
+        # no pandas, so we won't call pandasutils functions
+        pass
+
+    # Check if pyarrow is available
+    has_arrow = False
+    try:
+        import importlib.util
+
+        has_arrow = importlib.util.find_spec("pyarrow") is not None
+    except ImportError:
+        pass
+
+    # Handle pandas and pandas-on-Spark DataFrames
+    if has_pandas and has_arrow:
+        import pyspark.pandas as ps
+
+        if isinstance(df, pd.DataFrame):
+            # Check if all columns exist in the DataFrame
+            missing_columns = [col for col in columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(
+                    f"The following columns do not exist in the DataFrame: 
{missing_columns}"
+                )
+
+            # Check for duplicates using pandas methods
+            if len(columns) == 1:
+                # For a single column, use duplicated() method
+                duplicates = df[df[columns[0]].duplicated(keep=False)]
+                if not duplicates.empty:
+                    # Get examples of duplicates
+                    duplicate_examples = duplicates.head(5)
+                    examples_str = "\n".join([str(row) for _, row in 
duplicate_examples.iterrows()])
+
+                    # Create error message
+                    error_msg = f"Column '{columns[0]}' contains duplicate 
values.\n"
+                    error_msg += f"Examples of duplicates:\n{examples_str}"
+
+                    if message:
+                        error_msg += f"\n{message}"
+
+                    raise AssertionError(error_msg)
+            else:
+                # For multiple columns, use duplicated() with subset parameter
+                duplicates = df[df.duplicated(subset=columns, keep=False)]
+                if not duplicates.empty:
+                    # Get examples of duplicates
+                    duplicate_examples = duplicates.head(5)
+                    examples_str = "\n".join([str(row) for _, row in 
duplicate_examples.iterrows()])
+
+                    # Create error message
+                    error_msg = f"Columns {columns} contain duplicate 
values.\n"
+                    error_msg += f"Examples of duplicates:\n{examples_str}"
+
+                    if message:
+                        error_msg += f"\n{message}"
+
+                    raise AssertionError(error_msg)
+
+            # If we get here, no duplicates were found
+            return
+
+        elif isinstance(df, ps.DataFrame):
+            # Check if all columns exist in the DataFrame
+            missing_columns = [col for col in columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(
+                    f"The following columns do not exist in the DataFrame: 
{missing_columns}"
+                )
+
+            # Check for duplicates using pandas-on-Spark methods
+            if len(columns) == 1:
+                # For a single column, use duplicated() method
+                duplicates = df[df[columns[0]].duplicated(keep=False)]
+                if len(duplicates) > 0:
+                    # Get examples of duplicates
+                    duplicate_examples = duplicates.head(5)
+                    examples_str = "\n".join([str(row) for _, row in 
duplicate_examples.iterrows()])
+
+                    # Create error message
+                    error_msg = f"Column '{columns[0]}' contains duplicate 
values.\n"
+                    error_msg += f"Examples of duplicates:\n{examples_str}"
+
+                    if message:
+                        error_msg += f"\n{message}"
+
+                    raise AssertionError(error_msg)
+            else:
+                # For multiple columns, use duplicated() with subset parameter
+                duplicates = df[df.duplicated(subset=columns, keep=False)]
+                if len(duplicates) > 0:
+                    # Get examples of duplicates
+                    duplicate_examples = duplicates.head(5)
+                    examples_str = "\n".join([str(row) for _, row in 
duplicate_examples.iterrows()])
+
+                    # Create error message
+                    error_msg = f"Columns {columns} contain duplicate 
values.\n"
+                    error_msg += f"Examples of duplicates:\n{examples_str}"
+
+                    if message:
+                        error_msg += f"\n{message}"
+
+                    raise AssertionError(error_msg)
+
+            # If we get here, no duplicates were found
+            return
+
+    # If we get here, we're dealing with a Spark DataFrame or pandas 
dependencies are not available
+    if not isinstance(df, DataFrame):
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]",
+                "arg_name": "df",
+                "actual_type": type(df),
+            },
+        )
+
+    # Check if it's a streaming DataFrame
+    if df.isStreaming:
+        raise PySparkAssertionError(
+            errorClass="UNSUPPORTED_OPERATION",
+            messageParameters={"operation": "assertColumnUnique on streaming 
DataFrame"},
+        )
+
+    # Validate that all columns exist in the DataFrame
+    df_columns = set(df.columns)
+    missing_columns = [col for col in columns if col not in df_columns]
+    if missing_columns:
+        raise ValueError(f"The following columns do not exist in the 
DataFrame: {missing_columns}")
+
+    # Count occurrences of each value combination in the specified columns
+    counts = df.groupBy(*columns).count()
+
+    # Find rows with count > 1 (duplicates)
+    duplicates = counts.filter("count > 1")
+
+    if duplicates.count() > 0:
+        # Get the first few duplicate values to include in the error message
+        duplicate_examples = duplicates.limit(5).collect()
+
+        # Format duplicate examples for error message
+        examples_str = "\n".join([str(row) for row in duplicate_examples])
+
+        # Create error message
+        column_desc = f"Column '{columns[0]}'" if len(columns) == 1 else 
f"Columns {columns}"
+
+        error_msg = f"{column_desc} contains duplicate values.\n"
+        error_msg += f"Examples of duplicates:\n{examples_str}"
+
+        if message:
+            error_msg += f"\n{message}"
+
+        raise AssertionError(error_msg)
+
+
+def assertColumnNonNull(
+    df: Union[DataFrame, "pandas.DataFrame", "pyspark.pandas.DataFrame"],
+    columns: Union[str, List[str]],
+    message: Optional[str] = None,
+) -> None:
+    """
+    Assert that the specified column(s) in a DataFrame do not contain any null 
values.
+
+    Supports Spark, Spark Connect, pandas, and pandas-on-Spark DataFrames.
+
+    Parameters
+    ----------
+    df : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark)
+        The DataFrame to check.
+    columns : str or list
+        The column name(s) to check for null values.
+    message : str, optional
+        An optional message to include in the exception if the assertion fails.
+
+    Raises
+    ------
+    AssertionError
+        If any of the specified columns contain null values.
+    PySparkAssertionError
+        If the input DataFrame is not of a supported type or is a streaming 
DataFrame.
+    ValueError
+        If the columns parameter is invalid or if specified columns don't 
exist in the DataFrame.
+
+    Examples
+    --------
+    >>> from pyspark.sql import Row
+    >>> df = spark.createDataFrame([(1, "a"), (2, None), (3, "c")], ["id", 
"value"])
+    >>> assertColumnNonNull(df, "id")  # This will pass
+    >>> assertColumnNonNull(df, "value")  # doctest: +IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Column 'value' contains null values.
+    >>> assertColumnNonNull(df, ["id", "value"])  # doctest: 
+IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Columns ['id', 'value'] contain null values.
+
+    """
+    if df is None:
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]",
+                "arg_name": "df",
+                "actual_type": None,
+            },
+        )
+
+    # Validate columns parameter
+    if not columns:
+        raise ValueError("The 'columns' parameter cannot be empty.")
+
+    if isinstance(columns, str):
+        columns = [columns]
+    elif not isinstance(columns, list):
+        raise ValueError("The 'columns' parameter must be a string or a list 
of strings.")
+
+    has_pandas = False
+    try:
+        # If pandas dependencies are available, allow pandas or 
pandas-on-Spark DataFrame
+        import pandas as pd
+
+        has_pandas = True
+    except ImportError:
+        # no pandas, so we won't call pandasutils functions
+        pass
+
+    # Check if pyarrow is available
+    has_arrow = False
+    try:
+        import importlib.util
+
+        has_arrow = importlib.util.find_spec("pyarrow") is not None
+    except ImportError:
+        pass
+
+    # Handle pandas and pandas-on-Spark DataFrames
+    if has_pandas and has_arrow:
+        import pyspark.pandas as ps
+
+        if isinstance(df, pd.DataFrame):
+            # Check if all columns exist in the DataFrame
+            missing_columns = [col for col in columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(
+                    f"The following columns do not exist in the DataFrame: 
{missing_columns}"
+                )
+
+            # Check for null values using pandas methods
+            null_counts = {}
+            for column in columns:
+                # Count null values in the column
+                null_count = df[column].isna().sum()
+                if null_count > 0:
+                    null_counts[column] = null_count
+
+            if null_counts:
+                # Create error message
+                column_desc = (
+                    f"Column '{columns[0]}'" if len(columns) == 1 else 
f"Columns {columns}"
+                )
+
+                plural = "s" if len(columns) == 1 else ""
+                error_msg = f"{column_desc} contain{plural} null values.\n"
+                error_msg += "Null counts by column:\n"
+                for col_name, count in null_counts.items():
+                    error_msg += f"- {col_name}: {count} null value{'s' if 
count != 1 else ''}\n"
+
+                if message:
+                    error_msg += f"\n{message}"
+
+                raise AssertionError(error_msg)
+
+            # If we get here, no null values were found
+            return
+
+        elif isinstance(df, ps.DataFrame):
+            # Check if all columns exist in the DataFrame
+            missing_columns = [col for col in columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(
+                    f"The following columns do not exist in the DataFrame: 
{missing_columns}"
+                )
+
+            # Check for null values using pandas-on-Spark methods
+            null_counts = {}
+            for column in columns:
+                # Count null values in the column
+                null_count = df[column].isna().sum()
+                if null_count > 0:
+                    null_counts[column] = null_count
+
+            if null_counts:
+                # Create error message
+                column_desc = (
+                    f"Column '{columns[0]}'" if len(columns) == 1 else 
f"Columns {columns}"
+                )
+
+                plural = "s" if len(columns) == 1 else ""
+                error_msg = f"{column_desc} contain{plural} null values.\n"
+                error_msg += "Null counts by column:\n"
+                for col_name, count in null_counts.items():
+                    error_msg += f"- {col_name}: {count} null value{'s' if 
count != 1 else ''}\n"
+
+                if message:
+                    error_msg += f"\n{message}"
+
+                raise AssertionError(error_msg)
+
+            # If we get here, no null values were found
+            return
+
+    # If we get here, we're dealing with a Spark DataFrame or pandas 
dependencies are not available
+    if not isinstance(df, DataFrame):
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]",
+                "arg_name": "df",
+                "actual_type": type(df),
+            },
+        )
+
+    # Check if it's a streaming DataFrame
+    if df.isStreaming:
+        raise PySparkAssertionError(
+            errorClass="UNSUPPORTED_OPERATION",
+            messageParameters={"operation": "assertColumnNonNull on streaming 
DataFrame"},
+        )
+
+    # Validate that all columns exist in the DataFrame
+    df_columns = set(df.columns)
+    missing_columns = [col for col in columns if col not in df_columns]
+    if missing_columns:
+        raise ValueError(f"The following columns do not exist in the 
DataFrame: {missing_columns}")
+
+    # Check each column for null values
+    null_counts = {}
+    for column in columns:
+        # Count null values in the column
+        null_count = df.filter(df[column].isNull()).count()
+        if null_count > 0:
+            null_counts[column] = null_count
+
+    if null_counts:
+        # Create error message
+        column_desc = f"Column '{columns[0]}'" if len(columns) == 1 else 
f"Columns {columns}"
+
+        error_msg = f"{column_desc} contain{'s' if len(columns) == 1 else ''} 
null values.\n"
+        error_msg += "Null counts by column:\n"
+        for col_name, count in null_counts.items():
+            error_msg += f"- {col_name}: {count} null value{'s' if count != 1 
else ''}\n"
+
+        if message:
+            error_msg += f"\n{message}"
+
+        raise AssertionError(error_msg)
+
+
+def assertColumnValuesInSet(
+    df: Union[DataFrame, "pandas.DataFrame", "pyspark.pandas.DataFrame"],
+    columns: Union[str, List[str]],
+    accepted_values: Union[Set[Any], List[Any], Dict[str, Set[Any]]],
+    message: Optional[str] = None,
+) -> None:
+    """
+    Assert that all values in the specified column(s) of a DataFrame are 
within a given set of
+    accepted values.
+
+    Supports Spark, Spark Connect, pandas, and pandas-on-Spark DataFrames.
+
+    Parameters
+    ----------
+    df : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark)
+        The DataFrame to check.
+    columns : str or list
+        The column name(s) to check for values.
+    accepted_values : set or list or tuple or dict
+        The set of accepted values for the column(s). If columns is a list and 
accepted_values
+        is a dict, the keys in the dict should correspond to the column names, 
and the values
+        should be the sets of accepted values for each column. If columns is a 
list and
+        accepted_values is not a dict, the same set of accepted values will be 
used for all
+        columns.
+    message : str, optional
+        An optional message to include in the exception if the assertion fails.
+
+    Raises
+    ------
+    AssertionError
+        If any values in the specified column(s) are not in the set of 
accepted values.
+    PySparkAssertionError
+        If the input DataFrame is not of a supported type or is a streaming 
DataFrame.
+    ValueError
+        If the columns parameter is invalid, if specified columns don't exist 
in the
+        DataFrame, or if the accepted_values parameter is invalid.
+
+    Examples
+    --------
+    >>> from pyspark.sql import Row
+    >>> df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["id", 
"category"])
+    >>> assertColumnValuesInSet(df, "category", {"A", "B", "C"})  # This will 
pass
+    >>> df_with_invalid = spark.createDataFrame([(1, "A"), (2, "B"), (3, 
"X")], ["id", "category"])
+    >>> # This will fail because 'X' is not in the accepted values
+    >>> # Test with doctest ignore exception detail
+    >>> # Test with invalid value
+    >>> assertColumnValuesInSet(df_with_invalid, "category", {"A", "B", "C"})
+    ... # doctest: +IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Column 'category' contains values not in the accepted set.
+
+    Column 'category':
+      Accepted values: {'A', 'B', 'C'}
+      Invalid values found: ['X']
+      Total invalid values: 1
+
+    >>> # Multiple columns with the same accepted values
+    >>> df = spark.createDataFrame([("A", "B"), ("B", "A"), ("C", "C")], 
["col1", "col2"])
+    >>> assertColumnValuesInSet(df, ["col1", "col2"], {"A", "B", "C"})  # This 
will pass
+
+    >>> # Multiple columns with different accepted values
+    >>> df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["id", 
"category"])
+    >>> # Test with multiple columns and different accepted values
+    >>> values = {"id": {1, 2, 3}, "category": {"A", "B", "C"}}
+    >>> assertColumnValuesInSet(df, ["id", "category"], values)  # This will 
pass
+
+    """
+    if df is None:
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]",
+                "arg_name": "df",
+                "actual_type": None,
+            },
+        )
+
+    # Validate columns parameter
+    if not columns:
+        raise ValueError("The 'columns' parameter cannot be empty.")
+
+    if isinstance(columns, str):
+        columns = [columns]
+    elif not isinstance(columns, list):
+        raise ValueError("The 'columns' parameter must be a string or a list 
of strings.")
+
+    # Validate accepted_values parameter
+    if accepted_values is None:
+        raise ValueError("The 'accepted_values' parameter cannot be None.")
+
+    # Handle different types of accepted_values
+    if isinstance(accepted_values, dict):
+        # Make sure all columns are in the accepted_values dict
+        missing_columns = set(columns) - set(accepted_values.keys())
+        if missing_columns:
+            raise ValueError(
+                f"The following columns are missing from accepted_values: 
{missing_columns}"
+            )
+    else:
+        # Convert accepted_values to a set if it's not already
+        if not isinstance(accepted_values, set):
+            try:
+                accepted_values = set(accepted_values)
+            except TypeError:
+                raise ValueError(
+                    "The 'accepted_values' parameter must be a set, list, 
tuple, or dict."
+                )
+        # Create a dict with the same accepted values for all columns
+        accepted_values = {column: accepted_values for column in columns}
+
+    has_pandas = False
+    try:
+        # If pandas dependencies are available, allow pandas or 
pandas-on-Spark DataFrame
+        import pandas as pd
+
+        has_pandas = True
+    except ImportError:
+        # no pandas, so we won't call pandasutils functions
+        pass
+
+    # Check if pyarrow is available
+    has_arrow = False
+    try:
+        import importlib.util
+
+        has_arrow = importlib.util.find_spec("pyarrow") is not None
+    except ImportError:
+        pass
+
+    # Handle pandas and pandas-on-Spark DataFrames
+    if has_pandas and has_arrow:
+        import pyspark.pandas as ps
+
+        if isinstance(df, pd.DataFrame):
+            # Check if all columns exist in the DataFrame
+            missing_columns = [col for col in columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(
+                    f"The following columns do not exist in the DataFrame: 
{missing_columns}"
+                )
+
+            # Check each column for invalid values
+            invalid_columns = {}
+
+            for column in columns:
+                # Get the set of accepted values for this column
+                column_accepted_values = accepted_values[column]
+
+                # Find values that are not in the accepted set and not null
+                invalid_mask = ~df[column].isin(list(column_accepted_values)) 
& ~df[column].isna()
+                invalid_values_df = df[invalid_mask]
+
+                # Count invalid values
+                invalid_count = len(invalid_values_df)
+
+                if invalid_count > 0:
+                    # Get examples of invalid values (limit to 10 for 
readability)
+                    invalid_examples = 
invalid_values_df[column].drop_duplicates().head(10).tolist()
+
+                    invalid_columns[column] = {
+                        "count": invalid_count,
+                        "examples": invalid_examples,
+                        "accepted": column_accepted_values,
+                    }
+
+            if invalid_columns:
+                # Create error message
+                column_desc = (
+                    f"Column '{columns[0]}'" if len(columns) == 1 else 
f"Columns {columns}"
+                )
+                plural = "s" if len(columns) == 1 else ""
+                error_msg = f"{column_desc} contain{plural} " f"values not in 
the accepted set.\n"
+
+                for column, details in invalid_columns.items():
+                    error_msg += f"\nColumn '{column}':\n"
+                    error_msg += f"  Accepted values: {details['accepted']}\n"
+                    error_msg += f"  Invalid values found: 
{details['examples']}\n"
+                    error_msg += f"  Total invalid values: 
{details['count']}\n"
+
+                if message:
+                    error_msg += f"\n{message}"
+
+                raise AssertionError(error_msg)
+
+            # If we get here, all values are in the accepted sets
+            return
+
+        elif isinstance(df, ps.DataFrame):
+            # Check if all columns exist in the DataFrame
+            missing_columns = [col for col in columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(
+                    f"The following columns do not exist in the DataFrame: 
{missing_columns}"
+                )
+
+            # Check each column for invalid values
+            invalid_columns = {}
+
+            for column in columns:
+                # Get the set of accepted values for this column
+                column_accepted_values = accepted_values[column]
+
+                # Find values that are not in the accepted set and not null
+                invalid_mask = ~df[column].isin(list(column_accepted_values)) 
& ~df[column].isna()
+                invalid_values_df = df[invalid_mask]
+
+                # Count invalid values
+                invalid_count = len(invalid_values_df)
+
+                if invalid_count > 0:
+                    # Get examples of invalid values (limit to 10 for 
readability)
+                    invalid_examples = 
invalid_values_df[column].drop_duplicates().head(10).tolist()
+
+                    invalid_columns[column] = {
+                        "count": invalid_count,
+                        "examples": invalid_examples,
+                        "accepted": column_accepted_values,
+                    }
+
+            if invalid_columns:
+                # Create error message
+                column_desc = (
+                    f"Column '{columns[0]}'" if len(columns) == 1 else 
f"Columns {columns}"
+                )
+                plural = "s" if len(columns) == 1 else ""
+                error_msg = f"{column_desc} contain{plural} " f"values not in 
the accepted set.\n"
+
+                for column, details in invalid_columns.items():
+                    error_msg += f"\nColumn '{column}':\n"
+                    error_msg += f"  Accepted values: {details['accepted']}\n"
+                    error_msg += f"  Invalid values found: 
{details['examples']}\n"
+                    error_msg += f"  Total invalid values: 
{details['count']}\n"
+
+                if message:
+                    error_msg += f"\n{message}"
+
+                raise AssertionError(error_msg)
+
+            # If we get here, all values are in the accepted sets
+            return
+
+    # If we get here, we're dealing with a Spark DataFrame or pandas 
dependencies are not available
+    if not isinstance(df, DataFrame):
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]",
+                "arg_name": "df",
+                "actual_type": type(df),
+            },
+        )
+
+    # Check if it's a streaming DataFrame
+    if df.isStreaming:
+        raise PySparkAssertionError(
+            errorClass="UNSUPPORTED_OPERATION",
+            messageParameters={"operation": "assertColumnValuesInSet on 
streaming DataFrame"},
+        )
+
+    # Validate that all columns exist in the DataFrame
+    df_columns = set(df.columns)
+    missing_columns = [col for col in columns if col not in df_columns]
+    if missing_columns:
+        raise ValueError(f"The following columns do not exist in the 
DataFrame: {missing_columns}")
+
+    # Check each column for invalid values
+    invalid_columns = {}
+
+    for column in columns:
+        # Get the set of accepted values for this column
+        column_accepted_values = accepted_values[column]
+
+        # Find values that are not in the accepted set
+        invalid_values_df = df.filter(
+            ~df[column].isin(list(column_accepted_values)) & 
df[column].isNotNull()
+        )
+
+        # Count invalid values
+        invalid_count = invalid_values_df.count()
+
+        if invalid_count > 0:
+            # Get examples of invalid values (limit to 10 for readability)
+            invalid_examples = 
invalid_values_df.select(column).distinct().limit(10).collect()
+            invalid_values = [row[column] for row in invalid_examples]
+
+            invalid_columns[column] = {
+                "count": invalid_count,
+                "examples": invalid_values,
+                "accepted": column_accepted_values,
+            }
+
+    if invalid_columns:
+        # Create error message
+        column_desc = f"Column '{columns[0]}'" if len(columns) == 1 else 
f"Columns {columns}"
+        plural = "s" if len(columns) == 1 else ""
+        error_msg = f"{column_desc} contain{plural} values not in the accepted 
set.\n"
+
+        for column, details in invalid_columns.items():
+            error_msg += f"\nColumn '{column}':\n"
+            error_msg += f"  Accepted values: {details['accepted']}\n"
+            error_msg += f"  Invalid values found: {details['examples']}\n"
+            error_msg += f"  Total invalid values: {details['count']}\n"
+
+        if message:
+            error_msg += f"\n{message}"
+
+        raise AssertionError(error_msg)
+
+
+def assertReferentialIntegrity(
+    source_df: Union[DataFrame, "pandas.DataFrame", 
"pyspark.pandas.DataFrame"],
+    source_column: str,
+    target_df: Union[DataFrame, "pandas.DataFrame", 
"pyspark.pandas.DataFrame"],
+    target_column: str,
+    message: Optional[str] = None,
+) -> None:
+    """
+    Assert that all non-null values in a column of one DataFrame exist in a 
column of
+    another DataFrame.
+
+    This function checks referential integrity between two DataFrames, similar 
to a foreign
+    key constraint in a relational database. It verifies that all non-null 
values in the
+    source column exist in the target column.
+
+    Supports Spark, Spark Connect, pandas, and pandas-on-Spark DataFrames.
+
+    Parameters
+    ----------
+    source_df : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark)
+        The DataFrame containing the foreign key column to check.
+    source_column : str
+        The name of the column in source_df to check (foreign key).
+    target_df : DataFrame (Spark, Spark Connect, pandas, or pandas-on-Spark)
+        The DataFrame containing the primary key column to check against.
+    target_column : str
+        The name of the column in target_df to check against (primary key).
+    message : str, optional
+        An optional message to include in the exception if the assertion fails.
+
+    Raises
+    ------
+    AssertionError
+        If any non-null values in the source column do not exist in the target 
column.
+    PySparkAssertionError
+        If either input DataFrame is not of a supported type or is a streaming 
DataFrame.
+    ValueError
+        If the column parameters are invalid or if specified columns don't 
exist in the
+        DataFrames.
+
+    Examples
+    --------
+    >>> # Create a "customers" DataFrame with customer IDs
+    >>> # Create customer data
+    >>> # Define customer data
+    >>> customers = spark.createDataFrame([(1, "Alice"), (2, "Bob"), (3, 
"Charlie")],
+    ...                                  ["id", "name"])
+    >>>
+    >>> # Create an "orders" DataFrame with customer IDs as foreign keys
+    >>> # Create order data
+    >>> orders = spark.createDataFrame([(101, 1), (102, 2), (103, 3), (104, 
None)],
+    ...                               ["order_id", "customer_id"])
+    >>>
+    >>> # This will pass because all non-null customer_ids in orders exist in 
customers.id
+    >>> assertReferentialIntegrity(orders, "customer_id", customers, "id")
+    >>>
+    >>> # Create an orders DataFrame with an invalid customer ID
+    >>> # Create invalid order data
+    >>> orders_invalid = spark.createDataFrame([(101, 1), (102, 2), (103, 4)],
+    ...                                      ["order_id", "customer_id"])
+    >>>
+    >>> # This will fail because customer_id 4 doesn't exist in customers.id
+    >>> # Test with invalid data
+    >>> assertReferentialIntegrity(orders_invalid, "customer_id", customers, 
"id")
+    ... # doctest: +IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+    ...
+    AssertionError: Column 'customer_id' contains values not found in target 
column 'id'.
+    Missing values: [4]
+    Total missing values: 1
+    """
+    # Validate source_df
+    if source_df is None:
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": ("Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]"),
+                "arg_name": "source_df",
+                "actual_type": None,
+            },
+        )
+
+    # Validate target_df
+    if target_df is None:
+        raise PySparkAssertionError(
+            errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+            messageParameters={
+                "expected_type": ("Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]"),
+                "arg_name": "target_df",
+                "actual_type": None,
+            },
+        )
+
+    # Validate source_column
+    if not source_column or not isinstance(source_column, str):
+        raise ValueError("The 'source_column' parameter must be a non-empty 
string.")
+
+    # Validate target_column
+    if not target_column or not isinstance(target_column, str):
+        raise ValueError("The 'target_column' parameter must be a non-empty 
string.")
+
+    has_pandas = False
+    try:
+        # If pandas dependencies are available, allow pandas or 
pandas-on-Spark DataFrame
+        import pandas as pd
+
+        has_pandas = True
+    except ImportError:
+        # no pandas, so we won't call pandasutils functions
+        pass
+
+    # Check if pyarrow is available
+    has_arrow = False
+    try:
+        import importlib.util
+
+        has_arrow = importlib.util.find_spec("pyarrow") is not None
+    except ImportError:
+        pass
+
+    # Handle pandas DataFrames
+    if has_pandas and has_arrow:
+        import pyspark.pandas as ps
+
+        # Case 1: Both are pandas DataFrames
+        if isinstance(source_df, pd.DataFrame) and isinstance(target_df, 
pd.DataFrame):
+            # Validate columns exist
+            if source_column not in source_df.columns:
+                raise ValueError(f"Column '{source_column}' does not exist in 
source DataFrame.")
+            if target_column not in target_df.columns:
+                raise ValueError(f"Column '{target_column}' does not exist in 
target DataFrame.")
+
+            # Get unique non-null values from source column
+            source_values = source_df[source_column].dropna().unique()
+
+            # Get unique values from target column
+            target_values = set(target_df[target_column].unique())
+
+            # Find values in source that don't exist in target
+            missing_values = [val for val in source_values if val not in 
target_values]
+
+            if missing_values:
+                # Count occurrences of each missing value
+                missing_counts = {}
+                for val in missing_values:
+                    missing_counts[val] = 
len(source_df[source_df[source_column] == val])
+
+                # Create error message
+                error_msg = (
+                    f"Column '{source_column}' contains values not found in "
+                    f"target column '{target_column}'.\n"
+                )
+                error_msg += f"Missing values: {missing_values[:10]}" + (
+                    " (showing first 10 only)" if len(missing_values) > 10 
else ""
+                )
+                error_msg += f"\nTotal missing values: {len(missing_values)}"
+
+                if message:
+                    error_msg += f"\n{message}"
+
+                raise AssertionError(error_msg)
+
+            # If we get here, all values exist in target
+            return
+
+        # Case 2: Both are pandas-on-Spark DataFrames
+        elif isinstance(source_df, ps.DataFrame) and isinstance(target_df, 
ps.DataFrame):
+            # Validate columns exist
+            if source_column not in source_df.columns:
+                raise ValueError(f"Column '{source_column}' does not exist in 
source DataFrame.")
+            if target_column not in target_df.columns:
+                raise ValueError(f"Column '{target_column}' does not exist in 
target DataFrame.")
+
+            # Get unique non-null values from source column
+            source_values = 
source_df[source_column].dropna().unique().to_list()
+
+            # Get unique values from target column
+            target_values = set(target_df[target_column].unique().to_list())
+
+            # Find values in source that don't exist in target
+            missing_values = [val for val in source_values if val not in 
target_values]
+
+            if missing_values:
+                # Count occurrences of each missing value
+                missing_counts = {}
+                for val in missing_values:
+                    missing_counts[val] = 
len(source_df[source_df[source_column] == val])
+
+                # Create error message
+                error_msg = (
+                    f"Column '{source_column}' contains values not found in "
+                    f"target column '{target_column}'.\n"
+                )
+                error_msg += f"Missing values: {missing_values[:10]}" + (
+                    " (showing first 10 only)" if len(missing_values) > 10 
else ""
+                )
+                error_msg += f"\nTotal missing values: {len(missing_values)}"
+
+                if message:
+                    error_msg += f"\n{message}"
+
+                raise AssertionError(error_msg)
+
+            # If we get here, all values exist in target
+            return
+
+        # Case 3: Mixed DataFrame types - convert to Spark DataFrames for 
comparison
+        # This is handled by the Spark case below
+
+    # Handle Spark DataFrames or mixed types
+    # Ensure source_df is a Spark DataFrame
+    if not isinstance(source_df, DataFrame):
+        if has_pandas and has_arrow:
+            if isinstance(source_df, pd.DataFrame):
+                from pyspark.sql import SparkSession
+
+                spark = SparkSession.builder.getOrCreate()
+                source_df = spark.createDataFrame(source_df)
+            elif isinstance(source_df, ps.DataFrame):
+                source_df = source_df.to_spark()
+            else:
+                raise PySparkAssertionError(
+                    errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+                    messageParameters={
+                        "expected_type": (
+                            "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]"
+                        ),
+                        "arg_name": "source_df",
+                        "actual_type": type(source_df),
+                    },
+                )
+        else:
+            raise PySparkAssertionError(
+                errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+                messageParameters={
+                    "expected_type": (
+                        "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]"
+                    ),
+                    "arg_name": "source_df",
+                    "actual_type": type(source_df),
+                },
+            )
+
+    # Ensure target_df is a Spark DataFrame
+    if not isinstance(target_df, DataFrame):
+        if has_pandas and has_arrow:
+            if isinstance(target_df, pd.DataFrame):
+                from pyspark.sql import SparkSession
+
+                spark = SparkSession.builder.getOrCreate()
+                target_df = spark.createDataFrame(target_df)
+            elif isinstance(target_df, ps.DataFrame):
+                target_df = target_df.to_spark()
+            else:
+                raise PySparkAssertionError(
+                    errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+                    messageParameters={
+                        "expected_type": (
+                            "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]"
+                        ),
+                        "arg_name": "target_df",
+                        "actual_type": type(target_df),
+                    },
+                )
+        else:
+            raise PySparkAssertionError(
+                errorClass="INVALID_TYPE_DF_EQUALITY_ARG",
+                messageParameters={
+                    "expected_type": (
+                        "Union[DataFrame, pandas.DataFrame, 
pyspark.pandas.DataFrame]"
+                    ),
+                    "arg_name": "target_df",
+                    "actual_type": type(target_df),
+                },
+            )
+
+    # Check if either DataFrame is streaming
+    if source_df.isStreaming:
+        raise PySparkAssertionError(
+            errorClass="UNSUPPORTED_OPERATION",
+            messageParameters={"operation": "assertReferentialIntegrity on 
streaming DataFrame"},
+        )
+    if target_df.isStreaming:
+        raise PySparkAssertionError(
+            errorClass="UNSUPPORTED_OPERATION",
+            messageParameters={"operation": "assertReferentialIntegrity on 
streaming DataFrame"},
+        )
+
+    # Validate columns exist
+    if source_column not in source_df.columns:
+        raise ValueError(f"Column '{source_column}' does not exist in source 
DataFrame.")
+    if target_column not in target_df.columns:
+        raise ValueError(f"Column '{target_column}' does not exist in target 
DataFrame.")
+
+    # Get distinct non-null values from source column
+    source_values = (
+        
source_df.filter(source_df[source_column].isNotNull()).select(source_column).distinct()
+    )
+
+    # Get distinct values from target column
+    target_values = target_df.select(target_column).distinct()
+
+    # Find values in source that don't exist in target using a left anti join
+    missing_values_df = source_values.join(
+        target_values,
+        source_values[source_column] == target_values[target_column],
+        "left_anti",
+    )
+
+    # If there are missing values, raise an error
+    if missing_values_df.count() > 0:
+        # Get examples of missing values
+        missing_examples = missing_values_df.limit(10).collect()
+        missing_values_list = [row[source_column] for row in missing_examples]
+
+        # Create error message
+        error_msg = (
+            f"Column '{source_column}' contains values not found in "
+            f"target column '{target_column}'.\n"
+        )
+        error_msg += f"Missing values: {missing_values_list}" + (
+            " (showing first 10 only)" if len(missing_examples) >= 10 else ""
+        )
+        error_msg += f"\nTotal missing values: {missing_values_df.count()}"
+
+        if message:
+            error_msg += f"\n{message}"
+
+        raise AssertionError(error_msg)
 
 
 def _test() -> None:
     import doctest
-    from pyspark.sql import SparkSession
+

Review Comment:
   nit: remove



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to