Hi,

I am currently working on an app that uses PySpark to produce a daily
insert-and-update delta capture, output as Parquet.  It runs on an 8-core,
32 GB Linux server in standalone mode (set to 6 worker cores with 2 GB of
memory each) running Spark 2.4.3.

This is achieved by reading data from a T-SQL database into a DataFrame,
appending a hash of every record to it, and comparing it against a
DataFrame built from yesterday's data (which was also saved as Parquet).

As part of the monitoring and logging, I am trying to count the number of
records for the respective actions.  Example code:

from pyspark.sql.functions import col, concat_ws, lit, md5

df_source = spark_session.read.format('jdbc').....
df_reference = sql_context.read.parquet('/path/to/reference.parquet')

# Append a hash of every record so that changed rows can be detected
df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
    .cache()

# Rows whose primary key is not present in the reference data are inserts
df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
    .select(lit('Insert').alias('_action'), *df_source_hashed.columns) \
    .dropDuplicates() \
    .cache()
inserts_count = df_inserts.count()

# Rows present on both sides whose hash differs are updates
df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how='inner') \
    .select(lit('Update').alias('_action'), *df_source_hashed.columns) \
    .where(col('a.hashkey') != col('b.hashkey')) \
    .dropDuplicates() \
    .cache()
updates_count = df_updates.count()

df_output = df_inserts.union(df_updates)

df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')

Two occurrences of the above code run concurrently via Python
threading.Thread (this is an attempt to overcome the network bottleneck
when connecting to the database server).
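
The threading is along these lines (a simplified sketch; `process_table`
and the table names are placeholders rather than the actual job code):

import threading

def process_table(table_name, output_path):
    # Placeholder for the full read/hash/compare/write pipeline shown above.
    # Jobs submitted from separate threads share the one SparkSession and
    # are scheduled concurrently by the driver.
    ...

threads = [
    threading.Thread(target=process_table, args=('source_table_a', '/path/to/output_a.parquet')),
    threading.Thread(target=process_table, args=('source_table_b', '/path/to/output_b.parquet')),
]
for t in threads:
    t.start()
for t in threads:
    t.join()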

What I am finding is some very inconsistent behaviour with the counts.
Occasionally a count operation appears to freeze for a few minutes, and
quite often that specific DataFrame ends up with zero records in it.
According to the DAG (which I am not 100% sure how to read), the
processing flow is:

Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
  => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
  => MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
  => Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0

The other observation I have made is that if I remove the counts from the
DataFrame operations and instead open the output Parquet file and count it
with a
`sql_context.read.load('/path/to/output.parquet').filter(col("_action") ==
"Insert").count()` command, I reduce my run times by around 20 to 30%.
In my feeble mind, opening the outputs and re-reading them seems
counter-intuitive.
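
For completeness, that post-write counting looks roughly like this (a
sketch; a single groupBy over the written file gives both counts in one
pass):

df_written = sql_context.read.parquet('/path/to/output.parquet')

# One pass over the written output gives the count for each action.
action_counts = {row['_action']: row['count']
                 for row in df_written.groupBy('_action').count().collect()}

inserts_count = action_counts.get('Insert', 0)
updates_count = action_counts.get('Update', 0)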

Is anyone able to give me some guidance on why this happens, or on how to
ensure that I am doing the above as efficiently as possible?

Best Regards
Ashley
