Re: repartition before writing to table with bucketed partitioning

2024-12-01 Thread Soumasish
I might have misunderstood the issue. Spark will indeed repartition the data while writing; what it won't do is write precisely 10 files inside each date partition folder, sorted by col x. Typically this kind of fine-grained write config is useful if there's a downstream consumer that wi

Re: repartition before writing to table with bucketed partitioning

2024-12-01 Thread Henryk Česnolovič
Henryk Česnolovič to Soumasish: Ok, nvm. Seems we don't need to repartition, as Spark handles it itself. df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10, col("y"))).using("iceberg").createOrRepl
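The write under discussion, as a runnable sketch (spark-shell style, assuming the Iceberg runtime is on the classpath and an Iceberg catalog is configured; the table and column names follow the thread, the toy data is illustrative):

```
import org.apache.spark.sql.functions.{bucket, col}
import spark.implicits._

// toy stand-in for the real data, with the thread's column names
val df = Seq(("2024-12-01", "a", 1L), ("2024-12-01", "b", 2L)).toDF("date", "x", "y")

// DataFrameWriterV2: the bucket() transform is resolved by the target catalog,
// so "some_table" must live in an Iceberg catalog
df.writeTo("some_table")
  .partitionedBy(col("date"), col("x"), bucket(10, col("y")))
  .using("iceberg")
  .createOrReplace()
```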

Re: repartition before writing to table with bucketed partitioning

2024-11-30 Thread Soumasish
Henryk, I could reproduce your issue and achieve the desired result using SQL DDL. Here's the workaround. package replicator import org.apache.spark.sql.SparkSession object Bucketing extends App { val spark = SparkSession.builder() .appName("ReproduceError") .master("local[*]")

repartition before writing to table with bucketed partitioning

2024-11-29 Thread Henryk Česnolovič
Hello. Maybe somebody has faced the same issue. Trying to write data to the table while using DataFrame API v2. Table is partitioned by buckets using df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10, col("y"))).using("iceberg").createOrReplace() Can I somehow prepare df in

Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Mich Talebzadeh
, the so-called vcores. So it depends on the number of nodes you are using in your Spark cluster. Without doing a PoC you would not need to worry about repartition(10) in your writeStream. I suggest that for now you remove that parameter and observe the Spark processing through the Spark GUI (default port

Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Perez
atest/optimizations-oss.html#language-python > [1]: > https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/ > > On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh > wrote: > > > > Hi, > > What is the purpose for which you want to use repartition()

Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
-python [1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/ On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh wrote: > > Hi, > What is the purpose for which you want to use repartition() .. to reduce the > number of files in delta? > Also note t

[PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi all on user@spark: We are looking for advice and suggestions on how to tune the .repartition() parameter. We are using Spark Streaming on our data pipeline to consume messages and persist them to a Delta Lake (https://delta.io/learn/getting-started/). We read messages from a Kafka topic

Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Raghavendra Ganesh
Hi, What is the purpose for which you want to use repartition() .. to reduce the number of files in delta? Also note that there is an alternative option of using coalesce() instead of repartition(). -- Raghavendra On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong wrote: > Hi all on user@sp
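A sketch of the trade-off being raised (spark-shell style; the paths and the target of 16 output files are illustrative): coalesce(n) merges existing partitions without a full shuffle, while repartition(n) always shuffles but balances partition sizes evenly.

```
val df = spark.read.parquet("/delta/events")  // placeholder input path

df.coalesce(16).write.mode("overwrite").parquet("/out/fewer-files")  // no shuffle; sizes may be uneven
df.repartition(16).write.mode("overwrite").parquet("/out/balanced")  // full shuffle; even sizes
```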

Re: repartition(n) should be deprecated/alerted

2022-06-22 Thread Igor Berman
scenarios cause a few hundred rows to be duplicated and the same amount to be dropped (since one precision shift might shift a few hundred rows in the local sort done by repartition(n)). Maybe what I'm trying to say is that the repartition documentation is not hinting in any way that this might happen and mayb

Re: repartition(n) should be deprecated/alerted

2022-06-22 Thread Sean Owen
Eh, there is a huge caveat - you are making your input non-deterministic, where determinism is assumed. I don't think that supports such a drastic statement. On Wed, Jun 22, 2022 at 12:39 PM Igor Berman wrote: > Hi All > tldr; IMHO repartition(n) should be deprecated or red-flagg

repartition(n) should be deprecated/alerted

2022-06-22 Thread Igor Berman
Hi All tldr; IMHO repartition(n) should be deprecated or red-flagged, so that everybody will understand consequences of usage of this method Following conversation in https://issues.apache.org/jira/browse/SPARK-38388 (still relevant for recent versions of spark) I think it's very importa
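A sketch of the two mitigations commonly suggested for the hazard this thread describes (spark-shell style; df and stableKey are illustrative names): round-robin repartition(n) can duplicate or drop rows when a retried task recomputes a non-deterministic input.

```
import org.apache.spark.sql.functions.col

// 1) Hash-partition by a stable key: each row's destination depends only on
//    its own key, not on which rows a retried task happens to recompute.
val byKey = df.repartition(200, col("stableKey"))

// 2) Or materialize the non-deterministic input before the shuffle, so a
//    retry replays stored blocks instead of recomputing different rows
//    (at the cost of failing hard if those blocks are lost).
val pinned = df.localCheckpoint()
val safe = pinned.repartition(200)
```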

[Spark DataFrame]: How to solve data skew after repartition?

2021-11-01 Thread ly
When Spark loads data into object storage systems like HDFS, S3 etc., it can result in a large number of small files. To solve this problem, a common method is to repartition before writing the results. However, this may cause data skew. If the number of distinct values of the repartitioned key is
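One common mitigation sketch for the skew described (spark-shell style; df, the key column, and the salt width of 8 are illustrative): salt the repartition key so a low-cardinality key no longer maps to a single partition.

```
import org.apache.spark.sql.functions.{col, rand}

// spread each hot key value over up to 8 partitions; 8 is a tunable width
val salted = df
  .withColumn("salt", (rand() * 8).cast("int"))
  .repartition(col("key"), col("salt"))
  .drop("salt")
```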

Re: Repartition or Coalesce not working

2021-03-22 Thread KhajaAsmath Mohammed
Thanks Sean. I just realized it. Let me try that. On Mon, Mar 22, 2021 at 12:31 PM Sean Owen wrote: > You need to do something with the result of repartition. You haven't > changed textDF > > On Mon, Mar 22, 2021, 12:15 PM KhajaAsmath Mohammed < > mdkhajaasm...@gmail.com>

Re: Repartition or Coalesce not working

2021-03-22 Thread Sean Owen
You need to do something with the result of repartition. You haven't changed textDF On Mon, Mar 22, 2021, 12:15 PM KhajaAsmath Mohammed wrote: > Hi, > > I have a use case where there are large files in hdfs. > > Size of the file is 3 GB. > > It is an existing code in

Repartition or Coalesce not working

2021-03-22 Thread KhajaAsmath Mohammed
Hi, I have a use case where there are large files in hdfs. Size of the file is 3 GB. It is an existing code in production and I am trying to improve the performance of the job. Sample Code: textDF=dataframe ( This is dataframe that got created from hdfs path) logging.info("Number of partitions"
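A minimal sketch of the fix pointed out in the replies (spark-shell style; the path and the count of 8 are illustrative): repartition() is a transformation that returns a new DataFrame, so the result must be captured.

```
val textDF = spark.read.textFile("hdfs:///data/big.txt").toDF("line") // stand-in for the 3 GB file
val repartitioned = textDF.repartition(8)  // capture the result; textDF itself is unchanged

println(s"before: ${textDF.rdd.getNumPartitions}, after: ${repartitioned.rdd.getNumPartitions}")
```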

Re: Correctness bug on Shuffle+Repartition scenario

2021-01-18 Thread 王长春
is not because of SPARK-23243. As you said, if it was caused by 'first' before 'repartition', then how do we solve this problem fundamentally? And is there any workaround? > On Jan 18, 2021, at 10:35 AM, Shiao-An Yuan wrote: > > Hi, > I am using Spark 2.4.4 standalone mode. > > On Mon, Jan 1

Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Shiao-An Yuan
import scala.sys.process._ >> import org.apache.spark.sql.functions._ >> import com.google.common.hash.Hashing >> val murmur3 = Hashing.murmur3_32() >> >> // create a Dataset with the cardinality of the second element equals >> 5. >> val ds = spark.range(0, 10, 1, 130).ma

Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Mich Talebzadeh
ing(pkey)) // generate key > .reduceGroups(_.merge(_)) // > spark.sql.shuffle.partitions=200 > .map(_._2) // drop key > > newSnapshot > .repartition(60) // (1) > .write

Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Gourav Sengupta
e(0, 10, 1, 130).map(i => > (murmur3.hashLong(i).asInt(), i/2)) > > ds.groupByKey(_._2) > .agg(first($"_1").as[Long]) > .repartition(200) > .map { x => > if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitio

Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Sean Owen
Dataset with the cardinality of the second element equals > 5. > val ds = spark.range(0, 10, 1, 130).map(i => > (murmur3.hashLong(i).asInt(), i/2)) > > ds.groupByKey(_._2) > .agg(first($"_1").as[Long]) > .repartition(200) > .map { x

Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Shiao-An Yuan
").as[Long]) .repartition(200) .map { x => if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 100 && TaskContext.get.stageAttemptNumber == 0) { throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!) } x } .map(_._2).distinct(

Re: Correctness bug on Shuffle+Repartition scenario

2020-12-29 Thread Sean Owen
00 partitions) >>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log] >>> >>> // multiple small parquet files >>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log] >>> >>> val newSnapshot = currentSnapshot.union(newAddedLog) >>

Re: Correctness bug on Shuffle+Repartition scenario

2020-12-29 Thread Shiao-An Yuan
parquet files >> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log] >> >> val newSnapshot = currentSnapshot.union(newAddedLog) >> .groupByKey(new String(pkey)) // generate key >> .reduceGroups(_.merge(_))// >

Re: Correctness bug on Shuffle+Repartition scenario

2020-12-29 Thread Sean Owen
// generate key > .reduceGroups(_.merge(_))// > spark.sql.shuffle.partitions=200 > .map(_._2) // drop key > > newSnapshot > .repartition(60) // (1) > .write.parquet(newPath) > ``` >

Correctness bug on Shuffle+Repartition scenario

2020-12-29 Thread Shiao-An Yuan
Snapshot = currentSnapshot.union(newAddedLog) .groupByKey(new String(pkey)) // generate key .reduceGroups(_.merge(_))// spark.sql.shuffle.partitions=200 .map(_._2) // drop key newSnapshot .repart
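The pipeline quoted across this thread, pieced together from the fragments above as a runnable sketch (spark-shell style; Log, pkey, merge, and the paths are the poster's own names and are stubbed here for illustration):

```
import org.apache.spark.sql.Encoders
import spark.implicits._

case class Log(pkey: Array[Byte], seq: Long) {
  def merge(other: Log): Log = if (seq >= other.seq) this else other // stub merge
}
val schema = Encoders.product[Log].schema

// one snapshot plus multiple small parquet files, as in the quoted code
val currentSnapshot = spark.read.schema(schema).parquet("/snapshot").as[Log]
val newAddedLogs    = spark.read.schema(schema).parquet("/incoming").as[Log]

val newSnapshot = currentSnapshot.union(newAddedLogs)
  .groupByKey(log => new String(log.pkey))  // generate key
  .reduceGroups(_.merge(_))                 // spark.sql.shuffle.partitions=200
  .map(_._2)                                // drop key

newSnapshot
  .repartition(60)                          // (1) the round-robin shuffle under suspicion
  .write.parquet("/snapshot-new")
```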

Re: repartition in Spark

2020-11-09 Thread Mich Talebzadeh
As a generic answer in a distributed environment like Spark, making sure that data is distributed evenly among all nodes (assuming every node is the same or similar) can help performance; repartition thus controls the data distribution among all nodes. However, it is not that straightforward

repartition in Spark

2020-11-09 Thread ashok34...@yahoo.com.INVALID
Hi, Just need some advice. - When we have multiple Spark nodes running code, under what conditions does a repartition make sense? - Can we repartition and cache the result --> df = spark.sql("select from ...").repartition(4).cache - If we choose a repartition (4), will tha
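A sketch answering the questions as posed (spark-shell style; the query is a placeholder): the repartitioned result can be cached, the 4-way shuffle runs once when the cache is first materialized, and later actions reuse it.

```
val df = spark.sql("SELECT * FROM some_table").repartition(4).cache()

df.count()  // first action: the shuffle runs and the 4 partitions are cached
df.count()  // later actions reuse the cached partitions; no new shuffle
```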

Data Explosion and repartition before group bys

2020-06-26 Thread lsn24
f executor memory when we tried to aggregate. Repartition after the data explosion to address the data skew is killing us. What other ways can we address this problem ? Note : A transaction is marked as an ATM transaction or a cross border transaction by a boolean value. -- Sent from: http://ap

Re: [pyspark 2.3+] repartition followed by window function

2019-05-22 Thread Shraddha Shah
rt again I believe.. Can someone > confirm this? > > However what happens when dataframe repartition was done using (date, id) > columns, but window function which follows repartition needs a partition by > clause with (date, id, col3, col4) columns ? Would spark reshuffle the > d

[pyspark 2.3+] repartition followed by window function

2019-05-22 Thread Rishi Shah
Hi All, If a dataframe is repartitioned in memory by (date, id) columns and I then use multiple window functions whose partition by clause uses (date, id) columns --> we can avoid the shuffle/sort again, I believe. Can someone confirm this? However what happens when the dataframe repartition
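One way to answer this empirically rather than by guesswork (spark-shell style sketch; df, ts, col3, col4 are the poster's columns): build both windows and compare the physical plans. Whether Spark reuses the existing hash partitioning depends on whether it satisfies the window's required distribution; an extra Exchange node under the Window means it did not.

```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val byDateId = df.repartition(col("date"), col("id"))

val wSame  = Window.partitionBy("date", "id").orderBy("ts")
val wWider = Window.partitionBy("date", "id", "col3", "col4").orderBy("ts")

// look for an Exchange node under each Window in the printed plans
byDateId.withColumn("rn", row_number().over(wSame)).explain()
byDateId.withColumn("rn", row_number().over(wWider)).explain()
```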

Re: repartition in df vs partitionBy in df

2019-04-24 Thread moqi
Hello, There is another link here that I hope will help you. https://stackoverflow.com/questions/33831561/pyspark-repartition-vs-partitionby In particular, when you are faced with possible data skew or have some partitioned parameters that need to be obtained at runtime, you can refer to this

Re: repartition in df vs partitionBy in df

2019-04-24 Thread moqi
Hello, there is another link to discuss the difference between the two methods. https://stackoverflow.com/questions/33831561/pyspark-repartition-vs-partitionby In particular, when you are faced with possible data skew or have some partitioned parameters that need to be obtained at runtime, you

Re: repartition in df vs partitionBy in df

2019-04-24 Thread rajat kumar
hello, thanks for the quick reply. got it. partitionBy is to create something like hive partitions. but when do we use repartition actually? how do we decide whether to repartition or not? because in development we are getting sample data. also what number should I give while repartitioning. thanks On

Re: repartition in df vs partitionBy in df

2019-04-24 Thread moqi
Hello, I think you can refer to this link and hope to help you. https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby/40417992 -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com

Re: repartition in df vs partitionBy in df

2019-04-24 Thread rajat kumar
Hi All, Can anyone explain? thanks rajat On Sun, 21 Apr 2019, 00:18 kumar.rajat20del Hi Spark Users, > > repartition and partitionBy seems to be very same in Df. > In which scenario we use one? > > As per my understanding repartition is very expensive operation as it needs >

repartition in df vs partitionBy in df

2019-04-20 Thread kumar.rajat20del
Hi Spark Users, repartition and partitionBy seem to be very much the same on a DataFrame. In which scenario do we use one? As per my understanding repartition is a very expensive operation as it needs a full shuffle, so when do we use repartition? Thanks Rajat -- Sent from: http://apache-spark-user-list.1001560
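A minimal contrast sketch (spark-shell style; the path is illustrative): repartition() redistributes rows among in-memory partitions via a shuffle, while DataFrameWriter.partitionBy() only controls the on-disk directory layout at write time. The two compose:

```
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq(("US", 1), ("FR", 2), ("US", 3)).toDF("country", "value")

df.repartition(col("country"))  // repartition: in-memory shuffle, one partition per hash bucket
  .write.mode("overwrite")
  .partitionBy("country")       // partitionBy: directory layout, e.g. .../country=US/
  .parquet("/tmp/by_country")
```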

Spark 2.x duplicates output when task fails at "repartition" stage. Checkpointing is enabled before repartition.

2019-02-05 Thread Serega Sheypak
Hi, I have a spark job that produces duplicates when one or more tasks from the repartition stage fail. Here is simplified code. sparkContext.setCheckpointDir("hdfs://path-to-checkpoint-dir") val inputRDDs: List[RDD[String]] = List.empty // an RDD per input dir val updatedRDDs = inp

How to repartition Spark DStream Kafka ConsumerRecord RDD.

2018-09-28 Thread Alchemist
How to repartition a Spark DStream Kafka ConsumerRecord RDD? I am getting uneven sizes of Kafka topics. We want to repartition the input RDD based on some logic. But when I try to apply the repartition I am getting "object not serializable (

Re: repartition

2018-07-08 Thread Vamshi Talla
Hi Ravi, RDDs are always immutable, so you cannot change them; instead you create new ones by transforming one. Repartition is a transformation, so it is lazily evaluated, hence computed only when you call an action on it. Thanks. Vamshi Talla On Jul 8, 2018, at 12:26 PM, ryanda

repartition

2018-07-08 Thread ryandam.9
Hi, Can anyone clarify how repartition works please ? * I have a DataFrame df which has only one partition: // Returns 1 df.rdd.getNumPartitions * I repartitioned it by passing "3" and assigned it a new DataFrame newdf val newdf = df.rep
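Completing the experiment from the question as a spark-shell style sketch (the question's one-partition input is replaced by a synthetic DataFrame):

```
val df = spark.range(100).toDF("n").coalesce(1)

df.rdd.getNumPartitions     // 1
val newdf = df.repartition(3)
newdf.rdd.getNumPartitions  // 3: repartition returns a new DataFrame
df.rdd.getNumPartitions     // still 1: the original is immutable
```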

Re: Repartition not working on a csv file

2018-07-01 Thread Abdeali Kothari
I prefer not to do a .cache() due to memory limits. But I did try a persist() with DISK_ONLY I did the repartition(), followed by a .count() followed by a persist() of DISK_ONLY That didn't change the number of tasks either On Sun, Jul 1, 2018, 15:50 Alexander Czech wrote: > You coul

Re: Repartition not working on a csv file

2018-07-01 Thread Alexander Czech
> time to process a single record goes up-to 5mins on 1 executor >> > >> > I'm trying to increase the partitions that my data is in so that I have >> at >> > maximum 1 record per executor (currently it sets 2 tasks, and hence 2 >> > executors... I want it

Re: Repartition not working on a csv file

2018-06-30 Thread Abdeali Kothari
> executors... I want it to split it into at least 100 tasks at a time so I > > get 5 records per task => ~20min per task) > > Maybe you can try repartition(100) after broadcast join, the task number > should change to 100 fo

Re: Repartition not working on a csv file

2018-06-30 Thread yujhe.li
executor > > I'm trying to increase the partitions that my data is in so that I have at > maximum 1 record per executor (currently it sets 2 tasks, and hence 2 > executors... I want it to split it into at least 100 tasks at a time so I > get 5 records per task => ~20min per task)

Re: Repartition not working on a csv file

2018-06-30 Thread Abdeali Kothari
My entire CSV is less than 20KB. By somewhere in between, I do a broadcast join with 3500 records in another file. After the broadcast join I have a lot of processing to do. Overall, the time to process a single record goes up-to 5mins on 1 executor I'm trying to increase the partitions that my da

Re: Repartition not working on a csv file

2018-06-30 Thread yujhe.li
Abdeali Kothari wrote > I am using Spark 2.3.0 and trying to read a CSV file which has 500 > records. > When I try to read it, spark says that it has two stages: 10, 11 and then > they join into stage 12. What's your CSV size per file? I think Spark optimizer may put many files into one task when

Repartition not working on a csv file

2018-06-18 Thread Abdeali Kothari
and then save the file as parquet. The stages 10 and 11 have only 2 tasks according to spark. I have a max-executors possible of 20 on my cluster. I would like Spark to use all 20 executors for this task. *1csv+Repartition*: Right after reading the file, if I do a repartition, it still takes *2

Spark 1.6 change the number partitions without repartition and without shuffling

2018-06-13 Thread Spico Florin
"mapred.max.split.size", 1024*50) My question is: How to achieve the same behavior (to get the desired number of partitions) when using Spark 1.6 (without the repartition method and without any method that incurs shuffling)? I look forward to your answers. Regards, Florin

Re: [Structured Streaming] [Kafka] How to repartition the data and distribute the processing among worker nodes

2018-04-20 Thread Bowden, Chris
The primary role of a sink is storing output tuples. Consider groupByKey and map/flatMapGroupsWithState instead. -Chris From: karthikjay Sent: Friday, April 20, 2018 4:49:49 PM To: user@spark.apache.org Subject: [Structured Streaming] [Kafka] How to repartition

[Structured Streaming] [Kafka] How to repartition the data and distribute the processing among worker nodes

2018-04-20 Thread karthikjay
Any help appreciated. please find the question in the link: https://stackoverflow.com/questions/49951022/spark-structured-streaming-with-kafka-how-to-repartition-the-data-and-distribu -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Junfeng Chen
// convert >>>>> json to df >>>>> JavaRDD rowJavaRDD = df.javaRDD().map... //add some new >>>>> fields >>>>> StructType type = df.schema()...; // construct new type for newly >>>>> added fields >>>>> Dat

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
oreachRDD(VoldFunction >>>> stringjavardd->{ >>>> Dataset df = spark.read().json( stringjavardd ); // convert >>>> json to df >>>> JavaRDD rowJavaRDD = df.javaRDD().map... //add some new fields >>>> StructType type = df.schema()...;

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Junfeng Chen
type = df.schema()...; // construct new type for newly added >>> fields >>> Dataset // create new dataframe >>> newdf.repatition(taskNum).write().mode(SaveMode.Append).patitionedBy("appname").parquet(savepath);

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
ionedBy("appname").parquet(savepath); // save to parquet >> }) > > > > However, if I remove the repartition method of newdf in writing parquet > stage, the program always throw nullpointerexception error in json convert > line: > > Java.lang.NullPointerExceptio

Nullpointerexception error when in repartition

2018-04-11 Thread Junfeng Chen
(SaveMode.Append).patitionedBy("appname").parquet(savepath); > // save to parquet > }) However, if I remove the repartition method of newdf in writing parquet stage, the program always throw nullpointerexception error in json convert line: Java.lang.NullPointerExcepti

Bucket vs repartition

2017-10-31 Thread אורן שמון
Hi all, I have 2 Spark jobs: one is the pre-process and the second is the process. The process job needs to calculate for each user in the data. I want to avoid a shuffle like groupBy, so I'm thinking about saving the result of the pre-process bucketed by user in Parquet, or re-partitioning by user and saving the r

Fwd: Repartition vs PartitionBy Help/Understanding needed

2017-06-16 Thread Aakash Basu
Hi all, Can somebody put some light on this pls? Thanks, Aakash. -- Forwarded message -- From: "Aakash Basu" Date: 15-Jun-2017 2:57 PM Subject: Repartition vs PartitionBy Help/Understanding needed To: "user" Cc: Hi all, > > Everybody is giving a diffe

Repartition vs PartitionBy Help/Understanding needed

2017-06-15 Thread Aakash Basu
Hi all, Everybody gives a difference between coalesce and repartition, but nowhere have I found a difference between partitionBy and repartition. My question is: is it better to write a data set in parquet, partitioning by a column, and then read the respective directories to work on that column

Spark repartition question...

2017-04-30 Thread Muthu Jayakumar
Hello there, I am trying to understand the difference between the following repartition()... a. def repartition(partitionExprs: Column*): Dataset[T] b. def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] c. def repartition(numPartitions: Int): Dataset[T] My understanding is
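A sketch of what each overload does (spark-shell style; df and the date column are illustrative):

```
import spark.implicits._

df.repartition($"date")      // (a) hash-partition by expression; the partition count
                             //     falls back to spark.sql.shuffle.partitions
df.repartition(10, $"date")  // (b) hash-partition by expression into exactly 10 partitions
df.repartition(10)           // (c) round-robin into exactly 10 partitions, no keying
```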

Re: Pyspark 2.1.0 weird behavior with repartition

2017-03-11 Thread Olivier Girardot
e', u'f', u'g', u'h', u'i', u'j', u'k', u'l'] > >>> sc.textFile("outc", use_unicode=False).collect() > ['a', 'b', 'c', 'd', 'e', 'f', 'g'

Repartition function duplicates data

2017-02-12 Thread F. Amara
is to partition the data I receive. So I used repartition() to do the above task but this resulted in duplicating the same set of data across all partitions rather than dividing (i.e., hashing) it according to the number of partitions specified. Could anyone please explain a solution? I have shown a s

Pyspark 2.1.0 weird behavior with repartition

2017-01-30 Thread Blaž Šnuderl
'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l'] >>> sc.textFile("outc", use_unicode=False).collect() ['a', 'b', 'c', 'd', 'e', 'f', 'g

Re: Dataset doesn't have partitioner after a repartition on one of the columns

2016-09-28 Thread Igor Berman
> d.mcbe...@elsevier.com> wrote: > >> I'm using Spark 2.0. >> >> I've created a dataset from a parquet file and repartition on one of the >> columns (docId) and persist the repartitioned dataset. >> >> val om = ds.repartition($"docId").persist(Storage

Re: Dataset doesn't have partitioner after a repartition on one of the columns

2016-09-28 Thread Michael Armbrust
ml> that walks through what we do expose and how it is used by the query planner. Michael On Tue, Sep 20, 2016 at 11:22 AM, McBeath, Darin W (ELS-STL) < d.mcbe...@elsevier.com> wrote: > I'm using Spark 2.0. > > I've created a dataset from a parquet file and repartition on o

Dataset doesn't have partitioner after a repartition on one of the columns

2016-09-20 Thread McBeath, Darin W (ELS-STL)
I'm using Spark 2.0. I've created a dataset from a parquet file and repartition on one of the columns (docId) and persist the repartitioned dataset. val om = ds.repartition($"docId").persist(StorageLevel.MEMORY_AND_DISK) When I try to confirm the partitioner, with om.rd
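A sketch of the observation (spark-shell style; ds is the poster's dataset): the repartition is tracked in the query plan's output partitioning, which the RDD API does not surface, so the check below returns None even though the rows are hash-distributed by docId.

```
import org.apache.spark.storage.StorageLevel

val om = ds.repartition($"docId").persist(StorageLevel.MEMORY_AND_DISK)

om.rdd.partitioner  // None: the Dataset's hash distribution by docId is not
                    // exposed as an RDD Partitioner
```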

Re: SparkR error when repartition is called

2016-08-09 Thread Felix Cheung
, August 9, 2016 12:19 AM Subject: Re: SparkR error when repartition is called To: Sun Rui <sunrise_...@163.com> Cc: User <user@spark.apache.org> Sun, I am using spark in yarn client mode in a 2-node cluster with hadoop-2.7.2. My R version is 3.3.1. I have the followi

Re: SparkR error when repartition is called

2016-08-09 Thread Shane Lee
nvironment information? On Aug 9, 2016, at 11:35, Shane Lee wrote: Hi All, I am trying out SparkR 2.0 and have run into an issue with repartition. Here is the R code (essentially a port of the pi-calculating scala example in the spark package) that can reproduce the behavior: schema <- structTyp

Re: SparkR error when repartition is called

2016-08-08 Thread Sun Rui
I can’t reproduce your issue with len=1 in local mode. Could you give more environment information? > On Aug 9, 2016, at 11:35, Shane Lee wrote: > > Hi All, > > I am trying out SparkR 2.0 and have run into an issue with repartition. > > Here is the R code (essenti

SparkR error when repartition is called

2016-08-08 Thread Shane Lee
Hi All, I am trying out SparkR 2.0 and have run into an issue with repartition. Here is the R code (essentially a port of the pi-calculating scala example in the spark package) that can reproduce the behavior: schema <- structType(structField("input", "integer"),

Best practices repartition key

2016-04-22 Thread nihed mbarek
Hi, I'm looking for documentation or best practices about choosing a key or keys for repartition of dataframe or rdd Thank you MBAREK nihed -- M'BAREK Med Nihed, Fedora Ambassador, TUNISIA, Northern Africa http://www.nihed.com <http://tn.linkedin.com/in/nihed>

Re: data frame problem preserving sort order with repartition() and coalesce()

2016-03-29 Thread Takeshi Yamamuro
1 topTags_CSV/part-00016.csv > > $ > > numRowsPerCSVFile = 100 > > numRows = resultDF.count() > > quotient, remander = divmod(numRows, numRowsPerCSVFile) > > numPartitions = (quotient + 1) if remander > 0 else quotient > > debugStr = ("numRows

data frame problem preserving sort order with repartition() and coalesce()

2016-03-29 Thread Andy Davidson
1 topTags_CSV/part-00016.csv $ numRowsPerCSVFile = 100 numRows = resultDF.count() quotient, remander = divmod(numRows, numRowsPerCSVFile) numPartitions = (quotient + 1) if remander > 0 else quotient debugStr = ("numRows:{0} quotient:{1} remander:{2} repartition({3})"
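The sizing arithmetic from the post, restated in Scala as a sketch (the post itself is PySpark; resultDF stands for the poster's sorted result and the target of ~100 rows per file follows the thread):

```
val numRowsPerFile = 100L
val numRows = resultDF.count()
val numPartitions = ((numRows + numRowsPerFile - 1) / numRowsPerFile).toInt // ceiling division

resultDF.repartition(numPartitions).write.csv("topTags_CSV")
// Caveat (the subject of this thread): repartition(n) shuffles round-robin
// and does not preserve a prior orderBy across the resulting files.
```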

Re: Repartition taking place for all previous windows even after checkpointing

2016-02-01 Thread Abhishek Anand
Any insights on this ? On Fri, Jan 29, 2016 at 1:08 PM, Abhishek Anand wrote: > Hi All, > > Can someone help me with the following doubts regarding checkpointing : > > My code flow is something like follows -> > > 1) create direct stream from kafka > 2) repartition k

Repartition taking place for all previous windows even after checkpointing

2016-01-28 Thread Abhishek Anand
Hi All, Can someone help me with the following doubts regarding checkpointing : My code flow is something like follows -> 1) create direct stream from kafka 2) repartition kafka stream 3) mapToPair followed by reduceByKey 4) filter 5) reduceByKeyAndWindow without the inverse functio

Re: what is the difference between coalese() and repartition() ?Re: trouble understanding data frame memory usage ³java.io.IOException: Unable to acquire memory²

2015-12-28 Thread Hyukjin Kwon
Hi Andy, This link explains the difference well. https://bzhangusc.wordpress.com/2015/08/11/repartition-vs-coalesce/ Simply, the difference is whether it shuffles the partitions or not. Actually, coalesce() with shuffle enabled performs exactly like repartition(). On 29 Dec 2015 08

what is the difference between coalese() and repartition() ?Re: trouble understanding data frame memory usage ³java.io.IOException: Unable to acquire memory²

2015-12-28 Thread Andy Davidson
Hi Michael I'll try 1.6 and report back. The java doc does not say much about coalesce() or repartition(). When I use repartition() just before I save my output, everything runs as expected. I thought coalesce() is an optimized version of repartition() and should be used whenever we know we are

Re: is repartition very cost

2015-12-09 Thread Daniel Siegmann
, use spark.sql.shuffle.partitions. I recommend you do not do any explicit partitioning or mess with these values until you find a need for it. If executors are sitting idle, that's a sign you may need to repartition. On Tue, Dec 8, 2015 at 9:35 PM, Zhiliang Zhu wrote: > Thanks very much for Yong'

Re: is repartition very cost

2015-12-08 Thread Zhiliang Zhu
Shuffling large amounts of data over the network is expensive, yes. The cost is lower if you are just using a single node where no networking needs to be involved to do the repartition (using Spark as a multithreading engine). In general

RE: is repartition very cost

2015-12-08 Thread Young, Matthew T
Shuffling large amounts of data over the network is expensive, yes. The cost is lower if you are just using a single node where no networking needs to be involved to do the repartition (using Spark as a multithreading engine). In general you need to do performance testing to see if a

is repartition very cost

2015-12-08 Thread Zhiliang Zhu
Hi All, I need to optimize an objective function with some linear constraints by a genetic algorithm. I would like to get as much parallelism for it as possible with Spark. repartition / shuffle may be used sometimes in it; however, is the repartition API very costly? Thanks in advance! Zhiliang

Re: repartition vs partitionby

2015-10-18 Thread shahid ashraf
yes i am trying to do so, but it will try to repartition the whole data.. can't we split a large (data-skewed) partition into multiple partitions? (any ideas on this). On Sun, Oct 18, 2015 at 1:55 AM, Adrian Tanase wrote: > If the dataset allows it you can try to write a custom par

Re: repartition vs partitionby

2015-10-17 Thread Adrian Tanase
If the dataset allows it you can try to write a custom partitioner to help spark distribute the data more uniformly. Sent from my iPhone On 17 Oct 2015, at 16:14, shahid ashraf <sha...@trialx.com> wrote: yes i know about that, it's for the case of reducing partitions. the point here is the data

Re: repartition vs partitionby

2015-10-17 Thread shahid ashraf
yes i know about that, it's for the case of reducing partitions. the point here is that the data is skewed to a few partitions.. On Sat, Oct 17, 2015 at 6:27 PM, Raghavendra Pandey < raghavendra.pan...@gmail.com> wrote: > You can use coalesce function, if you want to reduce the number of > partitions. This one

Re: repartition vs partitionby

2015-10-17 Thread Raghavendra Pandey
You can use coalesce function, if you want to reduce the number of partitions. This one minimizes the data shuffle. -Raghav On Sat, Oct 17, 2015 at 1:02 PM, shahid qadri wrote: > Hi folks > > I need to reparation large set of data around(300G) as i see some portions > have large data(data skew)

repartition vs partitionby

2015-10-17 Thread shahid qadri
Hi folks I need to repartition a large set of data, around 300G, as I see some portions have large data (data skew). i have pairRDDs [({},{}),({},{}),({},{})] what is the best way to solve the problem

Re: DataFrame repartition not repartitioning

2015-09-16 Thread Silvio Fiorito
"user@spark.apache.org" Subject: DataFrame repartition not repartitioning Hello, I'm trying to load in an Avro file and write it out as Parquet. I would like to have enough partitions to properly parallelize on. When I do the simple load and save I get 1

DataFrame repartition not repartitioning

2015-09-16 Thread Steve Annessa
Hello, I'm trying to load in an Avro file and write it out as Parquet. I would like to have enough partitions to properly parallelize on. When I do the simple load and save I get 1 partition out. I thought I would be able to use repartition like the following: val avr
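A sketch of the intended pattern (spark-shell style, assuming the Avro data source is available; the paths and the count of 20 are illustrative):

```
val avro = spark.read.format("avro").load("/data/in.avro") // comes back as 1 partition
avro.repartition(20)                                       // shuffle into 20 partitions
    .write.mode("overwrite").parquet("/data/out")
```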

Re: Memory-efficient successive calls to repartition()

2015-09-08 Thread Aurélien Bellet
I don't know what you are doing in the rest of the code: - There are a lot of non-HDFS writes; they come from the rest of your code and/or repartition(). Repartition involves a shuffle and the creation of files on disk. I would have said that the problem co

Re: Memory-efficient successive calls to repartition()

2015-09-08 Thread Aurélien Bellet
and/or repartition(). Repartition involves a shuffle and the creation of files on disk. I would have said that the problem comes from that, but I just checked and checkpoint() is supposed to delete shuffle files: https://forums.databricks.com/questions/277/how-do

Re: Problem with repartition/OOM

2015-09-06 Thread Yana Kadiyska
preparation for more memory-intensive >> operations (I'm happy to wait, I just need the job to complete). >> Repartition seems to cause an OOM for me? >> Could someone shed light/or speculate/ why this would happen -- I thought >> we repartition higher to r

Re: Problem with repartition/OOM

2015-09-05 Thread Yanbo Liang
However, I'd prefer to > increase the number of partitions in preparation for more memory-intensive > operations (I'm happy to wait, I just need the job to complete). > Repartition seems to cause an OOM for me? > Could someone shed light/or speculate/ why this would happen -- I thoug

Problem with repartition/OOM

2015-09-05 Thread Yana Kadiyska
the job to complete). Repartition seems to cause an OOM for me? Could someone shed light/or speculate/ why this would happen -- I thought we repartition higher to relieve memory pressure? Im using Spark1.4.1 CDH4 if that makes a difference This works val res2 = sqlContext.parquetFile(lst

Re: repartition on direct kafka stream

2015-09-04 Thread Shushant Arora
the first transformation/action on the directKafkaStream (be it repartition() or mapPartition()) causes the directKafkaStream to be evaluated; repartition then shuffles the data, and mapPartition is called on the shuffled data. On Fri, Sep 4, 2015 at 10:33 PM, Cody Koeninger wrote: > The answer alre

Re: repartition on direct kafka stream

2015-09-04 Thread Cody Koeninger
The answer already given is correct. You shouldn't doubt this, because you've already seen the shuffle data change accordingly. On Fri, Sep 4, 2015 at 11:25 AM, Shushant Arora wrote: > But Kafka stream has underlyng RDD which consists of offsets reanges only- > so how does r

Re: repartition on direct kafka stream

2015-09-04 Thread Shushant Arora
But the Kafka stream has an underlying RDD which consists of offset ranges only - so how does repartition work? 1. First it evaluates the transformation and then repartitions, or 2. first it repartitions and then transforms - in this case data should not be transformed; rather, only offset ranges should be

Re: repartition on direct kafka stream

2015-09-03 Thread Saisai Shao
Yes, not the offset ranges but the real data will be shuffled when you use repartition(). Thanks Saisai On Fri, Sep 4, 2015 at 12:42 PM, Shushant Arora wrote: > 1. Does repartitioning on a direct kafka stream shuffle only the offsets or > the exact kafka messages across executors? > >

repartition on direct kafka stream

2015-09-03 Thread Shushant Arora
ons in kafka. Now only the offset ranges should be shuffled to executors, not the exact kafka messages? But I am seeing a very large size of shuffle data read/write on the streaming UI. When I remove this repartition, shuffle read/write becomes 0.

ERROR WHILE REPARTITION

2015-09-02 Thread shahid ashraf
) from shutdown hook 15/09/02 21:12:43 INFO TaskSchedulerImpl: Cancelling stage 10 15/09/02 21:12:43 INFO Executor: Executor is trying to kill task 4.0 in stage 10.0 (TID 64) 15/09/02 21:12:43 INFO TaskSchedulerImpl: Stage 10 was cancelled 15/09/02 21:12:43 INFO DAGScheduler: ShuffleM
