Short answer: Spark uses lazy evaluation. The long answer is below; the whole of it is taken from Google Gemini (https://aistudio.google.com/).
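Before the long answer, here is a tiny illustration of what "lazy" means in practice. This is only a minimal sketch, assuming a running SparkSession; the numbers and column names are invented:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Transformations only build a plan -- no Spark job runs on these lines.
df = spark.range(1_000_000).withColumn("even", col("id") % 2 == 0)
evens = df.filter(col("even"))

# Only an action triggers execution of the accumulated plan.
print(evens.count())   # a job runs now
print(evens.count())   # without caching, the whole plan is re-executed from scratch
```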
----------------------------------------------------------------------------
Of course. This is a classic and often subtle issue in Spark. Your suspicion that it's related to lazy evaluation and the complex DataFrame lineage is right on track. Let's break down the problem and then propose a robust solution.

### TL;DR: The Core Issue and Solution

The most likely cause of your inconsistent results is **lazy evaluation combined with a "diamond" dependency graph**. You have a single DataFrame (`joined_df`) that acts as a source for three separate downstream transformations (`type2_changes`, `type1_changes`, `expired_records`). Even though you call `.persist()` on `joined_df`, Spark's lazy evaluation might not have physically computed and cached it yet. When you trigger the final actions (the three `MERGE`/`INSERT` statements), Spark may compute the plan for `joined_df` multiple times, and the optimizer could choose slightly different execution plans each time, leading to inconsistencies.

The solution is to **force the materialization of your persisted DataFrames** at the critical branching points before they are used in subsequent transformations.

---

### Detailed Analysis of the Problem

1. **Lazy Evaluation and the Spark DAG:** Spark doesn't execute transformations (like `join`, `filter`, `select`) immediately. It builds a logical plan, which is then optimized into a physical plan (a Directed Acyclic Graph, or DAG). Execution only happens when an **action** is called (e.g., `count()`, `collect()`, `save()`). Note that `createOrReplaceTempView()` is *not* an action; it only registers a name for the lazy plan.

2. **The "Diamond" Problem:** Your code has this structure:

   ```
            +-----------------+
            |    source_df    |
            +-----------------+
                     |
            +-----------------+
            |    joined_df    |   <-- You called .persist() here
            +-----------------+
             /       |       \
            /        |        \
   +----------------+ +----------------+ +-----------------+
   | type1_changes  | | type2_changes  | | expired_records |
   +----------------+ +----------------+ +-----------------+
   ```

   When you run actions on the three final DataFrames (by creating temp views and running `MERGE` against them), Spark looks at the DAG and sees that all three depend on `joined_df`. If `joined_df` hasn't been materialized and cached yet, the optimizer may re-calculate it for each branch, or try to combine the plans in complex ways. This can lead to different results, especially if there are any non-deterministic operations or subtle race conditions in reading the source data.

3. **Why `.persist()` Isn't Enough on Its Own:** Calling `.persist()` or `.cache()` is only a declaration of intent. It tells Spark, "When you compute this DataFrame, please keep the result in memory/disk." It does **not** force the computation to happen at that moment; the computation is still deferred until the next action.

### Proposed Solution: Forcing Materialization

To ensure consistent execution, you must force Spark to execute the plan up to the persisted DataFrame and store the result *before* you use it in the downstream branches.

The easiest way to do this is to call a simple, relatively inexpensive action like `.count()` immediately after persisting. This triggers the computation of `joined_df`, and because you've marked it for persistence, the result is cached. All subsequent operations that depend on `joined_df` will then read from this stable, consistent cache.

I will also suggest another powerful technique, **checkpointing**, as a more robust alternative for very complex DAGs.
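To make the "persist, then materialize, then branch" pattern concrete before diving into your code, here is a minimal, self-contained sketch; the DataFrame and column names are invented for illustration:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-in for the joined DataFrame at the branching point.
joined_df = (
    spark.range(100)
    .withColumn("row_hash_src", col("id") % 7)
    .withColumn("row_hash_tgt", col("id") % 5)
)

# 1. Declare the intent to cache ...
joined_df = joined_df.persist()

# 2. ... and immediately force materialization with a cheap action.
#    From here on, the cached result is the single source for every branch.
print(f"Materialized {joined_df.count()} rows")

# 3. Each branch now reads the cached data instead of re-running the
#    upstream plan (the "diamond" problem described above).
changed = joined_df.filter(col("row_hash_src") != col("row_hash_tgt"))
unchanged = joined_df.filter(col("row_hash_src") == col("row_hash_tgt"))
print(changed.count(), unchanged.count())

joined_df.unpersist()
```

The modified code below applies exactly this pattern to `joined_df` and `incoming_new_df`.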

---

### Recommended Code Changes

Here is your code, modified with the recommended changes. The key additions are the `.count()` calls after `.persist()`.

```python
# Assumed imports from the original notebook (not shown in the snippet):
#   from pyspark.sql.functions import col, lower, upper, coalesce, lit, when, row_number, max as spark_max
#   from pyspark.sql.window import Window

# No changes here
source_df = spark.sql(f"""
    SELECT * FROM {catalog_name}.{db_name}.{table_name}
""")

data_df = source_df.persist()

# COMMAND ----------

type2_columns = []
data_df = updateRowHashValues(data_df, type2_columns, primary_key)

# Materialize data_df after a potentially expensive UDF.
# This ensures updateRowHashValues is only run once.
data_df.count()

# COMMAND ----------

target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}").filter(col("IsCurrent") == True)
target_col_list = target_df.columns
source_with_target_df = data_df.alias("src").join(target_df.alias("tgt"), on="PatientId", how="left")

# Persist the joined DataFrame, which is the critical branching point
joined_df = source_with_target_df.persist()

# =========================================================================
# SOLUTION 1: Force materialization by calling an action.
# This is the most important change. It forces Spark to compute and cache
# joined_df before it's used in the three branches below.
# =========================================================================
print(f"Materializing joined_df. Total joined records: {joined_df.count()}")

# Now all subsequent operations on joined_df will use the cached version
filtered_joined_df = joined_df.filter(col("tgt.DimTypeIIKey").isNull()).select([col("src." + c).alias(c) for c in data_df.columns]).drop("SourceTimeStamp")

new_records_df = filtered_joined_df.filter((lower(col("ModifiedBy")).contains("migration")) | (lower(col("ModifiedBy")).contains("migrated")) | (lower(col("CreatedBy")).contains("migration")) | (lower(col("CreatedBy")).contains("migrated")))

new_records_source_df = filtered_joined_df.alias("jdf").join(new_records_df.alias("nrd"), col("jdf.PatientId") == col("nrd.PatientId"), how="left_anti").select([col("jdf." + c).alias(c) for c in filtered_joined_df.columns]).drop('SourceTimeStamp')

# COMMAND ----------

if is_check_banfield_data and (not new_records_df.isEmpty()):
    patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient").selectExpr("patientid as patient_patientid", "createdate as patient_createdate", "changedate as patient_changedate", "fw_modifiedts as patient_fw_modifiedts").withColumn('patient_patientid', upper('patient_patientid'))
    banfield_patient_df = patient_df.persist()

    # POTENTIAL ISSUE: Add a unique tie-breaker to the window function to ensure determinism.
    # If two records have the same patient_changedate and patient_fw_modifiedts, the result is not guaranteed.
    # Add a unique key if one exists, e.g., a primary key from the source (see the sketch after this code block).
    # For now, let's assume this is acceptable, but it's a potential source of inconsistency.
    window_spec = Window.partitionBy("patient_patientid").orderBy(col("patient_changedate").desc(), col("patient_fw_modifiedts").desc())
    banfield_patient_df = banfield_patient_df.withColumn("row_num", row_number().over(window_spec))
    banfield_patient_df = banfield_patient_df.filter(col("row_num") == 1).drop("row_num")
    new_records_df = new_records_df.alias("new").join(banfield_patient_df.alias("pat"), col("new.PatientId") == col("pat.patient_patientid"), how="left")
    new_records_df = new_records_df.withColumn("BusinessEffectiveStartDate", coalesce(col("pat.patient_createdate"), col("BusinessEffectiveStartDate"))).select("new.*", "BusinessEffectiveStartDate")

    # Persist and materialize this complex DataFrame before unioning it.
    incoming_new_df = new_records_df.persist()
    print(f"Materializing incoming_new_df. Record count: {incoming_new_df.count()}")

    # Unpersist the intermediate DFs as they are now cached in incoming_new_df
    patient_df.unpersist()
    banfield_patient_df.unpersist()
else:
    is_check_banfield_data = False

# COMMAND ----------

# These three dataframes will now be derived from the *cached* joined_df, ensuring consistency.
type2_changes = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select("src.*")
type1_changes = joined_df.filter((col("src.RowHashType2") == col("tgt.RowHashType2")) & (col("src.RowHashType1") != col("tgt.RowHashType1"))).select("src.*")
expired_records = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select(col("tgt.*"), col("src.BusinessEffectiveStartDate").alias("NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate", col("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False)).drop("NewBusinessEffectiveEndDate")

# COMMAND ----------

max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}").agg(spark_max(surrogate_key)).collect()[0][0] or 0
starting_key = max_key + 1
target_col_list = list(set(target_col_list) - {"DwCreatedYear", "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set(surrogate_key.split(',')))
type2_changes = type2_changes.select(target_col_list)
if is_check_banfield_data:
    type2_changes = type2_changes.unionByName(incoming_new_df)
    # Unpersist after it has been used in the union
    incoming_new_df.unpersist()
type2_changes = type2_changes.unionByName(new_records_source_df)
type2_changes = type2_changes.withColumn("IsDataMigrated", when(lower(col("ModifiedBy")).contains("migration") | lower(col("ModifiedBy")).contains("migrated") | lower(col("CreatedBy")).contains("migration") | lower(col("CreatedBy")).contains("migrated"), True).otherwise(False))
type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate", lit("9999-12-31").cast("date")).withColumn("IsCurrent", lit(True)).withColumn(surrogate_key, row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
type1_updates_columns = list(set(type1_changes.columns) - set(type2_columns))
type1_updates = type1_changes.select(*type1_updates_columns)

# Creating the temp views is not an action by itself; the MERGE/INSERT statements below are what
# trigger execution. Thanks to the .count() call above, they will read from a stable, cached source.
expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{db_name}_{table_name}")
type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{db_name}_{table_name}")
type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{table_name}")

# ... (rest of your code for MERGE/INSERT statements) ...

# COMMAND ----------

# Final cleanup
source_df.unpersist()
joined_df.unpersist()  # Unpersist the explicitly cached DF
```
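One way to address the determinism warning flagged in the comments above is to add a unique column as the final sort key in the window's `orderBy`. The sketch below assumes such a unique key exists in the Banfield patient table; `patient_rowid` is a hypothetical name, so substitute whatever unique column you actually have:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: two rows for the same patient with identical timestamps.
banfield_patient_df = spark.createDataFrame(
    [
        ("P1", "2024-01-01", "2024-01-01", 101),
        ("P1", "2024-01-01", "2024-01-01", 102),
    ],
    ["patient_patientid", "patient_changedate", "patient_fw_modifiedts", "patient_rowid"],
)

# Without a tie-breaker, which of the two rows gets row_num == 1 is not guaranteed
# and can change between runs. Adding a unique column as the last sort key makes
# the deduplication deterministic.
window_spec = (
    Window.partitionBy("patient_patientid")
          .orderBy(
              col("patient_changedate").desc(),
              col("patient_fw_modifiedts").desc(),
              col("patient_rowid").desc(),   # hypothetical unique tie-breaker
          )
)

deduped = (
    banfield_patient_df
    .withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .drop("row_num")
)
deduped.show()
```

Without the tie-breaker, either of the two rows may win `row_num == 1` on different runs; with it, the deduplication is stable.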
### Alternative/More Robust Solution: Checkpointing

For extremely complex pipelines, or if `.persist()` with `.count()` still proves insufficient (e.g., in case of executor failures where the cache might be lost), you can use `.checkpoint()`.

* **What it does:** Checkpointing saves the DataFrame to a reliable file system (like DBFS, S3, or HDFS) and, more importantly, **truncates the logical plan**. Spark completely "forgets" how the checkpointed DataFrame was created and will always read from the saved files. This provides the strongest guarantee of stability.
* **Downside:** It's more I/O-intensive than `persist()` because it always writes to disk.

To use it, you would do the following:

1. **Set a checkpoint directory (once per session):**

   ```python
   spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")
   ```

2. **Replace `.persist()` with `.checkpoint()` at the critical step:**

   ```python
   # Instead of: joined_df = source_with_target_df.persist()
   joined_df = source_with_target_df.checkpoint()
   # checkpoint() is eager by default and triggers the computation itself, so no .count() is needed.
   ```
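Putting the two steps together, here is a minimal, self-contained sketch; the checkpoint path and DataFrames are illustrative only, and the final `explain()` is just there to show that the checkpointed DataFrame's plan no longer contains the upstream join:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# One-time setup: a reliable location for checkpoint files (illustrative path).
spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")

# Hypothetical stand-ins for the source and target DataFrames.
src = spark.range(10).withColumnRenamed("id", "PatientId")
tgt = spark.range(5).withColumnRenamed("id", "PatientId").withColumn("IsCurrent", col("PatientId") >= 0)

joined_df = src.join(tgt, on="PatientId", how="left")

# checkpoint() (eager by default) writes the data out and truncates the lineage,
# so every downstream branch reads the same files instead of re-running the join.
joined_df = joined_df.checkpoint()

joined_df.explain()   # the plan now scans the checkpointed data rather than the join
```

Keep in mind that checkpoint files are not cleaned up automatically by default, so point the directory at a location you can clear periodically.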
For your use case, **forcing materialization with `.count()` is likely sufficient and more performant**. I would recommend trying that first.

By implementing these changes, you will force a specific execution order at the most critical point in your code, ensuring that all three final views are derived from the exact same, pre-computed data, thus guaranteeing consistent results on every run.

On Sun, Aug 10, 2025 at 20:47, Karthick N <kcekarth...@gmail.com> wrote:

> Hi Team,
>
> I'm facing an issue with the execution order in the PySpark code snippet below. I'm not certain whether it's caused by lazy evaluation, Spark plan optimization, or something else.
>
> *Issue:*
> For the same data and scenario, during some runs one of the final views does not return any data. This appears to be due to changes in the execution order, which in turn affects the final result. In the final steps, we have three different DataFrames derived from the same base DataFrame, and I'm not sure if this could be the cause.
>
> I tried using the persist option to hold intermediate results and avoid potential lazy evaluation issues, but the problem still persists.
>
> Could you please review this issue and suggest a solution to ensure consistent execution order and results?
>
> *Note:* Please let me know if you need any clarification or additional information on this.
>
> Code:
> source_df = spark.sql(f"""
>     SELECT * FROM {catalog_name}.{db_name}.{table_name}
> """)  # Sample source query
>
> data_df = source_df.persist()
>
> # COMMAND ----------
>
> type2_columns = [
> ]
> data_df = updateRowHashValues(data_df, type2_columns, primary_key)
>
> # COMMAND ----------
>
> target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}").filter(col("IsCurrent") == True)
> target_col_list = target_df.columns
> source_with_target_df = data_df.alias("src").join(target_df.alias("tgt"), on="PatientId", how="left")
> joined_df = source_with_target_df.persist()
> filtered_joined_df = joined_df.filter(col("tgt.DimTypeIIKey").isNull()).select([col("src." + c).alias(c) for c in data_df.columns]).drop("SourceTimeStamp")
>
> new_records_df = filtered_joined_df.filter((lower(col("ModifiedBy")).contains("migration")) | (lower(col("ModifiedBy")).contains("migrated")) | (lower(col("CreatedBy")).contains("migration")) | (lower(col("CreatedBy")).contains("migrated")))
>
> new_records_source_df = filtered_joined_df.alias("jdf").join(new_records_df.alias("nrd"), col("jdf.PatientId") == col("nrd.PatientId"), how="left_anti").select([col("jdf." + c).alias(c) for c in filtered_joined_df.columns]).drop('SourceTimeStamp')
>
> # COMMAND ----------
>
> if is_check_banfield_data and (not new_records_df.isEmpty()):  # This is_check_banfield_data may change based on the environment
>     patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient").selectExpr("patientid as patient_patientid", "createdate as patient_createdate", "changedate as patient_changedate", "fw_modifiedts as patient_fw_modifiedts").withColumn('patient_patientid', upper('patient_patientid'))
>     banfield_patient_df = patient_df.persist()
>     window_spec = Window.partitionBy("patient_patientid").orderBy(col("patient_changedate").desc(), col("patient_fw_modifiedts").desc())
>     banfield_patient_df = banfield_patient_df.withColumn("row_num", row_number().over(window_spec))
>     banfield_patient_df = banfield_patient_df.filter(col("row_num") == 1).drop("row_num")
>     new_records_df = new_records_df.alias("new").join(banfield_patient_df.alias("pat"), col("new.PatientId") == col("pat.patient_patientid"), how="left")
>     new_records_df = new_records_df.withColumn("BusinessEffectiveStartDate", coalesce(col("pat.patient_createdate"), col("BusinessEffectiveStartDate"))).select("new.*", "BusinessEffectiveStartDate")
>     incoming_new_df = new_records_df.persist()
> else:
>     is_check_banfield_data = False
>
> # COMMAND ----------
>
> type2_changes = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select("src.*")
> type1_changes = joined_df.filter((col("src.RowHashType2") == col("tgt.RowHashType2")) & (col("src.RowHashType1") != col("tgt.RowHashType1"))).select("src.*")
> expired_records = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select(col("tgt.*"), col("src.BusinessEffectiveStartDate").alias("NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate", col("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False)).drop("NewBusinessEffectiveEndDate")
>
> # COMMAND ----------
>
> max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}").agg(spark_max(surrogate_key)).collect()[0][0] or 0
> starting_key = max_key + 1
> target_col_list = list(set(target_col_list) - {"DwCreatedYear", "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set(surrogate_key.split(',')))
> type2_changes = type2_changes.select(target_col_list)
> if is_check_banfield_data:
>     type2_changes = type2_changes.unionByName(incoming_new_df)
>     patient_df.unpersist()
>     new_records_df.unpersist()
> type2_changes = type2_changes.unionByName(new_records_source_df)
> type2_changes = type2_changes.withColumn("IsDataMigrated", when(lower(col("ModifiedBy")).contains("migration") | lower(col("ModifiedBy")).contains("migrated") | lower(col("CreatedBy")).contains("migration") | lower(col("CreatedBy")).contains("migrated"), True).otherwise(False))
> type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate", lit("9999-12-31").cast("date")).withColumn("IsCurrent", lit(True)).withColumn(surrogate_key, row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
> type1_updates_columns = list(set(type1_changes.columns) - set(type2_columns))
> type1_updates = type1_changes.select(*type1_updates_columns)
> expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{db_name}_{table_name}")  # These are the three final temp views that will be used in the merge statements or inserts. In some runs, one of the views returns no data.
> type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{db_name}_{table_name}")
> type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{table_name}")
>
> # COMMAND ----------
>
> # DBTITLE 1,Type1 column changes update
> existing_records_update = spark.sql(f"""MERGE INTO {catalog_name}.{db_name}.{table_name} AS tgt
>     USING temp_updates_type1_{db_name}_{table_name} AS src
>     ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>     WHEN MATCHED THEN UPDATE SET
>         tgt.col1 = src.col1,
>         tgt.col2 = src.col2,
>         tgt.col3 = src.col3,
>         tgt.col4 = src.col4,
>         .
>         .
>         .
>         .
>         tgt.RowHashType1 = src.RowHashType1""")
> print(f"Total no of records updated due to Type1 columns update: {existing_records_update.select('num_updated_rows').collect()[0][0]}")
>
> # COMMAND ----------
>
> # DBTITLE 1,Update Expired Record
> update_expired_record = spark.sql(f"""MERGE INTO {catalog_name}.{db_name}.{table_name} AS tgt
>     USING temp_updates_type2_expired_{db_name}_{table_name} AS src
>     ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>     WHEN MATCHED THEN UPDATE SET
>         tgt.IsCurrent = false,
>         tgt.BusinessEffectiveEndDate = src.BusinessEffectiveEndDate,
>         tgt.DwModifiedts = src.DwModifiedts,
>         tgt.DwCreatedYear = year(src.DwModifiedts),
>         tgt.DwCreatedMonth = month(src.DwModifiedts),
>         tgt.DwCreatedDay = day(src.DwModifiedts)""")
> print(log_message=f"Total no of records marked IsCurrent as False due to type2 columns update: {update_expired_record.select('num_updated_rows').collect()[0][0]}")
>
> # COMMAND ----------
>
> # DBTITLE 1,Insert new records as type2 value changed and first time data arrival
> new_records_insertion = spark.sql(f"""INSERT INTO {catalog_name}.{db_name}.{table_name} (
>     col1()
>     values(
>     col1 )
>     FROM temp_inserts_new_records_{db_name}_{table_name}
> """)
> print(log_message=f"Total no of new records inserted: {new_records_insertion.select('num_inserted_rows').collect()[0][0]}")
>
> # COMMAND ----------
>
> source_df.unpersist()
> source_with_target_df.unpersist()

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge
+47 480 94 297