Hi, I am currently working on an app that uses PySpark to produce a daily insert-and-update delta capture, output as Parquet. It runs on an 8-core, 32 GB Linux server in standalone mode (set to 6 worker cores with 2 GB of memory each), running Spark 2.4.3.
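For context, the session is created along these lines (the master URL, app name and paths here are simplified placeholders rather than the production values):

```python
from pyspark.sql import SparkSession, SQLContext

# Simplified session setup -- master URL and app name are placeholders;
# the core/memory settings reflect the description above.
spark_session = (
    SparkSession.builder
    .appName('daily_delta_capture')
    .master('spark://localhost:7077')       # standalone master
    .config('spark.cores.max', '6')         # 6 worker cores in total
    .config('spark.executor.memory', '2g')  # 2 GB per executor
    .getOrCreate()
)
sql_context = SQLContext(spark_session.sparkContext)  # used for the Parquet reads below
```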
This is achieved by reading data from a TSQL database into a dataframe, appending a hash of every record to it, and comparing it against a dataframe built from yesterday's data (which was also saved as Parquet). As part of the monitoring and logging, I am trying to count the number of records for each action. Example code:

```python
from pyspark.sql.functions import col, concat_ws, lit, md5

# pk_list is the list of primary-key column names used in the joins.
df_source = spark_session.read.format('jdbc')  # ... JDBC connection options and .load() omitted
df_reference = sql_context.read.parquet('/path/to/reference.parquet')

# Append an MD5 hash of the whole record to each source row.
df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
    .cache()

# Inserts: source rows whose primary key does not exist in yesterday's data.
df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
    .select(lit('Insert').alias('_action'), *df_source_hashed.columns) \
    .dropDuplicates() \
    .cache()
inserts_count = df_inserts.count()

# Updates: primary key exists on both sides but the record hash has changed.
df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how='inner') \
    .where(col('a.hashkey') != col('b.hashkey')) \
    .select(lit('Update').alias('_action'), *df_source_hashed.columns) \
    .dropDuplicates() \
    .cache()
updates_count = df_updates.count()

df_output = df_inserts.union(df_updates)
df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
```

The above code runs as two occurrences concurrently via Python `threading.Thread` (this is to try to overcome the network bottleneck when connecting to the database server).

What I am finding is some very inconsistent behaviour with the counts. Occasionally a count operation appears to freeze for a few minutes, and quite often that specific dataframe ends up with zero records in it. According to the DAG (which I am not 100% sure how to read), the processing flow is:

```
Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
=> WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
=> InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
=> MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
=> WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
=> Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0
```

The other observation I have made is that if I remove the counts from the dataframe operations and instead re-open the output Parquet file and count with a command like `sql_context.read.load('/path/to/output.parquet').filter(col("_action") == "Insert").count()`, my run times drop by around 20 to 30%. In my feeble mind, opening the output and re-reading it seems counter-intuitive.

Is anyone able to give me some guidance on why this happens, or how to ensure that I am doing the above as efficiently as possible?

Best Regards

Ashley
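P.S. In case the concurrency is relevant, the two runs are kicked off roughly like this (the function body, table names and paths are placeholders for this post; each thread runs the full read/hash/compare/write pipeline shown above against a different source table):

```python
import threading

def run_delta_capture(table_name, reference_path, output_path):
    # Placeholder for the pipeline above: JDBC read, hashing,
    # insert/update joins, counts and the Parquet write.
    ...

threads = [
    threading.Thread(target=run_delta_capture,
                     args=('dbo.source_table_a', '/path/to/reference_a.parquet', '/path/to/output_a.parquet')),
    threading.Thread(target=run_delta_capture,
                     args=('dbo.source_table_b', '/path/to/reference_b.parquet', '/path/to/output_b.parquet')),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```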