Thanks David,

I did experiment with .cache() and have to admit I didn't see
any marked improvement on the sample I was running, so yes, I am a bit
apprehensive about including it (I'm not even sure why I left it in).
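
A toy, pure-Python model of the lazy-execution point may help show when
caching can matter. This is not Spark's API: `ToyFrame`, its methods and
`scans_for_two_actions` are made up for illustration, and the only thing
being modelled is "each action re-runs the plan unless the result was
cached".

```python
class ToyFrame:
    """Toy stand-in for a lazily evaluated data frame (NOT Spark's real API)."""

    def __init__(self, source, plan=(), scans=None):
        self.source = source            # stands in for the database table
        self.plan = plan                # recorded transformations, not yet run
        self.scans = scans if scans is not None else [0]  # shared scan counter
        self.use_cache = False
        self.materialized = None

    def where(self, predicate):
        # Transformation: only extends the plan, nothing is executed here.
        return ToyFrame(self.source, self.plan + (predicate,), self.scans)

    def cache(self):
        # Only marks the frame; data is kept the first time an action runs.
        self.use_cache = True
        return self

    def _run(self):
        if self.materialized is not None:
            return self.materialized
        self.scans[0] += 1              # an un-cached action re-reads the source
        rows = list(self.source)
        for predicate in self.plan:
            rows = [r for r in rows if predicate(r)]
        if self.use_cache:
            self.materialized = rows
        return rows

    def count(self):
        # Action: triggers execution of the whole plan.
        return len(self._run())

    def write(self):
        # Action: also triggers execution of the whole plan.
        return list(self._run())


def scans_for_two_actions(cached):
    """Run count() then write() on one frame and report how often the source was read."""
    frame = ToyFrame(range(10)).where(lambda r: r % 2 == 0)
    if cached:
        frame = frame.cache()
    frame.count()
    frame.write()
    return frame.scans[0]
```

In this model a count followed by a write reads the source twice without
the cache and once with it. On a small sample that saving could easily be
lost in the cost of holding the data, which would fit with seeing no
marked improvement.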

When you say "do the count as the final step", are you referring to getting
the counts of the individual data frames, or from the already outputted
parquet?
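
If the answer is "from the already outputted parquet", one way it might
look is a single grouped count over the written file instead of one
filtered count per action. This is only a sketch: `action_counts` and
`count_actions_from_output` are hypothetical helper names, `spark` is
assumed to be an existing SparkSession, and `_action` is the column from
the code quoted below.

```python
def action_counts(pairs, expected=("Insert", "Update")):
    """Build {action: count}, reporting 0 for expected actions with no rows."""
    counts = {action: 0 for action in expected}
    for action, n in pairs:
        counts[action] = n
    return counts


def count_actions_from_output(spark, path):
    """Read the written parquet back and count rows per _action in one job.

    Assumes `spark` is an existing SparkSession and `path` is the location
    that was passed to .save() when the output was written.
    """
    grouped = spark.read.parquet(path).groupBy("_action").count()
    # groupBy(...).count() yields one row per action with a "count" column.
    return action_counts((row["_action"], row["count"]) for row in grouped.collect())
```

Something like `count_actions_from_output(spark_session, '/path/to/output.parquet')`
would then give both numbers after the write has finished, without an
extra action on the intermediate data frames.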

Thanks and I appreciate your reply

On Thu, Feb 13, 2020 at 4:15 PM David Edwards <edwardsdj...@googlemail.com>
wrote:

> Hi Ashley,
>
> I'm not an expert, but I think this is because Spark uses lazy execution and
> doesn't actually run anything until you call an action such as a write or
> count on the dataframe.
>
> If you remove the count steps, Spark can work out a more efficient execution
> plan, reducing the number of stages and tasks.
>
> If you can do the count as a final step, I would do that. I think you may
> also not need the .cache() calls, and you might want to experiment with
> reducing spark.sql.shuffle.partitions too.
>
> Thanks
> Dave
>
> On Thu, 13 Feb 2020, 04:09 Ashley Hoff, <ash.j.h...@gmail.com> wrote:
>
>> Hi,
>>
>> I am currently working on an app using PySpark to produce a daily delta
>> capture of inserts and updates, output as Parquet.  This is running on
>> an 8-core, 32 GB Linux server in standalone mode (set to 6 worker cores of
>> 2 GB memory each) running Spark 2.4.3.
>>
>> This is achieved by reading data from a TSQL database into a
>> dataframe, appending a hash of each record to it, and comparing it
>> to a dataframe of yesterday's data (which has also been saved as
>> Parquet).
>>
>> As part of the monitoring and logging, I am trying to count the number of
>> records for the respective actions.  Example code:
>>
>> from pyspark.sql.functions import col, concat_ws, lit, md5
>>
>> df_source = spark_session.read.format('jdbc').....
>> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>>
>> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
>>             .cache()
>>
>> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>>                     .select(lit('Insert').alias('_action'), *df_source_hashed) \
>>                     .dropDuplicates() \
>>                     .cache()
>> inserts_count = df_inserts.count()
>>
>> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
>>                         .select(lit('Update').alias('_action'), *df_source_hashed) \
>>                         .where(col('a.hashkey') != col('b.hashkey')) \
>>                         .dropDuplicates() \
>>                         .cache()
>> updates_count = df_updates.count()
>>
>> df_output = df_inserts.union(df_updates)
>>
>> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>
>> The above code is run as two concurrent instances via Python
>> threading.Thread (this is to try to overcome the network bottleneck
>> when connecting to the database server).
>>
>> What I am finding is I am getting some very inconsistent behavior with
>> the counts.  Occasionally, it appears that it will freeze up on a count
>> operation for a few minutes and quite often that specific data frame will
>> have zero records in it.  According to the DAG (which I am not 100% sure
>> how to read) the following is the processing flow:
>>
>> Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
>>   => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
>>   => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
>>   => MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
>>   => WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
>>   => Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0
>>
>> The other observation is that if I remove the counts from the
>> data frame operations and instead open the outputted parquet file and
>> count using a
>> `sql_context.read.load('/path/to/output.parquet').filter(col("_action") ==
>> "Insert").count()` command, I reduce my run-times by around 20 to
>> 30%.  In my feeble mind, opening up the outputs and re-reading them seems
>> counter-intuitive.
>>
>> Is anyone able to give me some guidance on why or how to ensure that I am
>> doing the above as efficiently as possible?
>>
>> Best Regards
>> Ashley
>>
>

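On the spark.sql.shuffle.partitions suggestion quoted above: the default
is 200, which means a lot of very small tasks for 6 worker cores. A sketch
of setting it when the session is built follows. The helper names, the app
name and the "2 tasks per core" starting point are all assumptions to be
tuned against real runs, not a rule.

```python
def shuffle_partitions_for(total_cores, tasks_per_core=2):
    """Hypothetical starting point: a small multiple of the available cores."""
    return max(1, total_cores * tasks_per_core)


def build_session(total_cores=6):
    """Create (or reuse) a SparkSession with a reduced shuffle partition count."""
    # Imported here so the helper above can be used without Spark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName("delta-capture")  # hypothetical app name
        .config("spark.sql.shuffle.partitions",
                str(shuffle_partitions_for(total_cores)))
        .getOrCreate()
    )
```

With the 6 worker cores described above this would set the value to 12;
comparing run-times at a few different values is probably the only way to
know what suits the data.
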
-- 
Kustoms On Silver <https://www.facebook.com/KustomsOnSilver>
