Thanks for your support.

I tried calling an action (count()) after persist() to materialize the
DataFrame results. However, I’m still facing a column count mismatch when
performing unionByName. I have verified the column counts and names in both
DataFrames using printSchema, and it shows that both DataFrames have the
same columns. Still, I’m not sure whether there’s an underlying issue in the
code. I have highlighted the section where I’m encountering the problem.

Note: The source (*data_df*) is not read from the same table,
f"{catalog_name}.{db_name}.{table_name}"; it comes from a different table.

Could you please take a look at the issue?

if source_data_count > 0:
    data_df = updateRowHashValues(data_df, type2_columns, primary_key)
    data_df = data_df.drop("SourceTimeStamp")
    target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}").filter
(col("IsCurrent") == True) # here, we are taking data from target table
    target_col_list = target_df.columns
    source_with_target_df = data_df.alias("src").join(target_df.alias("tgt"),
on="PatientId", how="left")
    joined_df = source_with_target_df.persist()
    print(f"Joined source new data with active target data count: {
joined_df.count()}")
    source_new_data_df = joined_df.filter(col("tgt.DimPatientTypeIIKey").
isNull()).select([col("src." + c).alias(c) for c in data_df.columns])
    source_new_migration_data_df = source_new_data_df.filter((lower(col("
ModifiedBy")).contains("migration")) | (lower(col("ModifiedBy")).contains(
"migrated")) | (lower(col("CreatedBy")).contains("migration")) | (lower(col(
"CreatedBy")).contains("migrated")))
    source_new_non_migration_data_df = source_new_data_df.alias("jdf").join
(source_new_migration_data_df.alias("nrd"),col("jdf.PatientId") == col(
"nrd.PatientId"),how="left_anti").select([col("jdf." + c).alias(c) for c in
source_new_data_df.columns])

    if is_check_banfield_data and (not source_new_migration_data_df.isEmpty
()):
        patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient").
selectExpr("patientid as patient_patientid", "createdate as
patient_createdate", "changedate as patient_changedate", "fw_modifiedts as
patient_fw_modifiedts").withColumn('patient_patientid', upper(
'patient_patientid'))
        banfield_patient_df = patient_df.persist()
        window_spec = Window.partitionBy("patient_patientid").orderBy(col(
"patient_changedate").desc(),col("patient_fw_modifiedts").desc())
        banfield_patient_df = banfield_patient_df.withColumn("row_num",
row_number().over(window_spec))
        banfield_patient_df = banfield_patient_df.filter(col("row_num") == 1
).drop("row_num")
        source_new_migration_data_df = source_new_migration_data_df.alias(
"new").join(banfield_patient_df.alias("pat"),col("new.PatientId") == col(
"pat.patient_patientid"),how="left")
        source_new_migration_data_df = source_new_migration_data_df.
withColumn("BusinessEffectiveStartDate",least(col("pat.patient_createdate"),
col("BusinessEffectiveStartDate"))).select("new.*",
"BusinessEffectiveStartDate")
        incomming_new_migration_data_df = source_new_migration_data_df.
persist()
    else:
        is_check_banfield_data = False

    type2_changes = joined_df.filter((col("src.RowHashType2") != col(
"tgt.RowHashType2")) & col("tgt.DimPatientTypeIIKey").isNotNull()).select(
"src.*")
    type1_changes = joined_df.filter((col("src.RowHashType2") == col(
"tgt.RowHashType2")) & (col("src.RowHashType1") != col("tgt.RowHashType1"
))).select("src.*")
    expired_records = joined_df.filter((col("src.RowHashType2") != col(
"tgt.RowHashType2")) & col("tgt.DimPatientTypeIIKey").isNotNull() ).select(
col("tgt.*"), col("src.BusinessEffectiveStartDate").alias(
"NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate", col(
"NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False)).drop(
"NewBusinessEffectiveEndDate")

    max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}").agg(
spark_max(surrogate_key)).collect()[0][0] or 0
    starting_key = max_key + 1
    target_col_list = list(set(target_col_list) - {"DwCreatedYear",
"DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set
(surrogate_key.split(',')))
    print(f"type2_changes schema")
    print(f"column count:{len(type2_changes.columns)}")
    type2_changes.printSchema()# in this steps I am getting same column
count and details
    # print(f"incomming_new_migration_data_df schema")
    # incomming_new_migration_data_df.printSchema()
    type2_changes = type2_changes.select(target_col_list)
    if is_check_banfield_data: # For my test cases, it is False with release
        type2_changes = type2_changes.unionByName
(incomming_new_migration_data_df)
        patient_df.unpersist()
        source_new_migration_data_df.unpersist()
    print(f"source_new_non_migration_data_df schema")
    print(f"column count:{len(source_new_non_migration_data_df.columns)}")
    source_new_non_migration_data_df.printSchema() # in this steps I am
getting same column count and details
    type2_changes_records_count = type2_changes.count()
    type2_changes = type2_changes.unionByName(source_new_non_migration_data_df)
# I am facing column mismatch issue in this steps
    type2_changes = type2_changes.withColumn("IsDataMigrated", when(lower(
col("ModifiedBy")).contains("migration") | lower(col("ModifiedBy")).contains
("migrated") | lower(col("CreatedBy")).contains("migration") | lower(col(
"CreatedBy")).contains("migrated"),True).otherwise(False))
    type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate", lit
("9999-12-31").cast("date")).withColumn("IsCurrent",
lit(True)).withColumn(surrogate_key,
row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
    type1_updates_columns = list(set(type1_changes.columns) - set
(type2_columns))
    type1_updates = type1_changes.select(*type1_updates_columns)
    expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{
db_name}_{table_name}")
    type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{
db_name}_{table_name}")
    type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{
table_name}")
    expired_records_count = expired_records.count()
    type1_updates_records_count = type1_updates.count()
    print(log_message=f"{catalog_name}.{db_name}.{table_name} — "f"Existing
records expired count: {expired_records_count}, "f"New/changed Type 2
records count: {type2_changes_records_count}, "f"Type 1 updates records
count: {type1_updates_records_count}")

Thanks


On Mon, Aug 11, 2025 at 9:42 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Karthick,
>
> The problem seems to be that you were applying transformations (a "recipe")
> to three DataFrames without materialising them, then writing back to the
> target table. Each MERGE re-evaluated its “recipe” at a different time, so
> they saw different snapshots → flaky/empty results.
>
> Fix (short + sweet):
>
>    1. Freeze joined_df and each final DF *before any writes* (e.g., persist();
>    count() or localCheckpoint(eager=True)).
>    2. Or write the three sources to temp tables first, then MERGE from those.
>
>
> This example below is built on Parquet but shows the concept
>
>
> # 0)  Start Spark & clean a folder
> from pyspark.sql import SparkSession, functions as F
> import shutil
>
> spark = SparkSession.builder.appName("FreezeDemo-Parquet").getOrCreate()
> spark.sparkContext.setLogLevel("ERROR")
>
> path = "/tmp/demo_table_parquet"
> shutil.rmtree(path, ignore_errors=True)
>
> # 1) Seed data
> spark.createDataFrame([(1,"A"), (2,"B")], "id INT, val STRING") \
>      .write.mode("overwrite").parquet(path)
>
> # 2) Build DF BEFORE first append (will NOT see id=3 because Parquet path
> listing was captured)
> df1 = spark.read.parquet(path).filter(F.col("id") > 0)
> print("== Old DF (built before append) — no id=3 ==")
> df1.show()
>
> # 3) Append id=3 and build a NEW DF (this one sees 1,2,3)
> spark.createDataFrame([(3,"C")], "id INT, val STRING") \
>      .write.mode("append").parquet(path)
> df2 = spark.read.parquet(path).filter(F.col("id") > 0)
> print("== New DF (after append) — has id=3 ==")
> df2.show()
>
> # 4) HARD FREEZE df2 BEFORE any further appends
> spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")  # any
> writable path
> df2_frozen = df2.localCheckpoint(eager=True)
>
> # 5) Now append id=4 AFTER freezing
> spark.createDataFrame([(4,"D")], "id INT, val STRING") \
>      .write.mode("append").parquet(path)
>
> print("== Frozen snapshot (should NOT include id=4) ==")
> df2_frozen.show()   # expect only 1,2,3
>
> # Optional: prove live data now has 1,2,3,4
> print("== Fresh read (should include id=4) ==")
> spark.read.parquet(path).orderBy("id").show()
>
> Output ( Running Spark version 3.5.5)
>
> == Old DF (built before append) — no id=3 ==
> +---+---+
> | id|val|
> +---+---+
> |  1|  A|
> |  2|  B|
> +---+---+
>
> == New DF (after append) — has id=3 ==
> +---+---+
> | id|val|
> +---+---+
> |  1|  A|
> |  3|  C|
> |  2|  B|
> +---+---+
>
> == Frozen snapshot (should NOT include id=4) ==
> +---+---+
> | id|val|
> +---+---+
> |  1|  A|
> |  3|  C|
> |  2|  B|
> +---+---+
>
> == Fresh read (should include id=4) ==
> +---+---+
> | id|val|
> +---+---+
> |  1|  A|
> |  2|  B|
> |  3|  C|
> |  4|  D|
> +---+---+
>
> HTH
>
> Dr Mich Talebzadeh,
> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>
>
> On Mon, 11 Aug 2025 at 08:12, Karthick N <kcekarth...@gmail.com> wrote:
>
>> Hi *Ángel*,
>>
>> Thank you for checking on this. I’ll review the points you mentioned and
>> get back to you with an update.
>>
>> Hi *Mich*,
>> Looping you in here — could you please assist in reviewing this issue and
>> share your inputs or suggestions? Your expertise would be really helpful in
>> resolving it.
>>
>> Thanks in advance.
>>
>>
>> On Mon, Aug 11, 2025 at 2:02 AM Ángel Álvarez Pascua <
>> angel.alvarez.pas...@gmail.com> wrote:
>>
>>> Have you tried disabling AQE?
>>>
>>>
>>> El dom, 10 ago 2025, 20:48, Karthick N <kcekarth...@gmail.com> escribió:
>>>
>>>> Hi Team,
>>>>
>>>> I’m facing an issue with the execution order in the PySpark code
>>>> snippet below. I’m not certain whether it’s caused by lazy evaluation,
>>>> Spark plan optimization, or something else.
>>>>
>>>> *Issue:*
>>>> For the same data and scenario, during some runs, one of the final
>>>> views is not returning any data. This appears to be due to changes in the
>>>> execution order, which in turn affects the final result. In the final
>>>> steps, we have three different DataFrames derived from the same base
>>>> DataFrame, and I’m not sure if this could be the cause.
>>>>
>>>> I tried using the persist option to hold intermediate results and
>>>> avoid potential lazy evaluation issues, but the problem still persists.
>>>>
>>>> Could you please review this issue and suggest a solution to ensure
>>>> consistent execution order and results?
>>>>
>>>> *Note:* Please let me know if you need any clarification or additional
>>>> information on this.
>>>>
>>>> Code:
>>>> source_df = spark.sql(f"""
>>>>                       SELECT * FROM {catalog_name}.{db_name}.{
>>>> table_name}""")  # Sample source query
>>>>
>>>> data_df = source_df.persist()
>>>>
>>>> # COMMAND ----------
>>>>
>>>> type2_columns = [
>>>> ]
>>>> data_df = updateRowHashValues(data_df, type2_columns, primary_key)
>>>>
>>>> # COMMAND ----------
>>>>
>>>> target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}"
>>>> ).filter(col("IsCurrent") == True)
>>>> target_col_list = target_df.columns
>>>> source_with_target_df = data_df.alias("src").join(target_df.alias("tgt"),
>>>> on="PatientId", how="left")
>>>> joined_df = source_with_target_df.persist()
>>>> filtered_joined_df = joined_df.filter(col("tgt.DimTypeIIKey"
>>>> ).isNull()).select([col("src." + c).alias(c) for c in data_df
>>>> .columns]).drop("SourceTimeStamp")
>>>>
>>>> new_records_df = filtered_joined_df.filter((lower(col("ModifiedBy"
>>>> )).contains("migration")) | (lower(col("ModifiedBy")).contains(
>>>> "migrated")) | (lower(col("CreatedBy")).contains("migration")) |
>>>> (lower(col("CreatedBy")).contains("migrated")))
>>>>
>>>> new_records_source_df = filtered_joined_df.alias("jdf").join(
>>>> new_records_df.alias("nrd"),col("jdf.PatientId") == col("nrd.PatientId"
>>>> ),how="left_anti").select([col("jdf." + c).alias(c) for c in
>>>> filtered_joined_df.columns]).drop('SourceTimeStamp')
>>>>
>>>> # COMMAND ----------
>>>>
>>>> if is_check_banfield_data and (not new_records_df.isEmpty()):    #This
>>>> is_check_banfield_data may get change based on the environment
>>>>     patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient"
>>>> ).selectExpr("patientid as patient_patientid", "createdate as
>>>> patient_createdate", "changedate as patient_changedate", "fw_modifiedts
>>>> as patient_fw_modifiedts").withColumn('patient_patientid', upper(
>>>> 'patient_patientid'))
>>>>     banfield_patient_df = patient_df.persist()
>>>>     window_spec = Window.partitionBy("patient_patientid").orderBy(col(
>>>> "patient_changedate").desc(),col("patient_fw_modifiedts").desc())
>>>>     banfield_patient_df = banfield_patient_df.withColumn("row_num",
>>>> row_number().over(window_spec))
>>>>     banfield_patient_df = banfield_patient_df.filter(col("row_num") ==
>>>> 1).drop("row_num")
>>>>     new_records_df = new_records_df.alias("new").join(
>>>> banfield_patient_df.alias("pat"),col("new.PatientId") == col(
>>>> "pat.patient_patientid"),how="left")
>>>>     new_records_df = new_records_df.withColumn(
>>>> "BusinessEffectiveStartDate",coalesce(col("pat.patient_createdate"),
>>>> col("BusinessEffectiveStartDate"))).select("new.*",
>>>> "BusinessEffectiveStartDate")
>>>>     incoming_new_df = new_records_df.persist()
>>>> else:
>>>>     is_check_banfield_data = False
>>>>
>>>> # COMMAND ----------
>>>>
>>>> type2_changes = joined_df.filter((col("src.RowHashType2") != col(
>>>> "tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select(
>>>> "src.*")
>>>> type1_changes = joined_df.filter((col("src.RowHashType2") == col(
>>>> "tgt.RowHashType2")) & (col("src.RowHashType1") != col(
>>>> "tgt.RowHashType1"))).select("src.*")
>>>> expired_records = joined_df.filter((col("src.RowHashType2") != col(
>>>> "tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()
>>>> ).select(col("tgt.*"), col("src.BusinessEffectiveStartDate").alias(
>>>> "NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate",
>>>> col("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False
>>>> )).drop("NewBusinessEffectiveEndDate")
>>>>
>>>> # COMMAND ----------
>>>>
>>>> max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}"
>>>> ).agg(spark_max(surrogate_key)).collect()[0][0] or 0
>>>> starting_key = max_key + 1
>>>> target_col_list = list(set(target_col_list) - {"DwCreatedYear",
>>>> "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set
>>>> (surrogate_key.split(',')))
>>>> type2_changes = type2_changes.select(target_col_list)
>>>> if is_check_banfield_data:
>>>>     type2_changes = type2_changes.unionByName(incoming_new_df)
>>>>     patient_df.unpersist()
>>>>     new_records_df.unpersist()
>>>> type2_changes = type2_changes.unionByName(new_records_source_df)
>>>> type2_changes = type2_changes.withColumn("IsDataMigrated",
>>>> when(lower(col("ModifiedBy")).contains("migration") | lower(col(
>>>> "ModifiedBy")).contains("migrated") | lower(col("CreatedBy")).contains(
>>>> "migration") | lower(col("CreatedBy")).contains("migrated"),True
>>>> ).otherwise(False))
>>>> type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate",
>>>> lit("9999-12-31").cast("date")).withColumn("IsCurrent", 
>>>> lit(True)).withColumn(surrogate_key,
>>>> row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
>>>> type1_updates_columns = list(set(type1_changes.columns) - set(
>>>> type2_columns))
>>>> type1_updates = type1_changes.select(*type1_updates_columns)
>>>> expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{
>>>> db_name}_{table_name}")   # This are the three final temp views that
>>>> will be used in the merge statements or inserts. In some run for one of the
>>>> views we don't getting data.
>>>> type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{
>>>> db_name}_{table_name}")
>>>> type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{
>>>> table_name}")
>>>>
>>>> # COMMAND ----------
>>>>
>>>> # DBTITLE 1,Type1 column changes update
>>>> existing_records_update = spark.sql(f"""MERGE INTO {catalog_name}.{
>>>> db_name}.{table_name} AS tgt
>>>> USING temp_updates_type1_{db_name}_{table_name} AS src
>>>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>>> WHEN MATCHED THEN UPDATE SET
>>>>     tgt.col1 = src.col1,
>>>>     tgt.col2 = src.col2,
>>>>     tgt.col3 = src.col3,
>>>>     tgt.col4 = src.col4,
>>>>     .
>>>>     .
>>>>     .
>>>>     .
>>>>     tgt.RowHashType1 = src.RowHashType1""")
>>>> print(f"Total no of records updated due to Type1 columns update: {
>>>> existing_records_update.select('num_updated_rows').collect()[0][0]}")
>>>>
>>>> # COMMAND ----------
>>>>
>>>> # DBTITLE 1,Update Expired Record
>>>> update_expired_record = spark.sql(f"""MERGE INTO {catalog_name}.{
>>>> db_name}.{table_name} AS tgt
>>>> USING temp_updates_type2_expired_{db_name}_{table_name} AS src
>>>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>>> WHEN MATCHED THEN UPDATE SET
>>>>     tgt.IsCurrent = false,
>>>>     tgt.BusinessEffectiveEndDate = src.BusinessEffectiveEndDate,
>>>>     tgt.DwModifiedts = src.DwModifiedts,
>>>>     tgt.DwCreatedYear = year(src.DwModifiedts),
>>>>     tgt.DwCreatedMonth = month(src.DwModifiedts),
>>>>     tgt.DwCreatedDay = day(src.DwModifiedts)""")
>>>> print(log_message=f"Total no of records marked IsCurrent as False due
>>>> to type2 columns update: {update_expired_record.select(
>>>> 'num_updated_rows').collect()[0][0]}")
>>>>
>>>> # COMMAND ----------
>>>>
>>>> # DBTITLE 1,Insert new records as type2 value changed and first time
>>>> data arrival
>>>> new_records_insertion = spark.sql(f"""INSERT INTO {catalog_name}.{
>>>> db_name}.{table_name} (
>>>>     col1()
>>>>     values(
>>>>     col1 )
>>>> FROM temp_inserts_new_records_{db_name}_{table_name}
>>>> """)
>>>> print(log_message=f"Total no of new records inserted: {
>>>> new_records_insertion.select('num_inserted_rows').collect()[0][0]}")
>>>>
>>>> # COMMAND ----------
>>>>
>>>> source_df.unpersist()
>>>> source_with_target_df.unpersist()
>>>>
>>>

Reply via email to