Hi *Ángel*,

Thank you for checking on this. I’ll review the points you mentioned and
get back to you with an update.

Hi *Mich*,
Looping you in here — could you please assist in reviewing this issue and
share your inputs or suggestions? Your expertise would be really helpful in
resolving it.

Thanks in advance.


On Mon, Aug 11, 2025 at 2:02 AM Ángel Álvarez Pascua <
angel.alvarez.pas...@gmail.com> wrote:

> Have you tried disabling AQE?
>
>
> El dom, 10 ago 2025, 20:48, Karthick N <kcekarth...@gmail.com> escribió:
>
>> Hi Team,
>>
>> I’m facing an issue with the execution order in the PySpark code snippet
>> below. I’m not certain whether it’s caused by lazy evaluation, Spark plan
>> optimization, or something else.
>>
>> *Issue:*
>> For the same data and scenario, during some runs, one of the final views
>> is not returning any data. This appears to be due to changes in the
>> execution order, which in turn affects the final result. In the final
>> steps, we have three different DataFrames derived from the same base
>> DataFrame, and I’m not sure if this could be the cause.
>>
>> I tried using the persist option to hold intermediate results and avoid
>> potential lazy evaluation issues, but the problem still persists.
>>
>> Could you please review this issue and suggest a solution to ensure
>> consistent execution order and results?
>>
>> *Note:* Please let me know if you need any clarification or additional
>> information on this.
>>
>> Code:
>> source_df = spark.sql(f"""
>>                       SELECT * FROM {catalog_name}.{db_name}.{table_name}
>> """)  # Sample source query
>>
>> data_df = source_df.persist()
>>
>> # COMMAND ----------
>>
>> type2_columns = [
>> ]
>> data_df = updateRowHashValues(data_df, type2_columns, primary_key)
>>
>> # COMMAND ----------
>>
>> target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}"
>> ).filter(col("IsCurrent") == True)
>> target_col_list = target_df.columns
>> source_with_target_df = data_df.alias("src").join(target_df.alias("tgt"),
>> on="PatientId", how="left")
>> joined_df = source_with_target_df.persist()
>> filtered_joined_df = joined_df.filter(col("tgt.DimTypeIIKey"
>> ).isNull()).select([col("src." + c).alias(c) for c in data_df
>> .columns]).drop("SourceTimeStamp")
>>
>> new_records_df = filtered_joined_df.filter((lower(col("ModifiedBy"
>> )).contains("migration")) | (lower(col("ModifiedBy")).contains("migrated"))
>> | (lower(col("CreatedBy")).contains("migration")) | (lower(col(
>> "CreatedBy")).contains("migrated")))
>>
>> new_records_source_df = filtered_joined_df.alias("jdf").join(
>> new_records_df.alias("nrd"),col("jdf.PatientId") == col("nrd.PatientId"),
>> how="left_anti").select([col("jdf." + c).alias(c) for c in
>> filtered_joined_df.columns]).drop('SourceTimeStamp')
>>
>> # COMMAND ----------
>>
>> if is_check_banfield_data and (not new_records_df.isEmpty()):    #This
>> is_check_banfield_data may get change based on the environment
>>     patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient"
>> ).selectExpr("patientid as patient_patientid", "createdate as
>> patient_createdate", "changedate as patient_changedate", "fw_modifiedts
>> as patient_fw_modifiedts").withColumn('patient_patientid', upper(
>> 'patient_patientid'))
>>     banfield_patient_df = patient_df.persist()
>>     window_spec = Window.partitionBy("patient_patientid").orderBy(col(
>> "patient_changedate").desc(),col("patient_fw_modifiedts").desc())
>>     banfield_patient_df = banfield_patient_df.withColumn("row_num",
>> row_number().over(window_spec))
>>     banfield_patient_df = banfield_patient_df.filter(col("row_num") == 1
>> ).drop("row_num")
>>     new_records_df = new_records_df.alias("new").join(banfield_patient_df
>> .alias("pat"),col("new.PatientId") == col("pat.patient_patientid"),how=
>> "left")
>>     new_records_df = new_records_df.withColumn(
>> "BusinessEffectiveStartDate",coalesce(col("pat.patient_createdate"), col(
>> "BusinessEffectiveStartDate"))).select("new.*",
>> "BusinessEffectiveStartDate")
>>     incoming_new_df = new_records_df.persist()
>> else:
>>     is_check_banfield_data = False
>>
>> # COMMAND ----------
>>
>> type2_changes = joined_df.filter((col("src.RowHashType2") != col(
>> "tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select(
>> "src.*")
>> type1_changes = joined_df.filter((col("src.RowHashType2") == col(
>> "tgt.RowHashType2")) & (col("src.RowHashType1") != col("tgt.RowHashType1"
>> ))).select("src.*")
>> expired_records = joined_df.filter((col("src.RowHashType2") != col(
>> "tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull() ).select(col(
>> "tgt.*"), col("src.BusinessEffectiveStartDate").alias(
>> "NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate",
>> col("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False
>> )).drop("NewBusinessEffectiveEndDate")
>>
>> # COMMAND ----------
>>
>> max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}"
>> ).agg(spark_max(surrogate_key)).collect()[0][0] or 0
>> starting_key = max_key + 1
>> target_col_list = list(set(target_col_list) - {"DwCreatedYear",
>> "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set
>> (surrogate_key.split(',')))
>> type2_changes = type2_changes.select(target_col_list)
>> if is_check_banfield_data:
>>     type2_changes = type2_changes.unionByName(incoming_new_df)
>>     patient_df.unpersist()
>>     new_records_df.unpersist()
>> type2_changes = type2_changes.unionByName(new_records_source_df)
>> type2_changes = type2_changes.withColumn("IsDataMigrated",
>> when(lower(col("ModifiedBy")).contains("migration") | lower(col(
>> "ModifiedBy")).contains("migrated") | lower(col("CreatedBy")).contains(
>> "migration") | lower(col("CreatedBy")).contains("migrated"),True
>> ).otherwise(False))
>> type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate", lit(
>> "9999-12-31").cast("date")).withColumn("IsCurrent", 
>> lit(True)).withColumn(surrogate_key,
>> row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
>> type1_updates_columns = list(set(type1_changes.columns) - set(
>> type2_columns))
>> type1_updates = type1_changes.select(*type1_updates_columns)
>> expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{
>> db_name}_{table_name}")   # This are the three final temp views that
>> will be used in the merge statements or inserts. In some run for one of the
>> views we don't getting data.
>> type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{db_name
>> }_{table_name}")
>> type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{
>> table_name}")
>>
>> # COMMAND ----------
>>
>> # DBTITLE 1,Type1 column changes update
>> existing_records_update = spark.sql(f"""MERGE INTO {catalog_name}.{
>> db_name}.{table_name} AS tgt
>> USING temp_updates_type1_{db_name}_{table_name} AS src
>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>> WHEN MATCHED THEN UPDATE SET
>>     tgt.col1 = src.col1,
>>     tgt.col2 = src.col2,
>>     tgt.col3 = src.col3,
>>     tgt.col4 = src.col4,
>>     .
>>     .
>>     .
>>     .
>>     tgt.RowHashType1 = src.RowHashType1""")
>> print(f"Total no of records updated due to Type1 columns update: {
>> existing_records_update.select('num_updated_rows').collect()[0][0]}")
>>
>> # COMMAND ----------
>>
>> # DBTITLE 1,Update Expired Record
>> update_expired_record = spark.sql(f"""MERGE INTO {catalog_name}.{db_name}
>> .{table_name} AS tgt
>> USING temp_updates_type2_expired_{db_name}_{table_name} AS src
>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>> WHEN MATCHED THEN UPDATE SET
>>     tgt.IsCurrent = false,
>>     tgt.BusinessEffectiveEndDate = src.BusinessEffectiveEndDate,
>>     tgt.DwModifiedts = src.DwModifiedts,
>>     tgt.DwCreatedYear = year(src.DwModifiedts),
>>     tgt.DwCreatedMonth = month(src.DwModifiedts),
>>     tgt.DwCreatedDay = day(src.DwModifiedts)""")
>> print(log_message=f"Total no of records marked IsCurrent as False due to
>> type2 columns update: {update_expired_record.select('num_updated_rows'
>> ).collect()[0][0]}")
>>
>> # COMMAND ----------
>>
>> # DBTITLE 1,Insert new records as type2 value changed and first time data
>> arrival
>> new_records_insertion = spark.sql(f"""INSERT INTO {catalog_name}.{db_name
>> }.{table_name} (
>>     col1()
>>     values(
>>     col1 )
>> FROM temp_inserts_new_records_{db_name}_{table_name}
>> """)
>> print(log_message=f"Total no of new records inserted: {
>> new_records_insertion.select('num_inserted_rows').collect()[0][0]}")
>>
>> # COMMAND ----------
>>
>> source_df.unpersist()
>> source_with_target_df.unpersist()
>>
>

Reply via email to