You have got two separate things going on: you did materialize, but your unionByName is failing because the two DataFrames don't actually have the same column set/schema at the moment of the union, even if the printSchema() output you looked at earlier seemed to match.

Why unionByName is complaining (common culprits in your snippet):

- You change type2_changes with:

  target_col_list = list(set(target_col_list) - {...} - set(surrogate_key.split(',')))
  type2_changes = type2_changes.select(target_col_list)

  Using a set here can drop columns unpredictably (it is orderless, so it is easy to miss a field). Also, target_col_list comes from the target schema, but type2_changes and source_new_non_migration_data_df come from the source schema. Those aren't guaranteed to be identical.

- You never re-project source_new_non_migration_data_df to target_col_list before the union, so the two frames can have different column sets.

- If the is_check_banfield_data branch ever runs, this line:

  .select("new.*", "BusinessEffectiveStartDate")

  duplicates BusinessEffectiveStartDate (it is already in new.*). That produces duplicate column names, so unions/merges will blow up later.

Quick, robust fix (align schemas explicitly before the union): define the exact columns you plan to insert/update once, then project both DataFrames to that list in the same order:

from pyspark.sql import functions as F

# 1) Pick a canonical column list (don't use set(); keep the order deterministic)
drop_cols = {"DwCreatedYear", "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} | set(surrogate_key.split(','))

# Base this on the *source* if you're inserting source-shaped rows:
insert_cols = [c for c in data_df.columns if c not in drop_cols]

# 2) Re-project BOTH sides to the same ordered list
type2_changes = type2_changes.select(*insert_cols)
source_new_non_migration_data_df = source_new_non_migration_data_df.select(*insert_cols)

# (Optional but recommended) Cast to identical types before the union
src_schema = {f.name: f.dataType for f in data_df.schema if f.name in insert_cols}

def align_types(df, cols, schema_map):
    exprs = [
        (F.col(c).cast(schema_map[c]).alias(c)
         if c in df.columns
         else F.lit(None).cast(schema_map[c]).alias(c))
        for c in cols
    ]
    return df.select(*exprs)

type2_changes = align_types(type2_changes, insert_cols, src_schema)
source_new_non_migration_data_df = align_types(source_new_non_migration_data_df, insert_cols, src_schema)

# 3) Now the unionByName will work
type2_changes = type2_changes.unionByName(source_new_non_migration_data_df)

Bottom line: your union error isn't lazy evaluation any more; it's a schema alignment issue. Pick the canonical column list, project both DataFrames to it (and cast the types), then union. To see exactly which columns differ at the moment of the union, there is a short diagnostic just above your quoted snippet below.

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


On Wed, 13 Aug 2025 at 15:54, Karthick N <kcekarth...@gmail.com> wrote:

> Thanks for support.
>
> I tried using action(count()) after persist to materialize the DataFrame results. However, I'm still facing a column count mismatch issue when performing unionByName. I have verified the column count and names in both DataFrames using printSchema, and it shows that both DataFrames have the same columns. Still, I'm not sure if there's an underlying issue in the code. I have highlighted the section where I'm encountering the problem.
>
> Note: Here source(*data_df *), we are not getting data from the same table f"{catalog_name}.{db_name}.{table_name}, it is from a different table.
>
> Could you please take a look into the issue.
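
Quick diagnostic for the snippet that follows (a minimal sketch; it only assumes the two DataFrame variables from your code are in scope). Paste it immediately before the failing unionByName call:

from collections import Counter

left_cols = set(type2_changes.columns)
right_cols = set(source_new_non_migration_data_df.columns)
print("only in type2_changes:", sorted(left_cols - right_cols))
print("only in source_new_non_migration_data_df:", sorted(right_cols - left_cols))

# Duplicate column names on either side (e.g. two BusinessEffectiveStartDate) also cause trouble
print("dupes left :", [c for c, n in Counter(type2_changes.columns).items() if n > 1])
print("dupes right:", [c for c, n in Counter(source_new_non_migration_data_df.columns).items() if n > 1])

# Same names with different data types can also make the union fail, so compare dtypes too
print("dtype differences:", set(type2_changes.dtypes) ^ set(source_new_non_migration_data_df.dtypes))

Whatever prints non-empty is the column (or type) the error message is really complaining about.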
>
> if source_data_count > 0:
>     data_df = updateRowHashValues(data_df, type2_columns, primary_key)
>     data_df = data_df.drop("SourceTimeStamp")
>     target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}").filter(col("IsCurrent") == True)  # here, we are taking data from target table
>     target_col_list = target_df.columns
>     source_with_target_df = data_df.alias("src").join(target_df.alias("tgt"), on="PatientId", how="left")
>     joined_df = source_with_target_df.persist()
>     print(f"Joined source new data with active target data count: {joined_df.count()}")
>     source_new_data_df = joined_df.filter(col("tgt.DimPatientTypeIIKey").isNull()).select([col("src." + c).alias(c) for c in data_df.columns])
>     source_new_migration_data_df = source_new_data_df.filter((lower(col("ModifiedBy")).contains("migration")) | (lower(col("ModifiedBy")).contains("migrated")) | (lower(col("CreatedBy")).contains("migration")) | (lower(col("CreatedBy")).contains("migrated")))
>     source_new_non_migration_data_df = source_new_data_df.alias("jdf").join(source_new_migration_data_df.alias("nrd"), col("jdf.PatientId") == col("nrd.PatientId"), how="left_anti").select([col("jdf." + c).alias(c) for c in source_new_data_df.columns])
>
>     if is_check_banfield_data and (not source_new_migration_data_df.isEmpty()):
>         patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient").selectExpr("patientid as patient_patientid", "createdate as patient_createdate", "changedate as patient_changedate", "fw_modifiedts as patient_fw_modifiedts").withColumn('patient_patientid', upper('patient_patientid'))
>         banfield_patient_df = patient_df.persist()
>         window_spec = Window.partitionBy("patient_patientid").orderBy(col("patient_changedate").desc(), col("patient_fw_modifiedts").desc())
>         banfield_patient_df = banfield_patient_df.withColumn("row_num", row_number().over(window_spec))
>         banfield_patient_df = banfield_patient_df.filter(col("row_num") == 1).drop("row_num")
>         source_new_migration_data_df = source_new_migration_data_df.alias("new").join(banfield_patient_df.alias("pat"), col("new.PatientId") == col("pat.patient_patientid"), how="left")
>         source_new_migration_data_df = source_new_migration_data_df.withColumn("BusinessEffectiveStartDate", least(col("pat.patient_createdate"), col("BusinessEffectiveStartDate"))).select("new.*", "BusinessEffectiveStartDate")
>         incomming_new_migration_data_df = source_new_migration_data_df.persist()
>     else:
>         is_check_banfield_data = False
>
>     type2_changes = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimPatientTypeIIKey").isNotNull()).select("src.*")
>     type1_changes = joined_df.filter((col("src.RowHashType2") == col("tgt.RowHashType2")) & (col("src.RowHashType1") != col("tgt.RowHashType1"))).select("src.*")
>     expired_records = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimPatientTypeIIKey").isNotNull()).select(col("tgt.*"), col("src.BusinessEffectiveStartDate").alias("NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate", col("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False)).drop("NewBusinessEffectiveEndDate")
>
>     max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}").agg(spark_max(surrogate_key)).collect()[0][0] or 0
>     starting_key = max_key + 1
>     target_col_list = list(set(target_col_list) - {"DwCreatedYear", "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set(surrogate_key.split(',')))
>     print(f"type2_changes schema")
>     print(f"column count:{len(type2_changes.columns)}")
>     type2_changes.printSchema()  # in this steps I am getting same column count and details
>     # print(f"incomming_new_migration_data_df schema")
>     # incomming_new_migration_data_df.printSchema()
>     type2_changes = type2_changes.select(target_col_list)
>     if is_check_banfield_data:  # For my test cases, it is False with release
>         type2_changes = type2_changes.unionByName(incomming_new_migration_data_df)
>         patient_df.unpersist()
>         source_new_migration_data_df.unpersist()
>     print(f"source_new_non_migration_data_df schema")
>     print(f"column count:{len(source_new_non_migration_data_df.columns)}")
>     source_new_non_migration_data_df.printSchema()  # in this steps I am getting same column count and details
>     type2_changes_records_count = type2_changes.count()
>     type2_changes = type2_changes.unionByName(source_new_non_migration_data_df)  # I am facing column mismatch issue in this steps
>     type2_changes = type2_changes.withColumn("IsDataMigrated", when(lower(col("ModifiedBy")).contains("migration") | lower(col("ModifiedBy")).contains("migrated") | lower(col("CreatedBy")).contains("migration") | lower(col("CreatedBy")).contains("migrated"), True).otherwise(False))
>     type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate", lit("9999-12-31").cast("date")).withColumn("IsCurrent", lit(True)).withColumn(surrogate_key, row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
>     type1_updates_columns = list(set(type1_changes.columns) - set(type2_columns))
>     type1_updates = type1_changes.select(*type1_updates_columns)
>     expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{db_name}_{table_name}")
>     type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{db_name}_{table_name}")
>     type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{table_name}")
>     expired_records_count = expired_records.count()
>     type1_updates_records_count = type1_updates.count()
>     print(log_message=f"{catalog_name}.{db_name}.{table_name} — "f"Existing records expired count: {expired_records_count}, "f"New/changed Type 2 records count: {type2_changes_records_count}, "f"Type 1 updates records count: {type1_updates_records_count}")
>
> Thanks
>
>
> On Mon, Aug 11, 2025 at 9:42 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hi Karthick,
>>
>> The problem seems to be that you were performing transformation/recipe on three data frames without materialisation, then writing back to that target table. Each MERGE re-evaluated its “recipe” at a different time, so they saw different snapshots → flaky/empty results.
>>
>> Fix (short + sweet):
>>
>>    1. Freeze joined_df and each final DF *before any writes* (e.g., persist(); count() or localCheckpoint(eager=True)).
>>    2. Or write the three sources to temp tables first, then MERGE from those.
>>
>> This example below is built on Parquet but shows the concept:
>>
>> # 0) Start Spark & clean a folder
>> from pyspark.sql import SparkSession, functions as F
>> import shutil
>>
>> spark = SparkSession.builder.appName("FreezeDemo-Parquet").getOrCreate()
>> spark.sparkContext.setLogLevel("ERROR")
>>
>> path = "/tmp/demo_table_parquet"
>> shutil.rmtree(path, ignore_errors=True)
>>
>> # 1) Seed data
>> spark.createDataFrame([(1,"A"), (2,"B")], "id INT, val STRING") \
>>     .write.mode("overwrite").parquet(path)
>>
>> # 2) Build DF BEFORE first append (will NOT see id=3 because the Parquet path listing was captured)
>> df1 = spark.read.parquet(path).filter(F.col("id") > 0)
>> print("== Old DF (built before append) — no id=3 ==")
>> df1.show()
>>
>> # 3) Append id=3 and build a NEW DF (this one sees 1,2,3)
>> spark.createDataFrame([(3,"C")], "id INT, val STRING") \
>>     .write.mode("append").parquet(path)
>> df2 = spark.read.parquet(path).filter(F.col("id") > 0)
>> print("== New DF (after append) — has id=3 ==")
>> df2.show()
>>
>> # 4) HARD FREEZE df2 BEFORE any further appends
>> spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")  # any writable path
>> df2_frozen = df2.localCheckpoint(eager=True)
>>
>> # 5) Now append id=4 AFTER freezing
>> spark.createDataFrame([(4,"D")], "id INT, val STRING") \
>>     .write.mode("append").parquet(path)
>>
>> print("== Frozen snapshot (should NOT include id=4) ==")
>> df2_frozen.show()  # expect only 1,2,3
>>
>> # Optional: prove live data now has 1,2,3,4
>> print("== Fresh read (should include id=4) ==")
>> spark.read.parquet(path).orderBy("id").show()
>>
>> Output (running Spark version 3.5.5)
>>
>> == Old DF (built before append) — no id=3 ==
>> +---+---+
>> | id|val|
>> +---+---+
>> | 1| A|
>> | 2| B|
>> +---+---+
>>
>> == New DF (after append) — has id=3 ==
>> +---+---+
>> | id|val|
>> +---+---+
>> | 1| A|
>> | 3| C|
>> | 2| B|
>> +---+---+
>>
>> == Frozen snapshot (should NOT include id=4) ==
>> +---+---+
>> | id|val|
>> +---+---+
>> | 1| A|
>> | 3| C|
>> | 2| B|
>> +---+---+
>>
>> == Fresh read (should include id=4) ==
>> +---+---+
>> | id|val|
>> +---+---+
>> | 1| A|
>> | 2| B|
>> | 3| C|
>> | 4| D|
>> +---+---+
>>
>> HTH
>>
>> Dr Mich Talebzadeh,
>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>
>> view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>> On Mon, 11 Aug 2025 at 08:12, Karthick N <kcekarth...@gmail.com> wrote:
>>
>>> Hi *Ángel*,
>>>
>>> Thank you for checking on this. I’ll review the points you mentioned and get back to you with an update.
>>>
>>> Hi *Mich*,
>>> Looping you in here — could you please assist in reviewing this issue and share your inputs or suggestions? Your expertise would be really helpful in resolving it.
>>>
>>> Thanks in advance.
>>>
>>>
>>> On Mon, Aug 11, 2025 at 2:02 AM Ángel Álvarez Pascua <angel.alvarez.pas...@gmail.com> wrote:
>>>
>>>> Have you tried disabling AQE?
>>>>
>>>>
>>>> On Sun, 10 Aug 2025, 20:48, Karthick N <kcekarth...@gmail.com> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I’m facing an issue with the execution order in the PySpark code snippet below. I’m not certain whether it’s caused by lazy evaluation, Spark plan optimization, or something else.
>>>>>
>>>>> *Issue:*
>>>>> For the same data and scenario, during some runs, one of the final views is not returning any data. This appears to be due to changes in the execution order, which in turn affects the final result. In the final steps, we have three different DataFrames derived from the same base DataFrame, and I’m not sure if this could be the cause.
>>>>>
>>>>> I tried using the persist option to hold intermediate results and avoid potential lazy evaluation issues, but the problem still persists.
>>>>>
>>>>> Could you please review this issue and suggest a solution to ensure consistent execution order and results?
>>>>>
>>>>> *Note:* Please let me know if you need any clarification or additional information on this.
>>>>>
>>>>> Code:
>>>>> source_df = spark.sql(f"""
>>>>>     SELECT * FROM {catalog_name}.{db_name}.{table_name}""")  # Sample source query
>>>>>
>>>>> data_df = source_df.persist()
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> type2_columns = [
>>>>> ]
>>>>> data_df = updateRowHashValues(data_df, type2_columns, primary_key)
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}").filter(col("IsCurrent") == True)
>>>>> target_col_list = target_df.columns
>>>>> source_with_target_df = data_df.alias("src").join(target_df.alias("tgt"), on="PatientId", how="left")
>>>>> joined_df = source_with_target_df.persist()
>>>>> filtered_joined_df = joined_df.filter(col("tgt.DimTypeIIKey").isNull()).select([col("src." + c).alias(c) for c in data_df.columns]).drop("SourceTimeStamp")
>>>>>
>>>>> new_records_df = filtered_joined_df.filter((lower(col("ModifiedBy")).contains("migration")) | (lower(col("ModifiedBy")).contains("migrated")) | (lower(col("CreatedBy")).contains("migration")) | (lower(col("CreatedBy")).contains("migrated")))
>>>>>
>>>>> new_records_source_df = filtered_joined_df.alias("jdf").join(new_records_df.alias("nrd"), col("jdf.PatientId") == col("nrd.PatientId"), how="left_anti").select([col("jdf." + c).alias(c) for c in filtered_joined_df.columns]).drop('SourceTimeStamp')
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> if is_check_banfield_data and (not new_records_df.isEmpty()):  # This is_check_banfield_data may get change based on the environment
>>>>>     patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient").selectExpr("patientid as patient_patientid", "createdate as patient_createdate", "changedate as patient_changedate", "fw_modifiedts as patient_fw_modifiedts").withColumn('patient_patientid', upper('patient_patientid'))
>>>>>     banfield_patient_df = patient_df.persist()
>>>>>     window_spec = Window.partitionBy("patient_patientid").orderBy(col("patient_changedate").desc(), col("patient_fw_modifiedts").desc())
>>>>>     banfield_patient_df = banfield_patient_df.withColumn("row_num", row_number().over(window_spec))
>>>>>     banfield_patient_df = banfield_patient_df.filter(col("row_num") == 1).drop("row_num")
>>>>>     new_records_df = new_records_df.alias("new").join(banfield_patient_df.alias("pat"), col("new.PatientId") == col("pat.patient_patientid"), how="left")
>>>>>     new_records_df = new_records_df.withColumn("BusinessEffectiveStartDate", coalesce(col("pat.patient_createdate"), col("BusinessEffectiveStartDate"))).select("new.*", "BusinessEffectiveStartDate")
>>>>>     incoming_new_df = new_records_df.persist()
>>>>> else:
>>>>>     is_check_banfield_data = False
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> type2_changes = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select("src.*")
>>>>> type1_changes = joined_df.filter((col("src.RowHashType2") == col("tgt.RowHashType2")) & (col("src.RowHashType1") != col("tgt.RowHashType1"))).select("src.*")
>>>>> expired_records = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select(col("tgt.*"), col("src.BusinessEffectiveStartDate").alias("NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate", col("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False)).drop("NewBusinessEffectiveEndDate")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}").agg(spark_max(surrogate_key)).collect()[0][0] or 0
>>>>> starting_key = max_key + 1
>>>>> target_col_list = list(set(target_col_list) - {"DwCreatedYear", "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set(surrogate_key.split(',')))
>>>>> type2_changes = type2_changes.select(target_col_list)
>>>>> if is_check_banfield_data:
>>>>>     type2_changes = type2_changes.unionByName(incoming_new_df)
>>>>>     patient_df.unpersist()
>>>>>     new_records_df.unpersist()
>>>>> type2_changes = type2_changes.unionByName(new_records_source_df)
>>>>> type2_changes = type2_changes.withColumn("IsDataMigrated", when(lower(col("ModifiedBy")).contains("migration") | lower(col("ModifiedBy")).contains("migrated") | lower(col("CreatedBy")).contains("migration") | lower(col("CreatedBy")).contains("migrated"), True).otherwise(False))
>>>>> type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate", lit("9999-12-31").cast("date")).withColumn("IsCurrent", lit(True)).withColumn(surrogate_key, row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
>>>>> type1_updates_columns = list(set(type1_changes.columns) - set(type2_columns))
>>>>> type1_updates = type1_changes.select(*type1_updates_columns)
>>>>> expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{db_name}_{table_name}")  # This are the three final temp views that will be used in the merge statements or inserts. In some run for one of the views we don't getting data.
>>>>> type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{db_name}_{table_name}")
>>>>> type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{table_name}")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> # DBTITLE 1,Type1 column changes update
>>>>> existing_records_update = spark.sql(f"""MERGE INTO {catalog_name}.{db_name}.{table_name} AS tgt
>>>>> USING temp_updates_type1_{db_name}_{table_name} AS src
>>>>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>>>> WHEN MATCHED THEN UPDATE SET
>>>>> tgt.col1 = src.col1,
>>>>> tgt.col2 = src.col2,
>>>>> tgt.col3 = src.col3,
>>>>> tgt.col4 = src.col4,
>>>>> .
>>>>> .
>>>>> .
>>>>> .
>>>>> tgt.RowHashType1 = src.RowHashType1""")
>>>>> print(f"Total no of records updated due to Type1 columns update: {existing_records_update.select('num_updated_rows').collect()[0][0]}")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> # DBTITLE 1,Update Expired Record
>>>>> update_expired_record = spark.sql(f"""MERGE INTO {catalog_name}.{db_name}.{table_name} AS tgt
>>>>> USING temp_updates_type2_expired_{db_name}_{table_name} AS src
>>>>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>>>> WHEN MATCHED THEN UPDATE SET
>>>>> tgt.IsCurrent = false,
>>>>> tgt.BusinessEffectiveEndDate = src.BusinessEffectiveEndDate,
>>>>> tgt.DwModifiedts = src.DwModifiedts,
>>>>> tgt.DwCreatedYear = year(src.DwModifiedts),
>>>>> tgt.DwCreatedMonth = month(src.DwModifiedts),
>>>>> tgt.DwCreatedDay = day(src.DwModifiedts)""")
>>>>> print(log_message=f"Total no of records marked IsCurrent as False due to type2 columns update: {update_expired_record.select('num_updated_rows').collect()[0][0]}")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> # DBTITLE 1,Insert new records as type2 value changed and first time data arrival
>>>>> new_records_insertion = spark.sql(f"""INSERT INTO {catalog_name}.{db_name}.{table_name} (
>>>>> col1()
>>>>> values(
>>>>> col1 )
>>>>> FROM temp_inserts_new_records_{db_name}_{table_name}
>>>>> """)
>>>>> print(log_message=f"Total no of new records inserted: {new_records_insertion.select('num_inserted_rows').collect()[0][0]}")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> source_df.unpersist()
>>>>> source_with_target_df.unpersist()
>>>>>
>>>>
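
PS, on option 2 from my earlier reply quoted above (write the three sources out first, then MERGE from those): a minimal sketch only. The stg_* staging table names are invented for illustration, it assumes the tables are Delta tables addressable as {catalog_name}.{db_name}.<table>, and the MERGE column list is abbreviated.

# Freeze each final DataFrame as a physical staging table, so every MERGE reads
# the same snapshot no matter when Spark re-plans the query.
stage = f"{catalog_name}.{db_name}"  # catalog/schema taken from your existing variables
expired_records.write.mode("overwrite").saveAsTable(f"{stage}.stg_expired_{table_name}")
type2_changes.write.mode("overwrite").saveAsTable(f"{stage}.stg_type2_{table_name}")
type1_updates.write.mode("overwrite").saveAsTable(f"{stage}.stg_type1_{table_name}")

# Then point the MERGE/INSERT statements at the staged tables instead of the temp views, e.g.
spark.sql(f"""
    MERGE INTO {stage}.{table_name} AS tgt
    USING {stage}.stg_type1_{table_name} AS src
    ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
    WHEN MATCHED THEN UPDATE SET
      tgt.RowHashType1 = src.RowHashType1  -- plus the other Type 1 columns
""")

# Drop the staging tables once all the MERGEs/INSERTs have committed
for t in (f"stg_expired_{table_name}", f"stg_type2_{table_name}", f"stg_type1_{table_name}"):
    spark.sql(f"DROP TABLE IF EXISTS {stage}.{t}")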