Bernardo Alemar created SPARK-57352:
---------------------------------------
Summary: [SDP] Pipeline execution order is broken because lineage
is not inferred from spark.table / spark.readStream.table
Key: SPARK-57352
URL: https://issues.apache.org/jira/browse/SPARK-57352
Project: Spark
Issue Type: Bug
Components: Connect, Declarative Pipelines
Affects Versions: 4.1.1
Environment: - {*}Spark Version{*}: 4.1.1
- {*}Execution Mode{*}: Spark Connect
- {*}Deployment Platform{*}: AWS EKS (Elastic Kubernetes Service)
- {*}Catalog Metastore{*}: AWS Glue Data Catalog
- {*}Table Format{*}: Apache Iceberg
- {*}Apache Iceberg Version{*}: 1.10.1
- {*}AWS Bundle Version{*}: 1.10.1
- {*}Spark Packages{*}:
org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.10.1
org.apache.iceberg:iceberg-aws-bundle:1.10.1
Reporter: Bernardo Alemar
*Error Description*
When building a pipeline using the new Declarative Pipelines framework
(`pyspark.pipelines`), the execution engine fails to correctly infer
dependencies (DAG lineage) if a downstream table (e.g., Silver layer)
references an upstream table (e.g., Bronze layer) using `spark.table()` or
`spark.readStream.table()`.
Because the framework does not resolve the table identifier strings passed to
these calls against the datasets defined in the same pipeline, it schedules
downstream materialized views/tables to run *before* their upstream
dependencies, and the pipeline run fails.
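To illustrate why the missing lineage matters, here is a minimal sketch in plain Python. It is not the SDP scheduler; it only models run ordering with the standard-library `graphlib`. The helpers `schedule` and `respects` and the two edge dictionaries are hypothetical, and the dataset names assume the Bronze tables resolve to `teodoro_raw.*` as in the reproduction script.

```python
from graphlib import TopologicalSorter


def schedule(deps: dict[str, set[str]]) -> list[str]:
    """Return one valid run order; deps maps a dataset to its upstream datasets."""
    return list(TopologicalSorter(deps).static_order())


def respects(order: list[str], deps: dict[str, set[str]]) -> bool:
    """True if every upstream dataset appears before its downstream dataset."""
    pos = {name: i for i, name in enumerate(order)}
    return all(pos[up] < pos[down] for down, ups in deps.items() for up in ups)


# Lineage as it should be inferred: each Silver dataset depends on a Bronze one.
inferred = {
    "teodoro_raw.user_data": set(),
    "teodoro_raw.exchange": set(),
    "teodoro_silver.user_data": {"teodoro_raw.user_data"},
    "teodoro_silver.exchange": {"teodoro_raw.exchange"},
}

# Lineage as described in this report: no edges were inferred at all.
missing = {name: set() for name in inferred}

# A Silver-first order, as observed in the failing run.
silver_first = [
    "teodoro_silver.user_data",
    "teodoro_silver.exchange",
    "teodoro_raw.user_data",
    "teodoro_raw.exchange",
]

print(respects(schedule(inferred), inferred))  # order derived from real edges
print(respects(silver_first, missing))         # accepted when edges are missing
print(respects(silver_first, inferred))        # rejected once edges are known
```

With no edges, every ordering is vacuously valid, so a scheduler has no reason to run Bronze first. Once the two Silver-to-Bronze edges exist, the Silver-first order is rejected.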
*Steps to Reproduce*
Consider the following pipeline script where Silver tables depend on Bronze raw
tables:
{code:python}
from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

spark = SparkSession.active()
BUCKET = "spark-poc-ingest-teste"

# 1. BRONZE LAYER (Raw data)
@dp.materialized_view
def user_data() -> DataFrame:
    return spark.read.format("parquet").load(
        f"s3a://{BUCKET}/teodoro/mock_data_materialized_view"
    )

@dp.table
def exchange() -> DataFrame:
    return spark.readStream.format("parquet").load(
        f"s3a://{BUCKET}/teodoro/exchange_data"
    )

# 2. SILVER LAYER (Fails to infer that it depends on the Bronze layer above)
@dp.materialized_view(name="teodoro_silver.user_data")
def silver_user_data():
    # Reading via spark.table hides the lineage from the dp engine
    return spark.table("teodoro_raw.user_data")

@dp.table(name="teodoro_silver.exchange")
def silver_exchange():
    # Reading via spark.readStream.table hides the lineage from the dp engine
    return spark.readStream.table("teodoro_raw.exchange").withColumn(
        "time", col("time").cast("timestamp")
    )
{code}
*Expected Behavior*
The Declarative Pipeline engine should parse or intercept `spark.table()` /
`spark.readStream.table()` calls within the `@dp` decorators, or provide a
mechanism to explicitly resolve dependencies, ensuring that the Bronze tables
(`user_data`, `exchange`) are executed and populated *before* the Silver tables
start running.
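As a rough illustration of the "intercept" option, the sketch below records which tables a query function reads by invoking it against a stand-in object instead of a real session. This is only an assumption about one possible approach, not how the SDP engine works. `RecordingReader` and `discover_dependencies` are hypothetical names, and the query functions take the session as a parameter (unlike the reproduction script) so the example runs without Spark.

```python
from typing import Callable


class RecordingReader:
    """Hypothetical stand-in for a session: records table names, reads no data."""

    def __init__(self) -> None:
        self.tables_read: set[str] = set()

    def table(self, name: str) -> str:
        # Record the identifier and return a placeholder instead of a DataFrame.
        self.tables_read.add(name)
        return f"<placeholder for {name}>"

    @property
    def readStream(self) -> "RecordingReader":
        # Route spark.readStream.table(...) to the same recorder.
        return self


def discover_dependencies(query_fn: Callable[[RecordingReader], object]) -> set[str]:
    """Run a query function against the recorder and return the tables it read."""
    reader = RecordingReader()
    query_fn(reader)
    return reader.tables_read


# Simplified versions of the Silver queries; the withColumn step is omitted
# because the placeholder returned by the recorder is not a DataFrame.
def silver_user_data(spark):
    return spark.table("teodoro_raw.user_data")


def silver_exchange(spark):
    return spark.readStream.table("teodoro_raw.exchange")


deps = {
    "teodoro_silver.user_data": discover_dependencies(silver_user_data),
    "teodoro_silver.exchange": discover_dependencies(silver_exchange),
}
print(deps)
```

The resulting mapping has the shape a scheduler needs to place each Bronze dataset ahead of the Silver dataset that reads it.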
*Actual Behavior*
The pipeline starts executing Silver tasks first (e.g.,
`teodoro_silver.user_data`), leading to execution failures because the
underlying raw tables/views have not been initialized or updated yet by the
framework.