Re: Performance of PySpark jobs on the Kubernetes cluster

2021-08-11 Thread David Diebold
Hi Mich,

I don't quite understand why the driver node is using so much CPU, but it
may be unrelated to your executors being underused.
On that front, I would first check that your job generates enough tasks.
Then I would check the spark.executor.cores and spark.task.cpus parameters to
see if I can give more work to the executors.
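For example (a minimal sketch; the values are only illustrative, not a
recommendation for your cluster):

from pyspark.sql import SparkSession

# Each executor can run executor.cores / task.cpus tasks in parallel.
spark = (SparkSession.builder
         .config("spark.executor.cores", "2")   # cores per executor
         .config("spark.task.cpus", "1")        # cores reserved by each task
         .getOrCreate())

# Quick sanity check on parallelism: one task per partition in each stage.
print(spark.range(10**6).rdd.getNumPartitions())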

Cheers,
David



On Tue, 10 Aug 2021 at 12:20, Khalid Mammadov wrote:

> Hi Mich
>
> I think you need to check your code.
> If the code does not use the PySpark API effectively you may get this, i.e. if you
> use the pure Python/pandas API rather than PySpark's
> transform->transform->action pattern, e.g. df.select(..).withColumn(...)...count()
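>
> For example, keeping everything in Spark transformations and ending with a
> single action (a minimal, illustrative sketch; the column names are made up):
>
> import pyspark.sql.functions as F
> # select / withColumn / filter are lazy transformations that stay in Spark;
> # count() is the single action that triggers the job.
> result = (df.select("col_a", "col_b")
>           .withColumn("col_c", F.col("col_a") * 2)
>           .filter(F.col("col_b") > 0)
>           .count())
>
> whereas something like df.toPandas() or row-by-row Python logic pulls the work
> onto the driver / Python workers.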
>
> Hope this helps to put you in the right direction.
>
> Cheers
> Khalid
>
>
>
>
> On Mon, 9 Aug 2021, 20:20 Mich Talebzadeh, 
> wrote:
>
>> Hi,
>>
>> I have a basic question to ask.
>>
>> I am running a Google k8s cluster (AKA GKE) with three nodes, each with the
>> configuration below:
>>
>> e2-standard-2 (2 vCPUs, 8 GB memory)
>>
>>
>> spark-submit is launched from another node (actually a Dataproc single
>> node that I have just upgraded to e2-custom (4 vCPUs, 8 GB mem)). We call
>> this the launch node.
>>
>> OK, I know that the cluster is not much, but Google was complaining about
>> the launch node hitting 100% CPU, so I added two more vCPUs to it.
>>
>> It appears that despite using k8s as the computational cluster, the
>> burden falls upon the launch node!
>>
>> The CPU utilisation for the launch node is shown below:
>>
>> [image: image.png]
>> The dip is when the 2 extra vCPUs were added to it and it had to reboot;
>> otherwise usage is around 70%.
>>
>> The combined CPU usage for the GKE nodes is shown below:
>>
>> [image: image.png]
>>
>> Never goes above 20%!
>>
>> I can see the driver and executors as below:
>>
>> k get pods -n spark
>> NAME                                         READY   STATUS    RESTARTS   AGE
>> pytest-c958c97b2c52b6ed-driver               1/1     Running   0          69s
>> randomdatabigquery-e68a8a7b2c52f468-exec-1   1/1     Running   0          51s
>> randomdatabigquery-e68a8a7b2c52f468-exec-2   1/1     Running   0          51s
>> randomdatabigquery-e68a8a7b2c52f468-exec-3   0/1     Pending   0          51s
>>
>> It is a PySpark 3.1.1 image using Java 8, pushing randomly generated data
>> into the Google BigQuery data warehouse. The last executor (exec-3)
>> seems to be stuck pending. The spark-submit is as below:
>>
>> spark-submit --verbose \
>>--properties-file ${property_file} \
>>--master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>--deploy-mode cluster \
>>--name pytest \
>>--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>>--py-files $CODE_DIRECTORY/DSBQ.zip \
>>--conf spark.kubernetes.namespace=$NAMESPACE \
>>--conf spark.executor.memory=5000m \
>>--conf spark.network.timeout=300 \
>>--conf spark.executor.instances=3 \
>>--conf spark.kubernetes.driver.limit.cores=1 \
>>--conf spark.driver.cores=1 \
>>--conf spark.executor.cores=1 \
>>--conf spark.executor.memory=2000m \
>>--conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>--conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>--conf spark.kubernetes.container.image=${IMAGEGCP} \
>>--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>>--conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>--conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>--conf spark.sql.execution.arrow.pyspark.enabled="true" \
>>$CODE_DIRECTORY/${APPLICATION}
>>
>> Aren't the driver and executors running on the k8s cluster? So why is the
>> launch node heavily used while the k8s cluster is underutilized?
>>
>> Thanks
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Trying to hash cross features with mllib

2021-10-01 Thread David Diebold
Hello everyone,

In MLLib, I’m trying to rely essentially on pipelines to create features
out of the Titanic dataset, and show-case the power of feature hashing. I
want to:

-  Apply bucketization on some columns (QuantileDiscretizer is fine)

-  Then I want to cross all my columns with each other to have
cross features.

-  Then I would like to hash all of these cross features into a
vector.

-  Then give it to a logistic regression.

Looking at the documentation, it looks like the only way to hash features
is the *FeatureHasher* transformation. It takes multiple columns as input;
their type can be numeric, bool, or string (but not vector/array).

But now I’m left wondering how I can create my cross-feature columns. I’m
looking for a transformation that could take two columns as input and
return a numeric, bool, or string. I didn't manage to find anything that
does the job. There are multiple transformations, such as VectorAssembler,
that operate on vectors, but that is not a type accepted by the FeatureHasher.

Of course, I could try to combine columns directly in my dataframe (before
the pipeline kicks in), but then I would not be able to benefit any more
from QuantileDiscretizer and other cool functions.
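One workaround I am considering (a rough sketch only, assuming the crossed
columns are categorical; the column names are Titanic placeholders) is to build
the crossed columns inside the pipeline with SQLTransformer, so that
QuantileDiscretizer can still run first:

from pyspark.ml import Pipeline
from pyspark.ml.feature import QuantileDiscretizer, SQLTransformer, FeatureHasher
from pyspark.ml.classification import LogisticRegression

# Bucketize a numeric column first.
discretizer = QuantileDiscretizer(inputCol="Fare", outputCol="FareBucket", numBuckets=5)

# Build string-valued cross features inside the pipeline; __THIS__ is the input dataframe.
crosser = SQLTransformer(statement="""
    SELECT *,
           concat_ws('_', Sex, Pclass) AS Sex_x_Pclass,
           concat_ws('_', Sex, FareBucket) AS Sex_x_FareBucket
    FROM __THIS__""")

# Hash the crossed (string) columns into a fixed-size vector.
hasher = FeatureHasher(inputCols=["Sex_x_Pclass", "Sex_x_FareBucket"],
                       outputCol="features", numFeatures=1024)

lr = LogisticRegression(featuresCol="features", labelCol="Survived")
pipeline = Pipeline(stages=[discretizer, crosser, hasher, lr])

But this quickly becomes verbose when crossing every column with every other one.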


Am I missing something in the transformation API? Or is my approach to
hashing wrong? Or should we consider extending the API somehow?



Thank you, kind regards,

David


Re: Trying to hash cross features with mllib

2021-10-04 Thread David Diebold
Hello Sean,

Thank you for the heads-up!
The Interaction transform won't help for my use case, as it returns a vector
that I won't be able to hash.
I will definitely dig further into custom transformations though.
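For instance, something as bare-bones as this is the kind of thing I have in
mind (a sketch only, without the param/persistence plumbing a real Transformer
would need; column names are Titanic placeholders):

from pyspark.ml import Transformer
import pyspark.sql.functions as F

class ColumnCrosser(Transformer):
    """Cross two columns into one string column usable by FeatureHasher."""
    def __init__(self, inputCols, outputCol):
        super().__init__()
        self.inputCols = inputCols
        self.outputCol = outputCol

    def _transform(self, dataset):
        a, b = self.inputCols
        return dataset.withColumn(
            self.outputCol,
            F.concat_ws("_", F.col(a).cast("string"), F.col(b).cast("string")))

# e.g. ColumnCrosser(["Sex", "Pclass"], "Sex_x_Pclass") could sit between a
# QuantileDiscretizer and a FeatureHasher in a Pipeline.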

Thanks !
David

On Fri, 1 Oct 2021 at 15:49, Sean Owen wrote:

> Are you looking for
> https://spark.apache.org/docs/latest/ml-features.html#interaction ?
> That's the closest built in thing I can think of.  Otherwise you can make
> custom transformations.
>
> On Fri, Oct 1, 2021, 8:44 AM David Diebold 
> wrote:
>
>> Hello everyone,
>>
>> In MLLib, I’m trying to rely essentially on pipelines to create features
>> out of the Titanic dataset, and show-case the power of feature hashing. I
>> want to:
>>
>> -  Apply bucketization on some columns (QuantileDiscretizer is
>> fine)
>>
>> -  Then I want to cross all my columns with each other to have
>> cross features.
>>
>> -  Then I would like to hash all of these cross features into a
>> vector.
>>
>> -  Then give it to a logistic regression.
>>
>> Looking at the documentation, it looks like the only way to hash features
>> is the *FeatureHasher* transformation. It takes multiple columns as
>> input, type can be numeric, bool, string (but no vector/array).
>>
>> But now I’m left wondering how I can create my cross-feature columns. I’m
>> looking at a transformation that could take two columns as input, and
>> return a numeric, bool, or string. I didn't manage to find anything that
>> does the job. There are multiple transformations such as VectorAssembler,
>> that operate on vectors, but this is not a type accepted by the FeatureHasher.
>>
>> Of course, I could try to combine columns directly in my dataframe
>> (before the pipeline kicks-in), but then I would not be able to benefit any
>> more from QuantileDiscretizer and other cool functions.
>>
>>
>> Am I missing something in the transformation api ? Or is my approach to
>> hashing wrong ? Or should we consider to extend the api somehow ?
>>
>>
>>
>> Thank you, kind regards,
>>
>> David
>>
>


question about data skew and memory issues

2021-12-14 Thread David Diebold
Hello all,

I was wondering if it is possible to encounter out-of-memory exceptions on
spark executors when doing some aggregation, when a dataset is skewed.
Let's say we have a dataset with two columns:
- key : int
- value : float
And I want to aggregate values by key.
Let's say that we have a ton of rows with key equal to 0.
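Concretely, the aggregation I have in mind is just something like this (a
minimal example; sum stands for any reduce-like aggregation):

import pyspark.sql.functions as F
aggregated = df.groupBy("key").agg(F.sum("value").alias("value_sum"))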

Is it possible to encounter some out-of-memory exception after the shuffle?
My expectation would be that the executor responsible for aggregating the
'0' partition could indeed hit an OOM exception if it tries to put all
the files of this partition in memory before processing them.
But why would it need to put them in memory when doing an aggregation? It
looks to me that aggregation can be performed in a streaming fashion, so I
would not expect any OOM at all...

Thank you in advance for your insights :)
David


Re: Pyspark debugging best practices

2022-01-03 Thread David Diebold
Hello Andy,

Are you sure you want to perform lots of join operations, and not simple
unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size plus each individual dataset
size?
Having a look at the execution plan would help; maybe the high number of join
operations makes the execution plan too complicated at the end of the day.
Checkpointing could help there (see the sketch below).
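For example, something along these lines could bound the size of the plan (a
rough sketch only; the checkpoint path, join key and frequency are placeholders):

# Set a checkpoint directory once, e.g. on GCS.
spark.sparkContext.setCheckpointDir("gs://my-bucket/spark-checkpoints")

df3 = list2OfDF[0]
for i, df in enumerate(list2OfDF[1:], start=1):
    df3 = df3.join(df, on="sample_id", how="inner")
    if i % 10 == 0:
        # Materializes df3 and truncates its lineage / logical plan.
        df3 = df3.checkpoint()

df3.explain(True)  # inspect the plan before the final write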

Cheers,
David


On Thu, 30 Dec 2021 at 16:56, Andrew Davidson wrote:

> Hi Gourav
>
> I will give databricks a try.
>
> Each dataset gets loaded into a data frame.
> I select one column from the data frame.
> I join the column to the accumulated joins from the previous data frames in
> the list.
>
> To debug, I think I am going to put an action and a log statement after each
> join. I do not think it will change the performance. I believe the physical
> plan will be the same; however, hopefully it will shed some light.
>
> At the very least I will know if it is making progress or not. And hopefully
> where it is breaking.
>
> Happy new year
>
> Andy
>
> On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta 
> wrote:
>
>> Hi Andrew,
>>
>> Any chance you might give Databricks a try in GCP?
>>
>> The above transformations look complicated to me, why are you adding
>> dataframes to a list?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>> On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson 
>> wrote:
>>
>>> Hi
>>>
>>>
>>>
>>> I am having trouble debugging my driver. It runs correctly on smaller
>>> data sets but fails on large ones. It is very hard to figure out what the
>>> bug is. I suspect it may have something to do with the way Spark is installed
>>> and configured. I am using Google Cloud Platform Dataproc PySpark.
>>>
>>>
>>>
>>> The log messages are not helpful. The error message will be something
>>> like
>>> "User application exited with status 1"
>>>
>>>
>>>
>>> And
>>>
>>>
>>>
>>> jsonPayload: {
>>>
>>> class: "server.TThreadPoolServer"
>>>
>>> filename: "hive-server2.log"
>>>
>>> message: "Error occurred during processing of message."
>>>
>>> thread: "HiveServer2-Handler-Pool: Thread-40"
>>>
>>> }
>>>
>>>
>>>
>>> I am able to access the Spark history server; however, it does not capture
>>> anything if the driver crashes. I am unable to figure out how to access the
>>> Spark web UI.
>>>
>>>
>>>
>>> My driver program looks something like the pseudo code below: a long
>>> list of transforms with a single action (i.e. write) at the end. Adding
>>> log messages is not helpful because of lazy evaluation. I am tempted to
>>> add something like
>>>
>>> Logger.warn("DEBUG df.count(): {}".format(df.count()))
>>>
>>> to try and inline some sort of diagnostic message.
>>>
>>>
>>>
>>> What do you think?
>>>
>>>
>>>
>>> Is there a better way to debug this?
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> def run():
>>>     listOfDF = []
>>>     for filePath in listOfFiles:
>>>         df = spark.read.load( filePath, ... )
>>>         listOfDF.append(df)
>>>
>>>     list2OfDF = []
>>>     for df in listOfDF:
>>>         df2 = df.select( ... )
>>>         list2OfDF.append( df2 )
>>>
>>>     # will setting the list to None free cache?
>>>     # or just driver memory?
>>>     listOfDF = None
>>>
>>>     df3 = list2OfDF[0]
>>>     for i in range( 1, len(list2OfDF) ):
>>>         df = list2OfDF[i]
>>>         df3 = df3.join(df, ...)
>>>
>>>     # will setting the list to None free cache?
>>>     # or just driver memory?
>>>     list2OfDF = None
>>>
>>>     # lots of narrow transformations on df3
>>>     return df3
>>>
>>> def main():
>>>     df = run()
>>>     df.write()
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>


Re: groupMapReduce

2022-01-14 Thread David Diebold
Hello,

In the RDD API, you must be looking for reduceByKey.
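For example (a minimal PySpark sketch), Scala's
words.groupMapReduce(identity)(_ => 1)(_ + _) roughly becomes:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

words = sc.parallelize(["spark", "rdd", "spark", "api"])
# map to (key, value) pairs, then reduce the values per key
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(counts.collect())  # e.g. [('rdd', 1), ('spark', 2), ('api', 1)], order not guaranteed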

Cheers

On Fri, 14 Jan 2022 at 11:56, frakass wrote:

> Is there a RDD API which is similar to Scala's groupMapReduce?
> https://blog.genuine.com/2019/11/scalas-groupmap-and-groupmapreduce/
>
> Thank you.
>
>
>


Question about spark.sql min_by

2022-02-21 Thread David Diebold
Hello all,

I'm trying to use the spark.sql min_by aggregation function with pyspark.
I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2

I have a dataframe made of these columns:
- productId : int
- sellerId : int
- price : double

For each product, I want to get the seller who sells the product for the
cheapest price.

The naive approach would be to do this, but I would expect two shuffles:

import pyspark.sql.functions as F
cheapest_prices_df  =
df.groupby('productId').agg(F.min('price').alias('price'))
cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId', 'price'])

I would have loved to do this instead:

import pyspark.sql.functions as F
cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
F.min_by('sellerId', 'price'))

Unfortunately, min_by does not seem to be available in pyspark.sql.functions,
whereas I can see it in the docs:
https://spark.apache.org/docs/latest/api/sql/index.html

I have managed to use min_by with this approach, but it looks slow (maybe
because of the temp table creation?):

df.createOrReplaceTempView("table")
cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId,
min(price) from table group by productId")

Is there a way I can rely on min_by directly in groupby ?
Is there some code missing in pyspark wrapper to make min_by visible
somehow ?
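In the meantime, one workaround I can think of (a sketch, untested) is to go
through expr() inside the agg, which at least avoids the temp view:

import pyspark.sql.functions as F
cheapest_sellers_df = df.groupBy("productId").agg(
    F.expr("min_by(sellerId, price)").alias("sellerId"),
    F.min("price").alias("price"))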

Thank you in advance for your help.

Cheers
David


Re: Question about spark.sql min_by

2022-02-21 Thread David Diebold
Thank you for your answers.
Indeed, windowing should help there.
Also, I just realized maybe I can try to create a struct column with both
price and sellerId and apply min() on it; the ordering would consider price
first (https://stackoverflow.com/a/52669177/2015762).
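Something like this, for instance (a quick sketch, untested):

import pyspark.sql.functions as F
# min() on a struct compares fields left to right, so price drives the
# ordering and sellerId tags along.
cheapest_sellers_df = (
    df.groupBy("productId")
      .agg(F.min(F.struct("price", "sellerId")).alias("cheapest"))
      .select("productId", "cheapest.price", "cheapest.sellerId")
)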

Cheers!

On Mon, 21 Feb 2022 at 16:12, ayan guha wrote:

> Why can this not be done by a window function? Or is min_by just a
> shorthand?
>
> On Tue, 22 Feb 2022 at 12:42 am, Sean Owen  wrote:
>
>> From the source code, looks like this function was added to pyspark in
>> Spark 3.3, up for release soon. It exists in SQL. You can still use it in
>> SQL with `spark.sql(...)` in Python though, not hard.
>>
>> On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
>> wrote:
>>
>>> Hello all,
>>>
>>> I'm trying to use the spark.sql min_by aggregation function with pyspark.
>>> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2
>>>
>>> I have a dataframe made of these columns:
>>> - productId : int
>>> - sellerId : int
>>> - price : double
>>>
>>> For each product, I want to get the seller who sells the product for the
>>> cheapest price.
>>>
>>> Naive approach would be to do this, but I would expect two shuffles:
>>>
>>> import pyspark.sql.functions as F
>>> cheapest_prices_df  =
>>> df.groupby('productId').agg(F.min('price').alias('price'))
>>> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
>>> 'price'])
>>>
>>> I would have loved to do this instead:
>>>
>>> import pyspark.sql.functions as F
>>> cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
>>> F.min_by('sellerId', 'price'))
>>>
>>> Unfortunately min_by does not seem available in pyspark sql functions,
>>> whereas I can see it in the doc :
>>> https://spark.apache.org/docs/latest/api/sql/index.html
>>>
>>> I have managed to use min_by with this approach but it looks slow (maybe
>>> because of temp table creation ?):
>>>
>>> df.createOrReplaceTempView("table")
>>> cheapest_sellers_df = spark.sql("select min_by(sellerId, price)
>>> sellerId, min(price) from table group by productId")
>>>
>>> Is there a way I can rely on min_by directly in groupby ?
>>> Is there some code missing in pyspark wrapper to make min_by visible
>>> somehow ?
>>>
>>> Thank you in advance for your help.
>>>
>>> Cheers
>>> David
>>>
>> --
> Best Regards,
> Ayan Guha
>


Question about bucketing and custom partitioners

2022-04-11 Thread David Diebold
Hello,

I have a few questions related to bucketing and custom partitioning in the
dataframe API.

I am considering bucketing to perform one-side shuffle-free joins in
incremental jobs, but there is one thing that I'm not happy with.
Data is likely to grow/skew over time. At some point, I would need to
change the number of buckets, which would provoke a shuffle.

Instead of this, I would like to use a custom partitioner that would
replace the shuffle with a narrow transformation.
That is something that was feasible with the RDD developer API. For example, I
could use a partitioning scheme such as:
partition_id = (nb_partitions-1) * ( hash(column) - Int.minValue) /
(Int.maxValue - Int.minValue)
When I multiply the number of partitions by 2, each new partition depends only
on one partition from the parent (=> narrow transformation).
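In RDD terms, the scheme I have in mind looks roughly like this (a PySpark
sketch; the key column and partition counts are only illustrative):

INT_MIN, INT_MAX = -2**31, 2**31 - 1

def linear_partitioner(nb_partitions):
    # Map a 32-bit hash linearly onto [0, nb_partitions - 1]: doubling
    # nb_partitions makes each new partition depend on exactly one parent.
    def partition_func(key):
        h = (hash(key) & 0xFFFFFFFF) + INT_MIN   # fold into the signed 32-bit range
        return int((nb_partitions - 1) * (h - INT_MIN) / (INT_MAX - INT_MIN))
    return partition_func

pair_rdd = df.rdd.map(lambda row: (row["key_column"], row))
repartitioned = pair_rdd.partitionBy(32, linear_partitioner(32))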

So, here are my questions:

1/ Is it possible to use a custom partitioner when saving a dataframe with
bucketing?
2/ Still with the dataframe API, is it possible to apply a custom partitioner
to a dataframe?
   Is it possible to repartition the dataframe with a narrow transformation,
like what could be done with RDDs?
   Is there some sort of dataframe developer API? Do you have any pointers
on this?

Thanks !
David


Writing protobuf RDD to parquet

2023-01-20 Thread David Diebold
Hello,

I'm trying to write some RDD[T] to Parquet, where T is a protobuf message,
in Scala.
I am wondering what the best option is to do this, and I would be
interested in your thoughts.
So far, I see two possibilities:
- use the PairRDD method *saveAsNewAPIHadoopFile*, and I guess I need to
call *ParquetOutputFormat.setWriteSupportClass*
and *ProtoParquetOutputFormat.setProtobufClass* before. But in that case,
I'm not sure I have much control over how to partition files into different
folders on the file system.
- or convert the RDD to a dataframe and then use *write.parquet*; in that case,
I have more control, since I can rely on *partitionBy* to arrange the files in
different folders. But I'm not sure there is a built-in way to convert an
RDD of protobuf to a dataframe in Spark? I would need to rely on this:
https://github.com/saurfang/sparksql-protobuf.

What do you think ?
Kind regards,
David