You have two separate things going on:

You did materialize the DataFrame (persist() followed by count()),

but

your unionByName is still failing because the two DataFrames don’t actually
have the same column set/schema at the moment of the union, even if the
printSchema() output you looked at earlier seemed to match.
Why unionByName is complaining (common culprits in your snippet)


   - You change type2_changes with:

   target_col_list = list(set(target_col_list) - {...} -
set(surrogate_key.split(',')))
   type2_changes = type2_changes.select(target_col_list)

   Using a set here discards the column order (sets are unordered), which
   makes it easy to miss a field. Also, target_col_list comes from the target
   schema, but type2_changes/source_new_non_migration_data_df come from the
   source schema. Those aren’t guaranteed to be identical.
   - You never re-project source_new_non_migration_data_df to target_col_list
     before the union, so the two frames can have different column sets.
   - If the is_check_banfield_data branch ever runs, this line:

   .select("new.*", "BusinessEffectiveStartDate")

   duplicates BusinessEffectiveStartDate (it’s already in new.*). That
   produces duplicate column names → unions/merges will blow up later.

Quick, robust fix (align schemas explicitly before union)

Define the exact columns you plan to insert/update ONCE, then project both
DataFrames to that list in the same order:

from pyspark.sql import functions as F
# 1) Pick a canonical column list. Build it with an ordered comprehension
#    (not set arithmetic) so the column order stays deterministic.
# Columns excluded from the insert list, plus the surrogate key column(s),
# which surrogate_key holds as a comma-separated string.
drop_cols = {
    "DwCreatedYear",
    "DwCreatedMonth",
    "DwCreatedDay",
    "IsDataMigrated",
    "IsCurrent",
} | set(surrogate_key.split(','))
# Base this on the *source* if you’re inserting source-shaped rows:
insert_cols = [c for c in data_df.columns if c not in drop_cols]

# 2) Re-project BOTH sides to the same ordered list
type2_changes = type2_changes.select(*insert_cols)
source_new_non_migration_data_df = source_new_non_migration_data_df.select(
    *insert_cols
)

# (Optional but recommended) Cast to identical types before union
# Map of column name -> Spark data type, taken from the source schema and
# restricted to the columns being inserted.
src_schema = {
    f.name: f.dataType for f in data_df.schema if f.name in insert_cols
}

def align_types(df, cols, schema_map):
    """Project ``df`` onto ``cols``, casting each column to its mapped type.

    For every name in ``cols`` (in order), the output column is the existing
    column cast to ``schema_map[name]``; a name that ``df`` does not have
    becomes a NULL literal cast to that same type.
    """
    present = df.columns
    projected = []
    for name in cols:
        target_type = schema_map[name]
        if name in present:
            base = F.col(name)
        else:
            # Column missing from df: fill with a typed NULL so the output
            # still carries every requested column.
            base = F.lit(None)
        projected.append(base.cast(target_type).alias(name))
    return df.select(*projected)

# Cast both sides to the source-derived types so their schemas match exactly.
type2_changes = align_types(type2_changes, insert_cols, src_schema)
source_new_non_migration_data_df = align_types(
    source_new_non_migration_data_df, insert_cols, src_schema
)

# 3) Now the unionByName will work
type2_changes = type2_changes.unionByName(source_new_non_migration_data_df)

Bottom line: your union error isn’t lazy-eval anymore — it’s a schema
alignment issue. Pick the canonical column list, project both DFs to it
(and cast types), then union.

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>





On Wed, 13 Aug 2025 at 15:54, Karthick N <kcekarth...@gmail.com> wrote:

> Thanks for support.
>
> I tried using action(count()) after persist to materialize the DataFrame
> results. However, I’m still facing a column count mismatch issue when
> performing unionByName. I have verified the column count and names in
> both DataFrames using printSchema, and it shows that both DataFrames have
> the same columns. Still, I’m not sure if there’s an underlying issue in the
> code. I have highlighted the section where I’m encountering the problem.
>
> Note: Here source(*data_df *), we are not getting data from the same
> table f"{catalog_name}.{db_name}.{table_name}, it is from a different table.
>
> Could you please take a look into the issue.
>
> if source_data_count > 0:
>     data_df = updateRowHashValues(data_df, type2_columns, primary_key)
>     data_df = data_df.drop("SourceTimeStamp")
>     target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}")
> .filter(col("IsCurrent") == True) # here, we are taking data from target
> table
>     target_col_list = target_df.columns
>     source_with_target_df = data_df.alias("src").join(target_df.alias(
> "tgt"), on="PatientId", how="left")
>     joined_df = source_with_target_df.persist()
>     print(f"Joined source new data with active target data count: {
> joined_df.count()}")
>     source_new_data_df = joined_df.filter(col("tgt.DimPatientTypeIIKey").
> isNull()).select([col("src." + c).alias(c) for c in data_df.columns])
>     source_new_migration_data_df = source_new_data_df.filter((lower(col("
> ModifiedBy")).contains("migration")) | (lower(col("ModifiedBy")).contains(
> "migrated")) | (lower(col("CreatedBy")).contains("migration")) | (lower(
> col("CreatedBy")).contains("migrated")))
>     source_new_non_migration_data_df = source_new_data_df.alias("jdf").
> join(source_new_migration_data_df.alias("nrd"),col("jdf.PatientId") == col
> ("nrd.PatientId"),how="left_anti").select([col("jdf." + c).alias(c) for c
> in source_new_data_df.columns])
>
>     if is_check_banfield_data and (not source_new_migration_data_df.
> isEmpty()):
>         patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient").
> selectExpr("patientid as patient_patientid", "createdate as
> patient_createdate", "changedate as patient_changedate", "fw_modifiedts
> as patient_fw_modifiedts").withColumn('patient_patientid', upper(
> 'patient_patientid'))
>         banfield_patient_df = patient_df.persist()
>         window_spec = Window.partitionBy("patient_patientid").orderBy(col(
> "patient_changedate").desc(),col("patient_fw_modifiedts").desc())
>         banfield_patient_df = banfield_patient_df.withColumn("row_num",
> row_number().over(window_spec))
>         banfield_patient_df = banfield_patient_df.filter(col("row_num") ==
> 1).drop("row_num")
>         source_new_migration_data_df = source_new_migration_data_df.alias(
> "new").join(banfield_patient_df.alias("pat"),col("new.PatientId") == col(
> "pat.patient_patientid"),how="left")
>         source_new_migration_data_df = source_new_migration_data_df.
> withColumn("BusinessEffectiveStartDate",least(col("pat.patient_createdate"),
> col("BusinessEffectiveStartDate"))).select("new.*",
> "BusinessEffectiveStartDate")
>         incomming_new_migration_data_df = source_new_migration_data_df.
> persist()
>     else:
>         is_check_banfield_data = False
>
>     type2_changes = joined_df.filter((col("src.RowHashType2") != col(
> "tgt.RowHashType2")) & col("tgt.DimPatientTypeIIKey").isNotNull()).select(
> "src.*")
>     type1_changes = joined_df.filter((col("src.RowHashType2") == col(
> "tgt.RowHashType2")) & (col("src.RowHashType1") != col("tgt.RowHashType1"
> ))).select("src.*")
>     expired_records = joined_df.filter((col("src.RowHashType2") != col(
> "tgt.RowHashType2")) & col("tgt.DimPatientTypeIIKey").isNotNull() ).select
> (col("tgt.*"), col("src.BusinessEffectiveStartDate").alias(
> "NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate", col
> ("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False)).drop(
> "NewBusinessEffectiveEndDate")
>
>     max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}").agg(
> spark_max(surrogate_key)).collect()[0][0] or 0
>     starting_key = max_key + 1
>     target_col_list = list(set(target_col_list) - {"DwCreatedYear",
> "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set
> (surrogate_key.split(',')))
>     print(f"type2_changes schema")
>     print(f"column count:{len(type2_changes.columns)}")
>     type2_changes.printSchema()# in this steps I am getting same column
> count and details
>     # print(f"incomming_new_migration_data_df schema")
>     # incomming_new_migration_data_df.printSchema()
>     type2_changes = type2_changes.select(target_col_list)
>     if is_check_banfield_data: # For my test cases, it is False with
> release
>         type2_changes = type2_changes.unionByName
> (incomming_new_migration_data_df)
>         patient_df.unpersist()
>         source_new_migration_data_df.unpersist()
>     print(f"source_new_non_migration_data_df schema")
>     print(f"column count:{len(source_new_non_migration_data_df.columns)}")
>     source_new_non_migration_data_df.printSchema() # in this steps I am
> getting same column count and details
>     type2_changes_records_count = type2_changes.count()
>     type2_changes = 
> type2_changes.unionByName(source_new_non_migration_data_df)
> # I am facing column mismatch issue in this steps
>     type2_changes = type2_changes.withColumn("IsDataMigrated", when(lower(
> col("ModifiedBy")).contains("migration") | lower(col("ModifiedBy")).
> contains("migrated") | lower(col("CreatedBy")).contains("migration") |
> lower(col("CreatedBy")).contains("migrated"),True).otherwise(False))
>     type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate",
> lit("9999-12-31").cast("date")).withColumn("IsCurrent", lit(True)).
> withColumn(surrogate_key, row_number().over(Window.orderBy("PatientId")) +
> starting_key - 1)
>     type1_updates_columns = list(set(type1_changes.columns) - set
> (type2_columns))
>     type1_updates = type1_changes.select(*type1_updates_columns)
>     expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{
> db_name}_{table_name}")
>     type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{
> db_name}_{table_name}")
>     type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{
> table_name}")
>     expired_records_count = expired_records.count()
>     type1_updates_records_count = type1_updates.count()
>     print(log_message=f"{catalog_name}.{db_name}.{table_name} — "f"Existing
> records expired count: {expired_records_count}, "f"New/changed Type 2
> records count: {type2_changes_records_count}, "f"Type 1 updates records
> count: {type1_updates_records_count}")
>
> Thanks
>
>
> On Mon, Aug 11, 2025 at 9:42 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi Karthick,
>>
>> The problem seems to be that you were performing
>> transformation/recipe on three data frames without materialisation, then
>> writing back to that target table. Each MERGE re-evaluated its “recipe” at
>> a different time, so they saw different snapshots → flaky/empty results.
>>
>> Fix (short + sweet):
>>
>>    1. Freeze joined_df and each final DF *before any writes* (e.g., 
>> persist();
>>    count() or localCheckpoint(eager=True)).
>>    2.
>>
>>    Or write the three sources to temp tables first, then MERGE from those
>>
>>
>> This example below is built on Parquet but shows the concept
>>
>>
>> # 0)  Start Spark & clean a folder
>> from pyspark.sql import SparkSession, functions as F
>> import shutil
>>
>> spark = SparkSession.builder.appName("FreezeDemo-Parquet").getOrCreate()
>> spark.sparkContext.setLogLevel("ERROR")
>>
>> path = "/tmp/demo_table_parquet"
>> shutil.rmtree(path, ignore_errors=True)
>>
>> # 1) Seed data
>> spark.createDataFrame([(1,"A"), (2,"B")], "id INT, val STRING") \
>>      .write.mode("overwrite").parquet(path)
>>
>> # 2) Build DF BEFORE first append (will NOT see id=3 because Parquet path
>> listing was captured)
>> df1 = spark.read.parquet(path).filter(F.col("id") > 0)
>> print("== Old DF (built before append) — no id=3 ==")
>> df1.show()
>>
>> # 3) Append id=3 and build a NEW DF (this one sees 1,2,3)
>> spark.createDataFrame([(3,"C")], "id INT, val STRING") \
>>      .write.mode("append").parquet(path)
>> df2 = spark.read.parquet(path).filter(F.col("id") > 0)
>> print("== New DF (after append) — has id=3 ==")
>> df2.show()
>>
>> # 4) HARD FREEZE df2 BEFORE any further appends
>> spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")  # any
>> writable path
>> df2_frozen = df2.localCheckpoint(eager=True)
>>
>> # 5) Now append id=4 AFTER freezing
>> spark.createDataFrame([(4,"D")], "id INT, val STRING") \
>>      .write.mode("append").parquet(path)
>>
>> print("== Frozen snapshot (should NOT include id=4) ==")
>> df2_frozen.show()   # expect only 1,2,3
>>
>> # Optional: prove live data now has 1,2,3,4
>> print("== Fresh read (should include id=4) ==")
>> spark.read.parquet(path).orderBy("id").show()
>>
>> Output ( Running Spark version 3.5.5)
>>
>> == Old DF (built before append) — no id=3 ==
>> +---+---+
>> | id|val|
>> +---+---+
>> |  1|  A|
>> |  2|  B|
>> +---+---+
>>
>> == New DF (after append) — has id=3 ==
>> +---+---+
>> | id|val|
>> +---+---+
>> |  1|  A|
>> |  3|  C|
>> |  2|  B|
>> +---+---+
>>
>> == Frozen snapshot (should NOT include id=4) ==
>> +---+---+
>> | id|val|
>> +---+---+
>> |  1|  A|
>> |  3|  C|
>> |  2|  B|
>> +---+---+
>>
>> == Fresh read (should include id=4) ==
>> +---+---+
>> | id|val|
>> +---+---+
>> |  1|  A|
>> |  2|  B|
>> |  3|  C|
>> |  4|  D|
>> +---+---+
>>
>> HTH
>>
>> Dr Mich Talebzadeh,
>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>>
>>
>> On Mon, 11 Aug 2025 at 08:12, Karthick N <kcekarth...@gmail.com> wrote:
>>
>>> Hi *Ángel*,
>>>
>>> Thank you for checking on this. I’ll review the points you mentioned and
>>> get back to you with an update.
>>>
>>> Hi *Mich*,
>>> Looping you in here — could you please assist in reviewing this issue
>>> and share your inputs or suggestions? Your expertise would be really
>>> helpful in resolving it.
>>>
>>> Thanks in advance.
>>>
>>>
>>> On Mon, Aug 11, 2025 at 2:02 AM Ángel Álvarez Pascua <
>>> angel.alvarez.pas...@gmail.com> wrote:
>>>
>>>> Have you tried disabling AQE?
>>>>
>>>>
>>>> El dom, 10 ago 2025, 20:48, Karthick N <kcekarth...@gmail.com>
>>>> escribió:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I’m facing an issue with the execution order in the PySpark code
>>>>> snippet below. I’m not certain whether it’s caused by lazy evaluation,
>>>>> Spark plan optimization, or something else.
>>>>>
>>>>> *Issue:*
>>>>> For the same data and scenario, during some runs, one of the final
>>>>> views is not returning any data. This appears to be due to changes in the
>>>>> execution order, which in turn affects the final result. In the final
>>>>> steps, we have three different DataFrames derived from the same base
>>>>> DataFrame, and I’m not sure if this could be the cause.
>>>>>
>>>>> I tried using the persist option to hold intermediate results and
>>>>> avoid potential lazy evaluation issues, but the problem still persists.
>>>>>
>>>>> Could you please review this issue and suggest a solution to ensure
>>>>> consistent execution order and results?
>>>>>
>>>>> *Note:* Please let me know if you need any clarification or
>>>>> additional information on this.
>>>>>
>>>>> Code:
>>>>> source_df = spark.sql(f"""
>>>>>                       SELECT * FROM {catalog_name}.{db_name}.{
>>>>> table_name}""")  # Sample source query
>>>>>
>>>>> data_df = source_df.persist()
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> type2_columns = [
>>>>> ]
>>>>> data_df = updateRowHashValues(data_df, type2_columns, primary_key)
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}"
>>>>> ).filter(col("IsCurrent") == True)
>>>>> target_col_list = target_df.columns
>>>>> source_with_target_df = data_df.alias("src").join(target_df.alias(
>>>>> "tgt"), on="PatientId", how="left")
>>>>> joined_df = source_with_target_df.persist()
>>>>> filtered_joined_df = joined_df.filter(col("tgt.DimTypeIIKey"
>>>>> ).isNull()).select([col("src." + c).alias(c) for c in data_df
>>>>> .columns]).drop("SourceTimeStamp")
>>>>>
>>>>> new_records_df = filtered_joined_df.filter((lower(col("ModifiedBy"
>>>>> )).contains("migration")) | (lower(col("ModifiedBy")).contains(
>>>>> "migrated")) | (lower(col("CreatedBy")).contains("migration")) |
>>>>> (lower(col("CreatedBy")).contains("migrated")))
>>>>>
>>>>> new_records_source_df = filtered_joined_df.alias("jdf").join(
>>>>> new_records_df.alias("nrd"),col("jdf.PatientId") == col(
>>>>> "nrd.PatientId"),how="left_anti").select([col("jdf." + c).alias(c) for
>>>>> c in filtered_joined_df.columns]).drop('SourceTimeStamp')
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> if is_check_banfield_data and (not new_records_df.isEmpty()):    #This
>>>>> is_check_banfield_data may get change based on the environment
>>>>>     patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient"
>>>>> ).selectExpr("patientid as patient_patientid", "createdate as
>>>>> patient_createdate", "changedate as patient_changedate", "fw_modifiedts
>>>>> as patient_fw_modifiedts").withColumn('patient_patientid', upper(
>>>>> 'patient_patientid'))
>>>>>     banfield_patient_df = patient_df.persist()
>>>>>     window_spec = Window.partitionBy("patient_patientid").orderBy(col(
>>>>> "patient_changedate").desc(),col("patient_fw_modifiedts").desc())
>>>>>     banfield_patient_df = banfield_patient_df.withColumn("row_num",
>>>>> row_number().over(window_spec))
>>>>>     banfield_patient_df = banfield_patient_df.filter(col("row_num")
>>>>> == 1).drop("row_num")
>>>>>     new_records_df = new_records_df.alias("new").join(
>>>>> banfield_patient_df.alias("pat"),col("new.PatientId") == col(
>>>>> "pat.patient_patientid"),how="left")
>>>>>     new_records_df = new_records_df.withColumn(
>>>>> "BusinessEffectiveStartDate",coalesce(col("pat.patient_createdate"),
>>>>> col("BusinessEffectiveStartDate"))).select("new.*",
>>>>> "BusinessEffectiveStartDate")
>>>>>     incoming_new_df = new_records_df.persist()
>>>>> else:
>>>>>     is_check_banfield_data = False
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> type2_changes = joined_df.filter((col("src.RowHashType2") != col(
>>>>> "tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select(
>>>>> "src.*")
>>>>> type1_changes = joined_df.filter((col("src.RowHashType2") == col(
>>>>> "tgt.RowHashType2")) & (col("src.RowHashType1") != col(
>>>>> "tgt.RowHashType1"))).select("src.*")
>>>>> expired_records = joined_df.filter((col("src.RowHashType2") != col(
>>>>> "tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()
>>>>> ).select(col("tgt.*"), col("src.BusinessEffectiveStartDate").alias(
>>>>> "NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate",
>>>>> col("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False
>>>>> )).drop("NewBusinessEffectiveEndDate")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}"
>>>>> ).agg(spark_max(surrogate_key)).collect()[0][0] or 0
>>>>> starting_key = max_key + 1
>>>>> target_col_list = list(set(target_col_list) - {"DwCreatedYear",
>>>>> "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set
>>>>> (surrogate_key.split(',')))
>>>>> type2_changes = type2_changes.select(target_col_list)
>>>>> if is_check_banfield_data:
>>>>>     type2_changes = type2_changes.unionByName(incoming_new_df)
>>>>>     patient_df.unpersist()
>>>>>     new_records_df.unpersist()
>>>>> type2_changes = type2_changes.unionByName(new_records_source_df)
>>>>> type2_changes = type2_changes.withColumn("IsDataMigrated",
>>>>> when(lower(col("ModifiedBy")).contains("migration") | lower(col(
>>>>> "ModifiedBy")).contains("migrated") | lower(col("CreatedBy"
>>>>> )).contains("migration") | lower(col("CreatedBy")).contains("migrated"
>>>>> ),True).otherwise(False))
>>>>> type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate",
>>>>> lit("9999-12-31").cast("date")).withColumn("IsCurrent", 
>>>>> lit(True)).withColumn(surrogate_key,
>>>>> row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
>>>>> type1_updates_columns = list(set(type1_changes.columns) - set(
>>>>> type2_columns))
>>>>> type1_updates = type1_changes.select(*type1_updates_columns)
>>>>> expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{
>>>>> db_name}_{table_name}")   # This are the three final temp views that
>>>>> will be used in the merge statements or inserts. In some run for one of 
>>>>> the
>>>>> views we don't getting data.
>>>>> type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{
>>>>> db_name}_{table_name}")
>>>>> type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{
>>>>> table_name}")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> # DBTITLE 1,Type1 column changes update
>>>>> existing_records_update = spark.sql(f"""MERGE INTO {catalog_name}.{
>>>>> db_name}.{table_name} AS tgt
>>>>> USING temp_updates_type1_{db_name}_{table_name} AS src
>>>>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>>>> WHEN MATCHED THEN UPDATE SET
>>>>>     tgt.col1 = src.col1,
>>>>>     tgt.col2 = src.col2,
>>>>>     tgt.col3 = src.col3,
>>>>>     tgt.col4 = src.col4,
>>>>>     .
>>>>>     .
>>>>>     .
>>>>>     .
>>>>>     tgt.RowHashType1 = src.RowHashType1""")
>>>>> print(f"Total no of records updated due to Type1 columns update: {
>>>>> existing_records_update.select('num_updated_rows').collect()[0][0]}")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> # DBTITLE 1,Update Expired Record
>>>>> update_expired_record = spark.sql(f"""MERGE INTO {catalog_name}.{
>>>>> db_name}.{table_name} AS tgt
>>>>> USING temp_updates_type2_expired_{db_name}_{table_name} AS src
>>>>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>>>> WHEN MATCHED THEN UPDATE SET
>>>>>     tgt.IsCurrent = false,
>>>>>     tgt.BusinessEffectiveEndDate = src.BusinessEffectiveEndDate,
>>>>>     tgt.DwModifiedts = src.DwModifiedts,
>>>>>     tgt.DwCreatedYear = year(src.DwModifiedts),
>>>>>     tgt.DwCreatedMonth = month(src.DwModifiedts),
>>>>>     tgt.DwCreatedDay = day(src.DwModifiedts)""")
>>>>> print(log_message=f"Total no of records marked IsCurrent as False due
>>>>> to type2 columns update: {update_expired_record.select(
>>>>> 'num_updated_rows').collect()[0][0]}")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> # DBTITLE 1,Insert new records as type2 value changed and first time
>>>>> data arrival
>>>>> new_records_insertion = spark.sql(f"""INSERT INTO {catalog_name}.{
>>>>> db_name}.{table_name} (
>>>>>     col1()
>>>>>     values(
>>>>>     col1 )
>>>>> FROM temp_inserts_new_records_{db_name}_{table_name}
>>>>> """)
>>>>> print(log_message=f"Total no of new records inserted: {
>>>>> new_records_insertion.select('num_inserted_rows').collect()[0][0]}")
>>>>>
>>>>> # COMMAND ----------
>>>>>
>>>>> source_df.unpersist()
>>>>> source_with_target_df.unpersist()
>>>>>
>>>>

Reply via email to