Bernardo Alemar created SPARK-57352:
---------------------------------------

             Summary: [SDP] Pipeline execution order is broken because lineage 
is not inferred from spark.table / spark.readStream.table
                 Key: SPARK-57352
                 URL: https://issues.apache.org/jira/browse/SPARK-57352
             Project: Spark
          Issue Type: Bug
          Components: Connect, Declarative Pipelines
    Affects Versions: 4.1.1
         Environment: - Spark Version: 4.1.1
- Execution Mode: Spark Connect
- Deployment Platform: AWS EKS (Elastic Kubernetes Service)
- Catalog Metastore: AWS Glue Data Catalog
- Table Format: Apache Iceberg
- Apache Iceberg Version: 1.10.1
- AWS Bundle Version: 1.10.1
- Spark Packages:
  org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.10.1
  org.apache.iceberg:iceberg-aws-bundle:1.10.1
            Reporter: Bernardo Alemar


*Error Description*

When building a pipeline using the new Declarative Pipelines framework 
(`pyspark.pipelines`), the execution engine fails to correctly infer 
dependencies (DAG lineage) if a downstream table (e.g., Silver layer) 
references an upstream table (e.g., Bronze layer) using `spark.table()` or 
`spark.readStream.table()`.

Because the framework does not analyze the table identifier strings to resolve 
internal pipeline dependencies, it schedules downstream materialized 
views/tables to run *before* their upstream dependencies, causing the pipeline 
execution order to break.
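
For illustration only, the identifier matching that the report describes as 
missing could be sketched as follows. This is not Spark's implementation: 
`qualify`, `infer_edges`, and the `reads` mapping are hypothetical names, and 
the default database `teodoro_raw` is an assumption (the report does not say 
how the Bronze datasets end up under that name).

```python
def qualify(name: str, default_database: str) -> str:
    """Return a two-part name, prefixing the default database if needed."""
    return name if "." in name else f"{default_database}.{name}"


def infer_edges(defined, reads, default_database):
    """Map each pipeline dataset to the pipeline datasets it reads from."""
    known = {qualify(n, default_database) for n in defined}
    edges = {}
    for dataset, sources in reads.items():
        upstream = {qualify(s, default_database) for s in sources}
        # Only identifiers that name another pipeline dataset become edges;
        # anything else is treated as an external table.
        edges[qualify(dataset, default_database)] = upstream & known
    return edges


# Dataset names as declared in the pipeline script from this report.
defined = [
    "user_data",
    "exchange",
    "teodoro_silver.user_data",
    "teodoro_silver.exchange",
]
# Identifiers each dataset passes to spark.table / spark.readStream.table.
reads = {
    "user_data": [],
    "exchange": [],
    "teodoro_silver.user_data": ["teodoro_raw.user_data"],
    "teodoro_silver.exchange": ["teodoro_raw.exchange"],
}
# Assumption: unqualified dataset names resolve to the teodoro_raw database.
edges = infer_edges(defined, reads, default_database="teodoro_raw")
```

With edges like these, each Silver dataset would be linked to its Bronze 
source instead of being treated as independent.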

 

*Steps to Reproduce*

Consider the following pipeline script where Silver tables depend on Bronze raw 
tables:
{code:python}
from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

spark = SparkSession.active()
BUCKET = "spark-poc-ingest-teste"

# 1. BRONZE LAYER (Raw data)
@dp.materialized_view
def user_data() -> DataFrame:
    return spark.read.format("parquet").load(
        f"s3a://{BUCKET}/teodoro/mock_data_materialized_view"
    )

@dp.table
def exchange() -> DataFrame:
    return spark.readStream.format("parquet").load(
        f"s3a://{BUCKET}/teodoro/exchange_data"
    )

# 2. SILVER LAYER (Fails to infer that it depends on the Bronze layer above)
@dp.materialized_view(name="teodoro_silver.user_data")
def silver_user_data() -> DataFrame:
    # Reading via spark.table hides the lineage from the dp engine
    return spark.table("teodoro_raw.user_data")

@dp.table(name="teodoro_silver.exchange")
def silver_exchange() -> DataFrame:
    # Reading via spark.readStream.table hides the lineage from the dp engine
    return spark.readStream.table("teodoro_raw.exchange").withColumn(
        "time", col("time").cast("timestamp")
    )
{code}
*Expected Behavior*

The Declarative Pipeline engine should parse or intercept `spark.table()` / 
`spark.readStream.table()` calls within the `@dp` decorators, or provide a 
mechanism to explicitly resolve dependencies, ensuring that the Bronze tables 
(`user_data`, `exchange`) are executed and populated *before* the Silver tables 
start running.
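
As a rough sketch of the ordering this implies (not the engine's actual 
scheduler), the standard-library `graphlib` module can order datasets once the 
dependencies are known. The `dependencies` mapping below is written by hand 
from the script in this report and assumes the Bronze datasets resolve to 
`teodoro_raw.user_data` and `teodoro_raw.exchange`.

```python
from graphlib import TopologicalSorter

# Each key is a dataset; each value is the set of datasets it reads from.
# Hand-written assumption based on the pipeline script in this report.
dependencies = {
    "teodoro_raw.user_data": set(),
    "teodoro_raw.exchange": set(),
    "teodoro_silver.user_data": {"teodoro_raw.user_data"},
    "teodoro_silver.exchange": {"teodoro_raw.exchange"},
}

# static_order() yields every dataset after all of its upstream datasets.
order = list(TopologicalSorter(dependencies).static_order())

for position, dataset in enumerate(order, start=1):
    print(position, dataset)
```

Any order produced this way places each Bronze dataset ahead of the Silver 
dataset that reads it, which is the behavior requested above.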

 

*Actual Behavior*

The pipeline starts executing Silver datasets first (e.g., 
`teodoro_silver.user_data`), and they fail because the underlying raw 
tables/views have not yet been created or refreshed by the framework.



