Re: Performance of PySpark jobs on the Kubernetes cluster
Hi Mich,

I don't quite understand why the driver node is using so much CPU, but it may be unrelated to your executors being underused. As for the underused executors, I would first check that your job generates enough tasks. Then I would look at the spark.executor.cores and spark.task.cpus parameters to see if I can give more work to the executors.

Cheers,
David

On Tue, Aug 10, 2021 at 12:20 PM, Khalid Mammadov wrote:

> Hi Mich
>
> I think you need to check your code. If the code does not use the PySpark
> API effectively you may get this, i.e. if you use the pure Python/pandas
> API rather than PySpark transform->transform->action chains, e.g.
> df.select(..).withColumn(...)...count()
>
> Hope this helps to put you in the right direction.
>
> Cheers
> Khalid
>
> On Mon, 9 Aug 2021, 20:20 Mich Talebzadeh wrote:
>
>> Hi,
>>
>> I have a basic question to ask.
>>
>> I am running a Google k8s cluster (AKA GKE) with three nodes, each with
>> the configuration below:
>>
>> e2-standard-2 (2 vCPUs, 8 GB memory)
>>
>> spark-submit is launched from another node (actually a Dataproc single
>> node that I have just upgraded to e2-custom, 4 vCPUs, 8 GB mem). We call
>> this the launch node.
>>
>> OK, I know that the cluster is not much, but Google was complaining about
>> the launch node hitting 100% CPU, so I added two more CPUs to it.
>>
>> It appears that despite using k8s as the computational cluster, the
>> burden falls upon the launch node!
>>
>> The CPU utilisation for the launch node is shown below:
>>
>> [image: image.png]
>>
>> The dip is when 2 more CPUs were added to it, so it had to reboot; around
>> 70% usage.
>>
>> The combined CPU usage for the GKE nodes is shown below:
>>
>> [image: image.png]
>>
>> It never goes above 20%!
>>
>> I can see the driver and executors as below:
>>
>> k get pods -n spark
>> NAME                                         READY   STATUS    RESTARTS   AGE
>> pytest-c958c97b2c52b6ed-driver               1/1     Running   0          69s
>> randomdatabigquery-e68a8a7b2c52f468-exec-1   1/1     Running   0          51s
>> randomdatabigquery-e68a8a7b2c52f468-exec-2   1/1     Running   0          51s
>> randomdatabigquery-e68a8a7b2c52f468-exec-3   0/1     Pending   0          51s
>>
>> It is a PySpark 3.1.1 image using Java 8, pushing randomly generated data
>> into the Google BigQuery data warehouse. The last executor (exec-3) seems
>> to be just pending. The spark-submit is as below:
>>
>> spark-submit --verbose \
>>   --properties-file ${property_file} \
>>   --master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>   --deploy-mode cluster \
>>   --name pytest \
>>   --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>>   --py-files $CODE_DIRECTORY/DSBQ.zip \
>>   --conf spark.kubernetes.namespace=$NAMESPACE \
>>   --conf spark.executor.memory=5000m \
>>   --conf spark.network.timeout=300 \
>>   --conf spark.executor.instances=3 \
>>   --conf spark.kubernetes.driver.limit.cores=1 \
>>   --conf spark.driver.cores=1 \
>>   --conf spark.executor.cores=1 \
>>   --conf spark.executor.memory=2000m \
>>   --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>   --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>   --conf spark.kubernetes.container.image=${IMAGEGCP} \
>>   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>>   --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>   --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>   --conf spark.sql.execution.arrow.pyspark.enabled="true" \
>>   $CODE_DIRECTORY/${APPLICATION}
>>
>> Aren't the driver and executors running on the K8s cluster? So why is the
>> launch node heavily used while the k8s cluster is underutilized?
>>
>> Thanks
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
>> loss, damage or destruction of data or any other property which may arise
>> from relying on this email's technical content is explicitly disclaimed.
>> The author will in no case be liable for any monetary damages arising
>> from such loss, damage or destruction.
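As a quick illustration of the checks suggested above, a minimal PySpark sketch (the app name, dataframe and default values are placeholders, not taken from the job in question): it prints how many partitions, and therefore tasks, the next stage will run, and the effective cores-per-task settings. With only three single-core executors, fewer than three partitions would leave most of the cluster idle.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parallelism-check").getOrCreate()

# Hypothetical input; replace with the dataframe the job actually builds.
df = spark.range(0, 10_000_000)

# Number of partitions == number of tasks the next stage will run.
print("partitions:", df.rdd.getNumPartitions())

# Task slots per executor = spark.executor.cores / spark.task.cpus.
print("executor cores:", spark.conf.get("spark.executor.cores", "1"))
print("cpus per task:", spark.conf.get("spark.task.cpus", "1"))

# If there are too few partitions to keep all executors busy, repartition.
df = df.repartition(3 * int(spark.conf.get("spark.executor.cores", "1")))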
Trying to hash cross features with mllib
Hello everyone,

In MLlib, I'm trying to rely essentially on pipelines to create features out of the Titanic dataset and showcase the power of feature hashing. I want to:

- Apply bucketization on some columns (QuantileDiscretizer is fine).
- Then cross all my columns with each other to get cross features.
- Then hash all of these cross features into a vector.
- Then give it to a logistic regression.

Looking at the documentation, it looks like the only way to hash features is the *FeatureHasher* transformation. It takes multiple columns as input, whose type can be numeric, bool, or string (but no vector/array).

But now I'm left wondering how I can create my cross-feature columns. I'm looking for a transformation that could take two columns as input and return a numeric, bool, or string. I didn't manage to find anything that does the job. There are transformations such as VectorAssembler that operate on vectors, but that is not a type accepted by the FeatureHasher.

Of course, I could try to combine columns directly in my dataframe (before the pipeline kicks in), but then I would no longer be able to benefit from QuantileDiscretizer and other cool functions.

Am I missing something in the transformation API? Or is my approach to hashing wrong? Or should we consider extending the API somehow?

Thank you, kind regards,
David
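One possible way to keep everything inside the pipeline, sketched below under the assumption of Titanic-style column names (Age, Pclass, Survived are hypothetical here, and numFeatures is arbitrary): a SQLTransformer stage can concatenate two columns into a single string column, which FeatureHasher does accept, so the bucketizer, the cross features, the hasher and the logistic regression all stay in one Pipeline.

from pyspark.ml import Pipeline
from pyspark.ml.feature import QuantileDiscretizer, SQLTransformer, FeatureHasher
from pyspark.ml.classification import LogisticRegression

# Bucketize a numeric column (hypothetical Titanic column names).
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="Age", outputCol="age_bin")

# Build a cross feature as a plain string column so FeatureHasher can consume it.
cross = SQLTransformer(
    statement="""
        SELECT *, concat_ws('_', cast(age_bin AS string), cast(Pclass AS string)) AS age_x_pclass
        FROM __THIS__
    """
)

hasher = FeatureHasher(inputCols=["age_x_pclass"], outputCol="features", numFeatures=1024)
lr = LogisticRegression(featuresCol="features", labelCol="Survived")

pipeline = Pipeline(stages=[discretizer, cross, hasher, lr])
# model = pipeline.fit(titanic_df)   # titanic_df is assumed to exist

Whether this scales nicely to "cross every column with every other one" is another matter; generating one such expression per pair of columns programmatically would be the obvious next step.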
Re: Trying to hash cross features with mllib
Hello Sean,

Thank you for the heads-up! The Interaction transform won't help for my use case, as it returns a vector that I won't be able to hash. I will definitely dig further into custom transformations though.

Thanks!
David

On Fri, Oct 1, 2021 at 3:49 PM, Sean Owen wrote:

> Are you looking for
> https://spark.apache.org/docs/latest/ml-features.html#interaction ?
> That's the closest built-in thing I can think of. Otherwise you can make
> custom transformations.
>
> On Fri, Oct 1, 2021, 8:44 AM David Diebold wrote:
>
>> Hello everyone,
>>
>> In MLlib, I'm trying to rely essentially on pipelines to create features
>> out of the Titanic dataset and showcase the power of feature hashing. I
>> want to:
>>
>> - Apply bucketization on some columns (QuantileDiscretizer is fine).
>> - Then cross all my columns with each other to get cross features.
>> - Then hash all of these cross features into a vector.
>> - Then give it to a logistic regression.
>>
>> Looking at the documentation, it looks like the only way to hash features
>> is the *FeatureHasher* transformation. It takes multiple columns as input,
>> whose type can be numeric, bool, or string (but no vector/array).
>>
>> But now I'm left wondering how I can create my cross-feature columns. I'm
>> looking for a transformation that could take two columns as input and
>> return a numeric, bool, or string. I didn't manage to find anything that
>> does the job. There are transformations such as VectorAssembler that
>> operate on vectors, but that is not a type accepted by the FeatureHasher.
>>
>> Of course, I could try to combine columns directly in my dataframe (before
>> the pipeline kicks in), but then I would no longer be able to benefit from
>> QuantileDiscretizer and other cool functions.
>>
>> Am I missing something in the transformation API? Or is my approach to
>> hashing wrong? Or should we consider extending the API somehow?
>>
>> Thank you, kind regards,
>>
>> David
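For the custom-transformation route, a minimal sketch of a Transformer that builds a string cross-feature column usable by FeatureHasher; the class and column names are made up, and a production version would also implement the Params machinery and DefaultParamsWritable so the fitted pipeline can be persisted.

from pyspark.ml import Transformer
from pyspark.sql import functions as F

class CrossFeature(Transformer):
    """Concatenate two columns into one string column that FeatureHasher accepts."""

    def __init__(self, leftCol, rightCol, outputCol):
        super().__init__()
        self.leftCol = leftCol
        self.rightCol = rightCol
        self.outputCol = outputCol

    def _transform(self, df):
        return df.withColumn(
            self.outputCol,
            F.concat_ws(
                "_",
                F.col(self.leftCol).cast("string"),
                F.col(self.rightCol).cast("string"),
            ),
        )

# Usage inside a pipeline stage list, e.g.:
# stages = [discretizer, CrossFeature("age_bin", "Pclass", "age_x_pclass"), hasher, lr]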
question about data skew and memory issues
Hello all,

I was wondering if it is possible to encounter out-of-memory exceptions on Spark executors when doing some aggregation on a skewed dataset.

Let's say we have a dataset with two columns:
- key : int
- value : float

I want to aggregate values by key, and let's say that we have tons of keys equal to 0. Is it possible to encounter some out-of-memory exception after the shuffle?

My expectation would be that the executor responsible for aggregating the '0' partition could indeed hit an OOM exception if it tried to put all the files of this partition in memory before processing them. But why would it need to put them in memory when doing an aggregation? It looks to me that the aggregation can be performed in a streaming fashion, so I would not expect any OOM at all.

Thank you in advance for your insights :)
David
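For reference, a sketch of the usual salting mitigation when one key dominates, using the column names from the question (the number of salts is arbitrary, and this is an illustration rather than a claim about where an OOM would actually come from): the heavy key is split across several tasks in a first partial aggregation, then re-aggregated without the salt.

from pyspark.sql import functions as F

N_SALTS = 16  # arbitrary; tune to the observed skew

# Spread the rows of each key across N_SALTS random salt values.
salted = df.withColumn("salt", (F.rand() * N_SALTS).cast("int"))

# Stage 1: partial aggregation; the heavy key '0' is split over N_SALTS tasks.
partial = salted.groupBy("key", "salt").agg(F.sum("value").alias("partial_sum"))

# Stage 2: cheap final aggregation over at most N_SALTS rows per key.
result = partial.groupBy("key").agg(F.sum("partial_sum").alias("sum_value"))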
Re: Pyspark debugging best practices
Hello Andy,

Are you sure you want to perform lots of join operations, and not simple unions? Are you doing inner joins or outer joins? Can you give us a rough idea of your list size and of each individual dataset's size?

Having a look at the execution plan would help; maybe the high number of join operations makes the execution plan too complicated at the end of the day. Checkpointing could help there?

Cheers,
David

On Thu, Dec 30, 2021 at 4:56 PM, Andrew Davidson wrote:

> Hi Gourav
>
> I will give Databricks a try.
>
> Each dataset gets loaded into a data frame.
> I select one column from the data frame.
> I join the column to the accumulated joins from previous data frames in
> the list.
>
> To debug, I think I am going to put an action and a log statement after
> each join. I do not think it will change the performance. I believe the
> physical plan will be the same; however, hopefully it will shed some light.
>
> At the very least I will know if it is making progress or not. And
> hopefully where it is breaking.
>
> Happy new year
>
> Andy
>
> On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta wrote:
>
>> Hi Andrew,
>>
>> Any chance you might give Databricks a try in GCP?
>>
>> The above transformations look complicated to me, why are you adding
>> dataframes to a list?
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson wrote:
>>
>>> Hi
>>>
>>> I am having trouble debugging my driver. It runs correctly on smaller
>>> data sets but fails on large ones. It is very hard to figure out what
>>> the bug is. I suspect it may have something to do with the way Spark is
>>> installed and configured. I am using Google Cloud Platform Dataproc
>>> PySpark.
>>>
>>> The log messages are not helpful. The error message will be something
>>> like "User application exited with status 1"
>>>
>>> And
>>>
>>> jsonPayload: {
>>>   class: "server.TThreadPoolServer"
>>>   filename: "hive-server2.log"
>>>   message: "Error occurred during processing of message."
>>>   thread: "HiveServer2-Handler-Pool: Thread-40"
>>> }
>>>
>>> I am able to access the Spark history server, however it does not
>>> capture anything if the driver crashes. I am unable to figure out how
>>> to access the Spark web UI.
>>>
>>> My driver program looks something like the pseudo code below: a long
>>> list of transforms with a single action (i.e. write) at the end. Adding
>>> log messages is not helpful because of lazy evaluation. I am tempted to
>>> add something like
>>>
>>> Logger.warn("DEBUG df.count():{}".format(df.count()))
>>>
>>> to try and inline some sort of diagnostic message.
>>>
>>> What do you think?
>>>
>>> Is there a better way to debug this?
>>>
>>> Kind regards
>>>
>>> Andy
>>>
>>> def run():
>>>     listOfDF = []
>>>     for filePath in listOfFiles:
>>>         df = spark.read.load(filePath, ...)
>>>         listOfDF.append(df)
>>>
>>>     list2OfDF = []
>>>     for df in listOfDF:
>>>         df2 = df.select(...)
>>>         list2OfDF.append(df2)
>>>
>>>     # will setting the list to None free cache?
>>>     # or just driver memory?
>>>     listOfDF = None
>>>
>>>     df3 = list2OfDF[0]
>>>
>>>     for i in range(1, len(list2OfDF)):
>>>         df = list2OfDF[i]
>>>         df3 = df3.join(df, ...)
>>>
>>>     # will setting the list to None free cache?
>>>     # or just driver memory?
>>>     list2OfDF = None
>>>
>>>     # lots of narrow transformations on df3
>>>
>>>     return df3
>>>
>>> def main():
>>>     df = run()
>>>     df.write(...)
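To make the "action plus log statement after each join" idea concrete, a small sketch (the checkpoint directory, join key and join type are placeholders, not taken from Andy's job): checkpointing after each join both truncates the ever-growing plan and forces execution, so a failure can be pinned to a specific step in the driver log.

import logging

log = logging.getLogger(__name__)

# Hypothetical checkpoint location; any HDFS/GCS path the cluster can write to.
spark.sparkContext.setCheckpointDir("gs://some-bucket/checkpoints")

def debug_join_all(dfs, key):
    """Join a list of dataframes one at a time, materializing after each step."""
    joined = dfs[0]
    for i, df in enumerate(dfs[1:], start=1):
        joined = joined.join(df, on=key, how="inner")
        joined = joined.checkpoint()  # truncates lineage and forces computation
        log.warning("join step %d done, rows=%d", i, joined.count())
    return joined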
Re: groupMapReduce
Hello,

In the RDD API, you are probably looking for reduceByKey.

Cheers

On Fri, Jan 14, 2022 at 11:56 AM, frakass wrote:

> Is there an RDD API which is similar to Scala's groupMapReduce?
> https://blog.genuine.com/2019/11/scalas-groupmap-and-groupmapreduce/
>
> Thank you.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
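A tiny PySpark sketch of the equivalence, with a made-up word-grouping example: Scala's xs.groupMapReduce(key)(mapper)(reducer) corresponds to mapping each element to a (key, mapped_value) pair and then calling reduceByKey.

# Hypothetical example: group words by first letter and count them.
rdd = spark.sparkContext.parallelize(["apple", "avocado", "banana", "blueberry"])

result = (
    rdd.map(lambda word: (word[0], 1))    # key function + mapping function
       .reduceByKey(lambda a, b: a + b)   # reduction function
)
print(result.collect())  # e.g. [('a', 2), ('b', 2)]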
Question about spark.sql min_by
Hello all,

I'm trying to use the spark.sql min_by aggregation function with PySpark. I'm relying on this distribution of Spark: spark-3.2.1-bin-hadoop3.2

I have a dataframe made of these columns:
- productId : int
- sellerId : int
- price : double

For each product, I want to get the seller who sells the product for the cheapest price.

The naive approach would be to do this, but I would expect two shuffles:

import pyspark.sql.functions as F
cheapest_prices_df = df.groupby('productId').agg(F.min('price').alias('price'))
cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId', 'price'])

I would have loved to do this instead:

import pyspark.sql.functions as F
cheapest_sellers_df = df.groupby('productId').agg(F.min('price'), F.min_by('sellerId', 'price'))

Unfortunately min_by does not seem to be available in pyspark.sql.functions, whereas I can see it in the doc: https://spark.apache.org/docs/latest/api/sql/index.html

I have managed to use min_by with this approach, but it looks slow (maybe because of the temp table creation?):

df.createOrReplaceTempView("table")
cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId, min(price) from table group by productId")

Is there a way I can rely on min_by directly in groupby? Is there some code missing in the pyspark wrapper to make min_by visible somehow?

Thank you in advance for your help.

Cheers
David
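For what it's worth, until a Python wrapper exists, the SQL function can also be reached from the DataFrame API through F.expr, which stays inside the groupBy and avoids the temp view; a sketch against the columns described above:

from pyspark.sql import functions as F

cheapest_sellers_df = (
    df.groupBy("productId")
      .agg(
          F.expr("min_by(sellerId, price)").alias("sellerId"),
          F.min("price").alias("price"),
      )
)

The plan should be the same as for the spark.sql version; createOrReplaceTempView only registers a name, so the temp view itself should not add any real cost.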
Re: Question about spark.sql min_by
Thank you for your answers.

Indeed, windowing should help there. Also, I just realized I can maybe create a struct column with both price and sellerId and apply min() on it; the ordering would consider price first (https://stackoverflow.com/a/52669177/2015762).

Cheers!

On Mon, Feb 21, 2022 at 4:12 PM, ayan guha wrote:

> Why can this not be done by a window function? Or is min_by just a
> shorthand?
>
> On Tue, 22 Feb 2022 at 12:42 am, Sean Owen wrote:
>
>> From the source code, it looks like this function was added to pyspark in
>> Spark 3.3, up for release soon. It exists in SQL. You can still use it in
>> SQL with `spark.sql(...)` in Python though, not hard.
>>
>> On Mon, Feb 21, 2022 at 4:01 AM David Diebold wrote:
>>
>>> Hello all,
>>>
>>> I'm trying to use the spark.sql min_by aggregation function with PySpark.
>>> I'm relying on this distribution of Spark: spark-3.2.1-bin-hadoop3.2
>>>
>>> I have a dataframe made of these columns:
>>> - productId : int
>>> - sellerId : int
>>> - price : double
>>>
>>> For each product, I want to get the seller who sells the product for the
>>> cheapest price.
>>>
>>> The naive approach would be to do this, but I would expect two shuffles:
>>>
>>> import pyspark.sql.functions as F
>>> cheapest_prices_df = df.groupby('productId').agg(F.min('price').alias('price'))
>>> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId', 'price'])
>>>
>>> I would have loved to do this instead:
>>>
>>> import pyspark.sql.functions as F
>>> cheapest_sellers_df = df.groupby('productId').agg(F.min('price'), F.min_by('sellerId', 'price'))
>>>
>>> Unfortunately min_by does not seem to be available in pyspark.sql.functions,
>>> whereas I can see it in the doc:
>>> https://spark.apache.org/docs/latest/api/sql/index.html
>>>
>>> I have managed to use min_by with this approach, but it looks slow (maybe
>>> because of the temp table creation?):
>>>
>>> df.createOrReplaceTempView("table")
>>> cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId, min(price) from table group by productId")
>>>
>>> Is there a way I can rely on min_by directly in groupby? Is there some
>>> code missing in the pyspark wrapper to make min_by visible somehow?
>>>
>>> Thank you in advance for your help.
>>>
>>> Cheers
>>> David
>>
>
> --
> Best Regards,
> Ayan Guha
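A sketch of that struct trick against the same columns, in case it is useful to someone finding this thread: min() over a struct orders by the first field first, so putting price before sellerId yields a cheapest seller per product in a single aggregation.

from pyspark.sql import functions as F

cheapest_sellers_df = (
    df.groupBy("productId")
      .agg(F.min(F.struct("price", "sellerId")).alias("cheapest"))
      .select(
          "productId",
          F.col("cheapest.price").alias("price"),
          F.col("cheapest.sellerId").alias("sellerId"),
      )
)

Note that ties on price resolve to the smallest sellerId, which may differ from whichever row min_by happens to return.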
Question about bucketing and custom partitioners
Hello,

I have a few questions related to bucketing and custom partitioning in the dataframe API.

I am considering bucketing to get a shuffle-free join on one side in incremental jobs, but there is one thing that I'm not happy with. Data is likely to grow/skew over time, and at some point I would need to change the number of buckets, which would provoke a shuffle. Instead of this, I would like to use a custom partitioner that would replace the shuffle with a narrow transformation. That is something that was feasible with the RDD developer API. For example, I could use such a partitioning scheme:

partition_id = (nb_partitions - 1) * (hash(column) - Int.MinValue) / (Int.MaxValue - Int.MinValue)

When I multiply the number of partitions by 2, each new partition depends on only one parent partition (=> narrow transformation).

So, here are my questions:

1/ Is it possible to use a custom partitioner when saving a dataframe with bucketing?
2/ Still with the dataframe API, is it possible to apply a custom partitioner to a dataframe? Is it possible to repartition the dataframe with a narrow transformation like what could be done with RDDs? Is there some sort of dataframe developer API?

Do you have any pointers on this? Thanks!

David
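For context, the bucketing baseline being discussed looks like this in the DataFrame API (a sketch; the dataframe, table and column names are made up, the bucket count is arbitrary, and bucketBy only works together with saveAsTable). As far as I know there is no public hook in the DataFrame writer or in DataFrame.repartition to plug in a custom partitioner, which is exactly the gap the questions above point at.

# Write one side bucketed on the join key so that later incremental joins
# against a table bucketed the same way avoid shuffling this side.
(
    events_df.write
        .mode("overwrite")
        .bucketBy(64, "user_id")     # 64 buckets on a hypothetical join key
        .sortBy("user_id")
        .saveAsTable("events_bucketed")
)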
Writing protobuf RDD to parquet
Hello,

I'm trying to write some RDD[T] to parquet, where T is a protobuf message, in Scala. I am wondering what the best option to do this is, and I would be interested in your thoughts. So far, I see two possibilities:

- Use the PairRDD method *saveAsNewAPIHadoopFile*, and I guess I need to call *ParquetOutputFormat.setWriteSupportClass* and *ProtoParquetOutputFormat.setProtobufClass* beforehand. But in that case, I'm not sure I have much control over how files are partitioned into different folders on the file system.
- Or convert the RDD to a dataframe and then use *write.parquet*; in that case I have more control, since I can rely on *partitionBy* to arrange the files in different folders. But I'm not sure there is any built-in way to convert an RDD of protobuf to a dataframe in Spark? I would need to rely on this: https://github.com/saurfang/sparksql-protobuf.

What do you think?

Kind regards,
David