Using storage decommissioning on K8S cluster

2025-02-19 Thread Enrico Minack
Hi Spark users, I am trying to use the executor and storage decommissioning feature in conjunction with an S3 fallback storage, on a K8S cluster. I am wondering how mature this feature is considered, given it has been around for quite some time. Has this been used in anger on K8S? What is your exper
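For reference, a minimal configuration sketch of the combination being asked about (config keys as documented for Spark 3.1+; the bucket path is illustrative):

    spark.decommission.enabled                          true
    spark.storage.decommission.enabled                  true
    spark.storage.decommission.shuffleBlocks.enabled    true
    spark.storage.decommission.rddBlocks.enabled        true
    spark.storage.decommission.fallbackStorage.path     s3a://my-bucket/spark-fallback/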

Re: External Spark shuffle service for k8s

2024-04-07 Thread Enrico Minack
There is Apache incubator project Uniffle: https://github.com/apache/incubator-uniffle It stores shuffle data on remote servers in memory, on local disk and HDFS. Cheers, Enrico Am 06.04.24 um 15:41 schrieb Mich Talebzadeh: I have seen some older references for shuffle service for k8s, althou

Re: AQE coalesce 60G shuffle data into a single partition

2024-02-24 Thread Enrico Minack
Hi Shay, maybe this is related to the small number of output rows (1,250) of the last exchange step that consumes those 60GB shuffle data. Looks like your outer transformation is something like df.groupBy($"id").agg(collect_list($"prop_name")) Have you tried adding a repartition as an attempt t
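A spark-shell sketch of the shape of that transformation, with an explicit repartition appended as the reply suggests trying (data and partition count are illustrative):

    import org.apache.spark.sql.functions.collect_list

    val df = Seq((1, "a"), (1, "b"), (2, "c")).toDF("id", "prop_name")

    df.groupBy($"id")
      .agg(collect_list($"prop_name").as("prop_names"))
      .repartition(200)   // keep the result from collapsing into a single partition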

Re: ordering of rows in dataframe

2023-12-05 Thread Enrico Minack
Looks like what you want is to add a column such that, when ordered by that column, the current order of the dataframe is preserved. All you need is the monotonically_increasing_id() function: spark.range(0, 10, 1, 5).withColumn("row", monotonically_increasing_id()).show() +---+---+ | id|
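A complete spark-shell sketch of that idea (assuming Spark 2.x+):

    import org.apache.spark.sql.functions.monotonically_increasing_id

    val df = spark.range(0, 10, 1, 5).withColumn("row", monotonically_increasing_id())
    // ordering by the generated column reproduces the dataframe's current order
    df.orderBy("row").show()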

Re: [PySpark][Spark Dataframe][Observation] Why empty dataframe join doesn't let you get metrics from observation?

2023-12-05 Thread Enrico Minack
#39] where the join also gets optimized away, but table df is still filtered for col1 = 'c', which iterates over the rows and collects the metrics for observation 1. Hope this helps to understand why there are no observed metrics for Observation("1") in your case. Enrico Am 04

Re: [PySpark][Spark Dataframe][Observation] Why empty dataframe join doesn't let you get metrics from observation?

2023-12-04 Thread Enrico Minack
Hi Michail, observations as well as ordinary accumulators only observe / process rows that are iterated / consumed by downstream stages. If the query plan decides to skip one side of the join, that one will be removed from the final plan completely. Then, the Observation will not retrieve any met
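A minimal spark-shell sketch of the mechanism described here (Observation API, Spark 3.3+; metric name and query are illustrative) — metrics only appear once an action actually consumes the rows:

    import org.apache.spark.sql.Observation
    import org.apache.spark.sql.functions.{count, lit}

    val observation = Observation("1")
    val observed = spark.range(10).observe(observation, count(lit(1)).as("rows"))
    observed.collect()        // the action consumes the rows, which feeds the observation
    println(observation.get)  // Map(rows -> 10)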

Re: Apache Spark not reading UTC timestamp from MongoDB correctly

2023-06-08 Thread Enrico Minack
Sean is right, casting timestamps to strings (which is what show() does) uses the local timezone, either the Java default zone `user.timezone`, the Spark default zone `spark.sql.session.timeZone` or the default DataFrameWriter zone `timeZone` (when writing to file). You say you are in PST, whic
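A spark-shell sketch of the effect (the timestamp value is illustrative): the same stored instant renders differently depending on the session time zone:

    import java.sql.Timestamp

    val df = Seq(Timestamp.valueOf("2023-06-08 12:00:00")).toDF("ts")

    spark.conf.set("spark.sql.session.timeZone", "UTC")
    df.show()   // rendered in UTC

    spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    df.show()   // the same instant, rendered in Pacific time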

Re: Incremental Value dependents on another column of Data frame Spark

2023-05-24 Thread Enrico Minack
Hi, given your dataset: val df=Seq( (1, 20230523, "M01"), (2, 20230523, "M01"), (3, 20230523, "M01"), (4, 20230523, "M02"), (5, 20230523, "M02"), (6, 20230523, "M02"), (7, 20230523, "M01"), (8, 20230523, "M01"), (9, 20230523, "M02"), (10, 20230523, "M02"), (11, 20230523, "M02"), (12, 20230523

Re: Write custom JSON from DataFrame in PySpark

2023-05-04 Thread Enrico Minack
Hi, You could rearrange the DataFrame so that writing the DataFrame as-is produces your structure: df = spark.createDataFrame([(1, "a1"), (2, "a2"), (3, "a3")], "id int, datA string") +---+----+ | id|datA| +---+----+ |  1|  a1| |  2|  a2| |  3|  a3| +---+----+ df2 = df.select(df.id, struct(df.d
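The same idea in Scala (the thread uses PySpark; the target JSON structure here is illustrative) — nest the columns so that writing the rearranged DataFrame yields the desired shape:

    import org.apache.spark.sql.functions.struct

    val df = Seq((1, "a1"), (2, "a2"), (3, "a3")).toDF("id", "datA")
    // e.g. to get {"id":1,"data":{"datA":"a1"}} per line
    val df2 = df.select($"id", struct($"datA").as("data"))
    df2.write.json("/tmp/custom-json")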

Re: Use Spark Aggregator in PySpark

2023-04-24 Thread Enrico Minack
Hi, For an aggregating UDF, use spark.udf.registerJavaUDAF(name, className). Enrico Am 23.04.23 um 23:42 schrieb Thomas Wang: Hi Spark Community, I have implemented a custom Spark Aggregator (a subclass to |org.apache.spark.sql.expressions.Aggregator|). Now I'm trying to use it in a PySpa

Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread Enrico Minack
You have to take each row and zip the lists, each element of the result becomes one new row. So write a method that turns   Row(List("A","B","null"), List("C","D","null"), List("E","null","null")) into   List(List("A","C","E"), List("B","D","null"), List("null","null","null")) and use flatm
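A spark-shell sketch of that zip-and-flatMap approach (column names and data are illustrative):

    val df = Seq((Seq("A", "B", null), Seq("C", "D", null), Seq("E", null, null))).toDF("c1", "c2", "c3")

    // zip the arrays element-wise and emit one row per zipped element
    val exploded = df.as[(Seq[String], Seq[String], Seq[String])]
      .flatMap { case (c1, c2, c3) =>
        c1.zip(c2).zip(c3).map { case ((a, b), c) => (a, b, c) }
      }
      .toDF("c1", "c2", "c3")

    exploded.show()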

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Enrico Minack
OK, what do you mean by " do your outer for loop in parallel "? btw this didn't work: for (String columnName : df.columns()) {     df= df.withColumn(columnName, collect_set(col(columnName)).as(columnName)); } Le dim. 12 févr. 2023 à 20:36, Enrico Minack

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Enrico Minack
en you have a single DataFrame that computes all columns in a single Spark job. But this reads all distinct values into a single partition, which has the same downside as collect, so this is as bad as using collect. Cheers, Enrico Am 12.02.23 um 18:05 schrieb sam smith: @Enrico Minack <m

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-11 Thread Enrico Minack
You could do the entire thing in DataFrame world and write the result to disk. All you need is unpivot (to be released in Spark 3.4.0, soon). Note this is Scala but should be straightforward to translate into Java: import org.apache.spark.sql.functions.collect_set val df = Seq((1, 10, 123), (2
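A spark-shell sketch of that approach (Spark 3.4+ for unpivot; the rows beyond the first are illustrative):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{col, collect_set}

    val df = Seq((1, 10, 123), (2, 20, 123), (3, 10, 124)).toDF("a", "b", "c")

    // unpivot into (column, value) pairs, then collect the distinct values per column
    val distinctValues = df
      .unpivot(Array.empty[Column], df.columns.map(col), "column", "value")
      .groupBy("column")
      .agg(collect_set("value").as("distinct_values"))

    distinctValues.show(false)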

SQL GROUP BY alias with dots, was: Spark SQL question

2023-02-07 Thread Enrico Minack
Hi, you are right, that is an interesting question. Looks like GROUP BY is doing something funny / magic here (spark-shell 3.3.1 and 3.5.0-SNAPSHOT): With an alias, it behaves as you have pointed out: spark.range(3).createTempView("ids_without_dots") spark.sql("SELECT * FROM ids_without_dots

Re: The Dataset unit test is much slower than the RDD unit test (in Scala)

2022-11-01 Thread Enrico Minack
Hi Tanin, running your test with option "spark.sql.planChangeLog.level" set to "info" or "warn" (depending on your Spark log level) will show you insights into the planning (which rules are applied, how long rules take, how many iterations are done). Hoping this helps, Enrico Am 25.10.22 u
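A minimal sketch of that setting (Spark 3.1+), run before the query under test:

    spark.conf.set("spark.sql.planChangeLog.level", "warn")
    spark.range(10).selectExpr("id * 2 AS doubled").collect()   // planning details now appear in the log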

Re: Reading too many files

2022-10-05 Thread Enrico Minack
Hi, Spark is fine with that many Parquet files in general: # generate 100,000 small Parquet files spark.range(0, 100, 1, 10).write.parquet("too-many-files.parquet") # read 100,000 Parquet files val df = spark.read.parquet("too-many-files.parquet") df.show() df.count() Reading the files

Re: [Spark Internals]: Is sort order preserved after partitioned write?

2022-09-17 Thread Enrico Minack
Thanks, Swetha On Fri, Sep 16, 2022 at 1:45 AM Enrico Minack wrote: Yes, you can expect each partition file to be sorted by "col1" and "col2". However, values

Re: Splittable or not?

2022-09-17 Thread Enrico Minack
If with "won't affect the performance" you mean "parquet is splittable though it uses snappy", then yes. Splittable files allow for optimal parallelization, which "won't affect performance". Spark writing data will split the data into multiple files already (here parquet files). Even if each f

Re: [Spark Internals]: Is sort order preserved after partitioned write?

2022-09-15 Thread Enrico Minack
Yes, you can expect each partition file to be sorted by "col1" and "col2". However, values for "col1" will be "randomly" allocated to partition files, but all rows with the same value for "col1" will reside in the same one partition file. What kind of unexpected sort order do you observe? En
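A spark-shell sketch of the kind of write being discussed (data and path are illustrative):

    val df = Seq((1, 3), (1, 1), (2, 2), (1, 2)).toDF("col1", "col2")

    df.repartition($"col1")                    // all rows of one col1 value end up in the same partition
      .sortWithinPartitions($"col1", $"col2")  // each partition, and hence each file, is sorted
      .write
      .partitionBy("col1")
      .parquet("/tmp/partitioned-sorted")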

Re: reading each JSON file from dataframe...

2022-07-12 Thread Enrico Minack
FS wrapper and the JAR library use different dependencies. Hope these findings help others as well. Thanks, Muthu On Mon, 11 Jul 2022 at 14:11, Enrico Minack wrote: All you need to do is implement a method readJson that reads a single file given its path. Then, you map the values

Re: reading each JSON file from dataframe...

2022-07-11 Thread Enrico Minack
All you need to do is implement a method readJson that reads a single file given its path. Then, you map the values of column file_path to the respective JSON content as a string. This can be done via a UDF or simply Dataset.map: case class RowWithJsonUri(entity_id: String, file_path: String,
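A heavily hedged spark-shell sketch of that idea — the case class fields beyond those visible above, the readJson helper, and the use of Hadoop's FileSystem API are assumptions:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import java.net.URI
    import scala.io.Source

    case class RowWithJsonUri(entity_id: String, file_path: String)

    // hypothetical helper: read one JSON file fully into a string
    def readJson(path: String): String = {
      val fs = FileSystem.get(new URI(path), new Configuration())
      val in = fs.open(new Path(path))
      try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
    }

    val ds = Seq(RowWithJsonUri("id-1", "hdfs:///data/one.json")).toDS()
    val withJson = ds.map(row => (row.entity_id, readJson(row.file_path))).toDF("entity_id", "json")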

Re: Will it lead to OOM error?

2022-06-22 Thread Enrico Minack
might lead to OOM error? Thanks, Sid On Wed, Jun 22, 2022 at 6:40 PM Enrico Minack wrote: The RAM and disk memory consumption depends on what you do with the data after reading them. Your particular action will read 20 lines from the first partition and show them. So it will no

Re: Will it lead to OOM error?

2022-06-22 Thread Enrico Minack
The RAM and disk memory consumption depends on what you do with the data after reading them. Your particular action will read 20 lines from the first partition and show them. So it will not use any RAM or disk, no matter how large the CSV is. If you do a count instead of show, it will iterate

Re: input file size

2022-06-19 Thread Enrico Minack
Maybe a   .as[String].mapPartitions(it => if (it.hasNext) Iterator(it.next) else Iterator.empty) might be faster than the   .distinct.as[String] Enrico Am 19.06.22 um 08:59 schrieb Enrico Minack: Given you already know your input files (input_file_name), why not getting their size

Re: input file size

2022-06-18 Thread Enrico Minack
Given you already know your input files (input_file_name), why not get their size and sum it up? import java.io.File import java.net.URI import org.apache.spark.sql.functions.input_file_name ds.select(input_file_name.as("filename")) .distinct.as[String] .map(filename => new F

Re: API Problem

2022-06-13 Thread Enrico Minack
                response = call_to_cust_bulk_api(policyUrl, custRequestBody)                 print(response)                 finalDFStatus = finalDF.withColumn("edl_timestamp", to_timestamp(lit(F.TimeNow(.withColumn(                     "status_for_each_batch",            

Re: API Problem

2022-06-10 Thread Enrico Minack
I am expecting the payload to be as a JSON string to be a record like below: {"A":"some_value","B":"some_value"} Where A and B are the columns in my dataset. On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack wrote: Sid, just recognized you are us

Re: API Problem

2022-06-10 Thread Enrico Minack
use of a column expression. What do you expect print(payload) to be? I recommend splitting that complex command into multiple commands to find out what "an error of column not iterable" refers to. Enrico Am 10.06.22 um 13:39 schrieb Enrico Minack: Hi Sid, finalDF = finalDF.r

Re: API Problem

2022-06-10 Thread Enrico Minack
Hi Sid, finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions()) .withColumn("status_for_batch", call_to_cust_bulk_api(policyUrl, to_json(struct(*colsListToBePassed You are calling withColumn with the result of call_to_cust_bulk_api as the second argument. That result

Re: partitionBy creating lot of small files

2022-06-04 Thread Enrico Minack
You refer to df.write.partitionBy, which creates for each value of "col" a directory, and in worst-case writes one file per DataFrame partition. So the number of output files is controlled by cardinality of "col", which is your data and hence out of control, and the number of partitions of your
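A sketch of how to bound that, following the relationship described above — make the DataFrame partitioning match the write partitioning (data, column name and path are illustrative):

    val df = Seq((1, "x"), (1, "y"), (2, "z")).toDF("col", "value")

    // all rows with the same value of "col" end up in the same task,
    // so each value directory gets (at most) one file
    df.repartition($"col")
      .write
      .partitionBy("col")
      .parquet("/tmp/partitioned-output")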

Re: How to convert a Dataset to a Dataset?

2022-06-04 Thread Enrico Minack
path"); will yield StringType as a type for column c1 similarly for c6 I want to return the true type of each column by first discarding the "+" I use Dataset after filtering the rows (removing "+") because i can re-read the new dataset using .csv() method. Any better ide

Re: How to convert a Dataset to a Dataset?

2022-06-04 Thread Enrico Minack
Can you provide an example string (row) and the expected inferred schema? Enrico Am 04.06.22 um 18:36 schrieb marc nicole: How to do just that? i thought we only can inferSchema when we first read the dataset, or am i wrong? Le sam. 4 juin 2022 à 18:10, Sean Owen a écrit : It sounds li

Re: PartitionBy and SortWithinPartitions

2022-06-03 Thread Enrico Minack
Nikhil, What are you trying to achieve with this in the first place? What are your goals? What is the problem with your approach? Are you concerned about the 1000 files in each written col2-partition? The write.partitionBy is something different than df.repartition or df.coalesce. The df p

Re: Writing Custom Spark Readers and Writers

2022-04-06 Thread Enrico Minack
Another project implementing DataSource V2 in Scala with Python wrapper: https://github.com/G-Research/spark-dgraph-connector Cheers, Enrico Am 06.04.22 um 12:01 schrieb Cheng Pan: There are some projects based on Spark DataSource V2 that I hope will help you. https://github.com/datastax/spa

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-31 Thread Enrico Minack
How well Spark can scale up with your data (in terms of years of data) depends on two things: the operations performed on the data, and characteristics of the data, like value distributions. Failing tasks smell like you are using operations that do not scale (e.g. Cartesian product of your dat

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-30 Thread Enrico Minack
> Wrt looping: if I want to process 3 years of data, my modest cluster will never do it one go , I would expect? > I have to break it down in smaller pieces and run that in a loop (1 day is already lots of data). Well, that is exactly what Spark is made for. It splits the work up and processes

Re: GraphX Support

2022-03-22 Thread Enrico Minack
Right, GraphFrames is not very active and maintainers don't even have the capacity to make releases. Enrico Am 22.03.22 um 00:10 schrieb Sean Owen: GraphX is not active, though still there and does continue to build and test with each Spark release. GraphFrames kind of superseded it, but is

Re: 回复:Re: calculate correlation between multiple columns and one specific column after groupby the spark data frame

2022-03-16 Thread Enrico Minack
If you have a list of Columns called `columns`, you can pass them to the `agg` method as:   agg(columns.head, columns.tail: _*) Enrico Am 16.03.22 um 08:02 schrieb ckgppl_...@sina.cn: Thanks, Sean. I modified the codes and have generated a list of columns. I am working on convert a list of c
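A spark-shell sketch of that call in the thread's correlation-per-group setting (column names and data are illustrative):

    import org.apache.spark.sql.functions.{col, corr}

    val df = Seq(("g1", 1.0, 2.0, 1.5), ("g1", 2.0, 1.0, 2.5), ("g2", 3.0, 4.0, 3.5), ("g2", 4.0, 3.0, 4.5))
      .toDF("group", "x1", "x2", "y")

    // correlation of every column with column "y", per group
    val columns = df.columns
      .filter(c => c != "group" && c != "y")
      .map(c => corr(col(c), col("y")).as(s"corr_$c"))

    df.groupBy("group").agg(columns.head, columns.tail: _*).show()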

Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Enrico Minack
Sid, Your Aggregation Query selects all employees where less than three distinct salaries exist that are larger. So, both queries seem to do the same. The Windowing Query is explicit in what it does: give me the rank for salaries per department in the given order and pick the top 3 per depa
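A spark-shell sketch of the Windowing Query in DataFrame form (table and column names are illustrative):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, dense_rank}

    val employees = Seq(("eng", "alice", 100), ("eng", "bob", 90), ("eng", "carol", 80),
                        ("eng", "dave", 70), ("sales", "eve", 95)).toDF("department", "name", "salary")

    // rank salaries per department and keep the top 3
    val byDept = Window.partitionBy("department").orderBy(col("salary").desc)
    employees.withColumn("rank", dense_rank().over(byDept))
      .where(col("rank") <= 3)
      .show()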

Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Enrico Minack
Though spark.read. refers to "built-in" data sources, there is nothing that prevents 3rd party libraries from "extending" spark.read in Scala or Python. As users know the Spark-way to read built-in data sources, it feels natural to hook 3rd party data sources under the same scheme, to give users a h
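A sketch of how such a 3rd-party hook can look in Scala (the delta method name mirrors the thread's example; a matching data source must be on the classpath):

    import org.apache.spark.sql.{DataFrame, DataFrameReader}

    // an implicit class "extends" spark.read with a convenience method
    implicit class DeltaDataFrameReader(reader: DataFrameReader) {
      def delta(path: String): DataFrame = reader.format("delta").load(path)
    }

    // usage: val df = spark.read.delta("/path/to/delta-table")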

Re: Query about Spark

2020-09-07 Thread Enrico Minack
You could use Horovod to distribute your ML algorithm on a cluster, while Horovod also supports Spark clusters. Enrico Am 06.09.20 um 15:30 schrieb Ankur Das: Good Evening Sir/Madam, Hope you are doing well, I am experimenting on some ML techniques where I need to test it on a distributed e

Re: regexp_extract regex for extracting the columns from string

2020-08-10 Thread Enrico Minack
You can remove the <1000> first and then turn the string into a map (interpret the string as key-values). From that map you can access each key and turn it into a separate column: Seq(("<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new packt=20 orgin=null address=null des

Re: Unablee to get to_timestamp with Timezone Information

2020-04-02 Thread Enrico Minack
Once parsed into a Timestamp the timestamp is stored internally as UTC and printed as your local timezone (e.g. as defined by spark.sql.session.timeZone). Spark is good at hiding timezone information from you. You can get the timezone information via date_format(column, format): import org.apa
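A spark-shell sketch of rendering the time zone back out via date_format (input string and pattern are illustrative):

    import org.apache.spark.sql.functions.{col, date_format, to_timestamp}

    val df = Seq("2020-04-02 10:00:00+05:30").toDF("str")
      .select(to_timestamp(col("str")).as("ts"))   // stored internally as UTC

    // render the instant in the session time zone, including the zone name
    df.select(date_format(col("ts"), "yyyy-MM-dd HH:mm:ss zzz").as("ts_with_zone")).show(false)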

Re: Issue with UDF Int Conversion - Str to Int

2020-03-23 Thread Enrico Minack
Ayan, no need for UDFs, the SQL API provides all you need (sha1, substring, conv): https://spark.apache.org/docs/2.4.5/api/python/pyspark.sql.html >>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16, 10).cast("long").alias("sha2long")).show() +--+ |  sha2long| +

Re: Time-based frequency table at scale

2020-03-11 Thread Enrico Minack
An interesting puzzle indeed. What is your measure of "that scales"? Does not fail, does not spill, does not need a huge amount of memory / disk, is O(N), processes X records per second and core? Enrico Am 11.03.20 um 16:59 schrieb sakag: Hi all, We have a rather interesting use case, an

Re: Spark driver thread

2020-03-06 Thread Enrico Minack
James, If you are having multithreaded code in your driver, then you should allocate multiple cores. In cluster mode you share the node with other jobs. If you allocate fewer cores than you are using in your driver, then that node gets over-allocated and you are stealing other applications' r

Re: Compute the Hash of each row in new column

2020-03-02 Thread Enrico Minack
n Fri, Feb 28, 2020 at 7:28 PM Enrico Minack <mailto:m...@enrico.minack.dev>> wrote: This computes the md5 hash of a given column id of Dataset ds: ds.withColumn("id hash", md5($"id")).show(false) Test with this Dataset ds: import org.apache.spa

Re:

2020-03-02 Thread Enrico Minack
Looks like the schema of some files is unexpected. You could either run parquet-tools on each of the files and extract the schema to find the problematic files: hdfs -stat "%n" hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet

Re: Compute the Hash of each row in new column

2020-02-28 Thread Enrico Minack
This computes the md5 hash of a given column id of Dataset ds: ds.withColumn("id hash", md5($"id")).show(false) Test with this Dataset ds: import org.apache.spark.sql.types._ val ds = spark.range(10).select($"id".cast(StringType)) Available are md5, sha, sha1, sha2 and hash: https://spark.apa
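To hash an entire row rather than a single column, a spark-shell sketch (hash accepts multiple columns; md5 needs a single string, hence the concat_ws — the separator and the second column are illustrative):

    import org.apache.spark.sql.functions.{col, concat_ws, hash, md5}

    val ds = spark.range(10).selectExpr("cast(id as string) as id", "id * 2 as value")
    val allCols = ds.columns.map(col)

    ds.withColumn("row hash", hash(allCols: _*))
      .withColumn("row md5", md5(concat_ws("|", allCols.map(_.cast("string")): _*)))
      .show(false)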

Re: Convert each partition of RDD to Dataframe

2020-02-27 Thread Enrico Minack
sequentially in Driver program and transform/write to hdfs one after the other * Or the current approach mentioned in the previous mail What will be the performance implications ? Regards Manjunath *From:* Enrico Minack

Re: Convert each partition of RDD to Dataframe

2020-02-27 Thread Enrico Minack
Hi Manjunath, why not create 10 DataFrames loading the different tables in the first place? Enrico Am 27.02.20 um 14:53 schrieb Manjunath Shetty H: Hi Vinodh, Thanks for the quick response. Didn't get what you meant exactly, any reference or snippet will be helpful. To explain the pr

[SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

2020-02-26 Thread Enrico Minack
I have created a jira to track this request: https://issues.apache.org/jira/browse/SPARK-30957 Enrico Am 08.02.20 um 16:56 schrieb Enrico Minack: Hi Devs, I am forwarding this from the user mailing list. I agree that the <=> version of join(Dataset[_], Seq[String]) would be useful.
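For context, the null-safe join spelled out with the existing API (data is illustrative), which the jira proposes to offer as a Seq[String] variant:

    val left = Seq(Some(1), None).toDF("id")
    val right = Seq(Some(1), None).toDF("id")

    // <=> is null-safe equality: the two null ids match each other
    left.join(right, left("id") <=> right("id")).show()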

Re: Questions about count() performance with dataframes and parquet files

2020-02-17 Thread Enrico Minack
very large tables ? Is caching faster than recomputing both insert/update ? Thanks Enrico Minack writes: Ashley, I want to suggest a few optimizations. The problem might go away but at least performance should improve. The freeze problems could have many reasons, the Spark UI SQL pages and s

Re: Questions about count() performance with dataframes and parquet files

2020-02-13 Thread Enrico Minack
Ashley, I want to suggest a few optimizations. The problem might go away but at least performance should improve. The freeze problems could have many reasons, the Spark UI SQL pages and stages detail pages would be useful. You can send them privately, if you wish. 1. the repartition(1) shoul

Re: Reading 7z file in spark

2020-01-14 Thread Enrico Minack
Hi, Spark does not support 7z natively, but you can read any file in Spark: def read(stream: PortableDataStream): Iterator[String] = { Seq(stream.getPath()).iterator } spark.sparkContext .binaryFiles("*.7z") .flatMap(file => read(file._2)) .toDF("path") .show(false) This scales with the
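A heavily hedged sketch of a read() that actually decompresses the archives, assuming Apache commons-compress is on the classpath and that each archive fits into memory (entry handling is illustrative):

    import org.apache.commons.compress.archivers.sevenz.SevenZFile
    import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
    import org.apache.spark.input.PortableDataStream
    import scala.io.Source

    def read(stream: PortableDataStream): Iterator[String] = {
      // load the whole archive into memory and iterate over its entries
      val sevenZ = new SevenZFile(new SeekableInMemoryByteChannel(stream.toArray()))
      Iterator.continually(sevenZ.getNextEntry).takeWhile(_ != null).filterNot(_.isDirectory).flatMap { entry =>
        val bytes = new Array[Byte](entry.getSize.toInt)
        sevenZ.read(bytes, 0, bytes.length)
        Source.fromBytes(bytes, "UTF-8").getLines()
      }
    }

    spark.sparkContext.binaryFiles("*.7z").flatMap(file => read(file._2)).toDF("line").show(false)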

Re: [pyspark2.4+] When to choose RDD over Dataset, was: A lot of tasks failed, but job eventually completes

2020-01-06 Thread Enrico Minack
's always some or the other way to use windows on data frames. I always get confused as to when to fall back on RDD approach? Any use case in your experience warrant for RDD use, for better performance? Thanks, Rishi On Mon, Jan 6, 2020 at 4:18 AM Enrico Minack <mailto:m...@enrico.minac

Re: OrderBy Year and Month is not displaying correctly

2020-01-06 Thread Enrico Minack
The distinct transformation does not preserve order, you need to distinct first, then orderBy. Enrico Am 06.01.20 um 00:39 schrieb Mich Talebzadeh: Hi, I am working out monthly outgoing etc from an account and I am using the following code import org.apache.spark.sql.expressions.Window va

Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

2020-01-06 Thread Enrico Minack
Note that repartitioning helps to increase the number of partitions (and hence to reduce the size of partitions and required executor memory), but subsequent transformations like join will repartition data again with the configured number of partitions (|spark.sql.shuffle.partitions|), virtuall

Re: Identify bottleneck

2019-12-19 Thread Enrico Minack
, 18 Dec 2019 at 9:14 pm, Enrico Minack mailto:m...@enrico.minack.dev>> wrote: How many withColumn statements do you have? Note that it is better to use a single select, rather than lots of withColumn. This also makes drops redundant. Reading 25m CSV lines and w

Re: Identify bottleneck

2019-12-18 Thread Enrico Minack
ut it's not reasonable for maintenance purposes. I will try on a local instance and let you know. Thanks for the help. *From: *"Enrico Minack" mailto:m...@enrico.minack.dev>> *To: *us

Re: Identify bottleneck

2019-12-18 Thread Enrico Minack
How many withColumn statements do you have? Note that it is better to use a single select, rather than lots of withColumn. This also makes drops redundant. Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is really slow. Can you try this on a single machine, i.e. run wit "

Re: Issue With mod function in Spark SQL

2019-12-17 Thread Enrico Minack
I think some example code would help to understand what you are doing. Am 18.12.19 um 08:12 schrieb Tzahi File: no.. there're 100M records both even and odd On Tue, Dec 17, 2019 at 8:13 PM Russell Spitzer mailto:russell.spit...@gmail.com>> wrote: Is there a chance your data is all even o