I might have misunderstood the issue.
Spark will indeed repartition the data while writing; what it won't do is
write precisely 10 files inside each date partition folder, sorted by col x.
Typically this kind of fine-grained write config is useful if there's a
downstream consumer that wi
Henryk Česnolovič wrote:
Ok, never mind. It seems we don't need to repartition, as Spark handles it itself.
df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10,
col("y"))).using("iceberg").createOrReplace()
Henryk,
I could reproduce your issue and achieve the desired result using SQL DDL.
Here's the workaround.
package replicator

import org.apache.spark.sql.SparkSession

object Bucketing extends App {
  val spark = SparkSession.builder()
    .appName("ReproduceError")
    .master("local[*]")
    .getOrCreate()
Hello.
Maybe somebody has faced the same issue. I'm trying to write data to a table
using the DataFrame API v2. The table is partitioned into buckets using
df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10,
col("y"))).using("iceberg").createOrReplace()
Can I somehow prepare df in
…the so-called vcores. So it depends on the number of nodes you are using in
your Spark cluster.
Without doing a PoC first, you would not need to worry about repartition(10) in
your writeStream. I suggest that for now you remove that parameter and
observe the Spark processing through the Spark GUI (default port 4040
atest/optimizations-oss.html#language-python
> [1]:
> https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
>
> On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
> wrote:
> >
> > Hi,
> > What is the purpose for which you want to use repartition()
-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
wrote:
>
> Hi,
> What is the purpose for which you want to use repartition() .. to reduce the
> number of files in delta?
> Also note t
Hi all on user@spark:
We are looking for advice and suggestions on how to tune the
.repartition() parameter.
We are using Spark Streaming on our data pipeline to consume messages
and persist them to a Delta Lake
(https://delta.io/learn/getting-started/).
We read messages from a Kafka topic
Hi,
What is the purpose for which you want to use repartition()? To reduce
the number of files in Delta?
Also note that there is an alternative option of using coalesce() instead
of repartition().
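For anyone comparing the two, a minimal sketch (assuming an existing DataFrame df in a spark-shell session; illustration only, not from the original mail):

  // repartition(n) always does a full shuffle and can increase or decrease
  // the number of partitions; coalesce(n) only merges existing partitions
  // (no full shuffle) and can only decrease the count.
  val shuffled = df.repartition(10)   // exactly 10 partitions, full shuffle
  val merged   = df.coalesce(10)      // at most 10 partitions, narrow dependency

  println(shuffled.rdd.getNumPartitions)  // 10
  println(merged.rdd.getNumPartitions)    // <= 10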
--
Raghavendra
On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong
wrote:
> Hi all on user@sp
scenarios cause a few hundred rows to be
duplicated and the same amount to be dropped (since one precision shift might
shift a few hundred rows in the local sort done by repartition(n)).
Maybe what I'm trying to say is that the repartition documentation does not
hint in any way that this might happen, and mayb
Eh, there is a huge caveat - you are making your input non-deterministic,
where determinism is assumed. I don't think that supports such a drastic
statement.
On Wed, Jun 22, 2022 at 12:39 PM Igor Berman wrote:
> Hi All
> tldr; IMHO repartition(n) should be deprecated or red-flagg
Hi All,
tl;dr: IMHO repartition(n) should be deprecated or red-flagged, so that
everybody understands the consequences of using this method.
Following the conversation in https://issues.apache.org/jira/browse/SPARK-38388
(still relevant for recent versions of Spark) I think it's very importa
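For readers following along: the concern is specifically about the round-robin form, df.repartition(n), whose output placement depends on the input row order. A hedged sketch of the two forms (df and the "id" column are placeholders, not from the JIRA):

  import org.apache.spark.sql.functions.col

  // Round-robin: rows are dealt out by position, so if an upstream stage is
  // recomputed in a different order, a retried task can end up with
  // different rows (the issue discussed in SPARK-38388).
  val roundRobin = df.repartition(200)

  // Hash-partitioned by a deterministic expression: a row's target partition
  // depends only on its key value, so recomputation places it the same way.
  val byKey = df.repartition(200, col("id"))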
When Spark writes data to object storage systems like HDFS, S3 etc., it can
result in a large number of small files. To solve this problem, a common method
is to repartition before writing the results. However, this may cause data
skew. If the number of distinct values of the repartitioned key is
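As an illustration of the trade-off described above (df, the "date" column and the paths are placeholders; spark-shell assumed):

  import org.apache.spark.sql.functions.{col, rand}

  // If "date" has few distinct values, this funnels each date into a single
  // task/file, so large dates become stragglers (data skew).
  df.repartition(col("date"))
    .write.partitionBy("date").parquet("/tmp/out_skewed")

  // Adding a second, high-cardinality expression (a random salt here) spreads
  // each date over several tasks, at the cost of more files per date.
  df.repartition(col("date"), (rand() * 8).cast("int"))
    .write.partitionBy("date").parquet("/tmp/out_spread")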
Thanks Sean. I just realized it. Let me try that.
On Mon, Mar 22, 2021 at 12:31 PM Sean Owen wrote:
> You need to do something with the result of repartition. You haven't
> changed textDF
>
> On Mon, Mar 22, 2021, 12:15 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com&g
You need to do something with the result of repartition. You haven't
changed textDF
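In other words (a sketch, since the original code is not fully shown; Scala here, but the same applies to the PySpark code in question): repartition returns a new DataFrame rather than modifying the one it is called on.

  // Has no effect on anything downstream: the result is discarded.
  textDF.repartition(100)

  // Keep the returned DataFrame and use it from here on.
  val repartitionedDF = textDF.repartition(100)
  println(repartitionedDF.rdd.getNumPartitions)  // 100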
On Mon, Mar 22, 2021, 12:15 PM KhajaAsmath Mohammed
wrote:
> Hi,
>
> I have a use case where there are large files in hdfs.
>
> Size of the file is 3 GB.
>
> It is an existing code in
Hi,
I have a use case where there are large files in HDFS.
The size of the file is 3 GB.
It is existing code in production and I am trying to improve the
performance of the job.
Sample Code:
textDF = dataframe  (this is the dataframe that was created from the HDFS path)
logging.info("Number of partitions"
is not because of SPARK-23243.
As you said, if it was caused by the 'first' before 'repartition', then how do we
solve this problem fundamentally? And is there any workaround?
> On Jan 18, 2021, at 10:35 AM, Shiao-An Yuan wrote:
>
> Hi,
> I am using Spark 2.4.4 standalone mode.
>
> On Mon, Jan 1
import scala.sys.process._
>> import org.apache.spark.sql.functions._
>> import com.google.common.hash.Hashing
>> val murmur3 = Hashing.murmur3_32()
>>
>> // create a Dataset with the cardinality of the second element equals
>> 5.
>> val ds = spark.range(0, 10, 1, 130).map(i => (murmur3.hashLong(i).asInt(), i/2))
ing(pkey)) // generate key
> .reduceGroups(_.merge(_)) //
> spark.sql.shuffle.partitions=200
> .map(_._2) // drop key
>
> newSnapshot
> .repartition(60) // (1)
> .write
e(0, 10, 1, 130).map(i =>
> (murmur3.hashLong(i).asInt(), i/2))
>
> ds.groupByKey(_._2)
> .agg(first($"_1").as[Long])
> .repartition(200)
> .map { x =>
> if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitio
Dataset with the cardinality of the second element equals
> 5.
> val ds = spark.range(0, 10, 1, 130).map(i =>
> (murmur3.hashLong(i).asInt(), i/2))
>
> ds.groupByKey(_._2)
> .agg(first($"_1").as[Long])
> .repartition(200)
> .map { x
.agg(first($"_1").as[Long])
.repartition(200)
.map { x =>
if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId
== 100 && TaskContext.get.stageAttemptNumber == 0) {
throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
}
x
}
.map(_._2).distinct(
00 partitions)
>>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> // multiple small parquet files
>>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> val newSnapshot = currentSnapshot.union(newAddedLog)
>>>
parquet files
>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>
>> val newSnapshot = currentSnapshot.union(newAddedLog)
>> .groupByKey(new String(pkey)) // generate key
>> .reduceGroups(_.merge(_))//
>
// generate key
> .reduceGroups(_.merge(_))//
> spark.sql.shuffle.partitions=200
> .map(_._2) // drop key
>
> newSnapshot
> .repartition(60) // (1)
> .write.parquet(newPath)
> ```
>
Snapshot = currentSnapshot.union(newAddedLog)
.groupByKey(new String(pkey)) // generate key
.reduceGroups(_.merge(_))//
spark.sql.shuffle.partitions=200
.map(_._2) // drop key
newSnapshot
.repart
As a generic answer, in a distributed environment like Spark, making sure
that data is distributed evenly among all nodes (assuming every node is the
same or similar) can help performance.
repartition thus controls the data distribution among all nodes. However,
it is not that straightforward
Hi,
Just need some advice.
- When we have multiple Spark nodes running code, under what conditions does a
repartition make sense?
- Can we repartition and cache the result --> df = spark.sql("select from
...").repartition(4).cache
- If we choose a repartition (4), will tha
f executor memory when we tried to
aggregate. Repartition after the data explosion to address the data skew is
killing us.
What other ways can we address this problem ?
Note : A transaction is marked as an ATM transaction or a cross border
transaction by a boolean value.
--
Sent from: http://ap
rt again I believe.. Can someone
> confirm this?
>
> However what happens when dataframe repartition was done using (date, id)
> columns, but window function which follows repartition needs a partition by
> clause with (date, id, col3, col4) columns ? Would spark reshuffle the
> d
Hi All,
If a dataframe is repartitioned in memory by the (date, id) columns, and I then
use multiple window functions whose partition-by clause uses the (date, id)
columns, we can avoid the shuffle/sort again, I believe. Can someone
confirm this?
However, what happens when the dataframe repartition
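A sketch of the setup being asked about (column names taken from the mail, everything else assumed); comparing the physical plans with explain() shows whether Spark inserts an extra Exchange for the wider partition-by clause:

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.{col, row_number}

  val repartitioned = df.repartition(col("date"), col("id"))

  // Window keyed by exactly the repartition columns.
  val w1 = Window.partitionBy("date", "id").orderBy("col3")

  // Window keyed by a superset of the repartition columns.
  val w2 = Window.partitionBy("date", "id", "col3", "col4").orderBy("col3")

  repartitioned.withColumn("rn", row_number().over(w1)).explain()
  repartitioned.withColumn("rn", row_number().over(w2)).explain()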
Hello, There is another link here that I hope will help you.
https://stackoverflow.com/questions/33831561/pyspark-repartition-vs-partitionby
In particular, when you are faced with possible data skew or have some
partitioned parameters that need to be obtained at runtime, you can refer to
this
Hello, there is another link to discuss the difference between the two
methods.
https://stackoverflow.com/questions/33831561/pyspark-repartition-vs-partitionby
In particular, when you are faced with possible data skew or have some
partitioned parameters that need to be obtained at runtime, you
Hello,
thanks for the quick reply.
Got it: partitionBy is to create something like Hive partitions.
But when do we actually use repartition?
How do we decide whether to repartition or not?
Because in development we are only getting sample data.
Also, what number should I give when repartitioning?
Thanks
On
Hello, I think you can refer to this link and hope to help you.
https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby/40417992
--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com
Hi All,
Can anyone explain?
thanks
rajat
On Sun, 21 Apr 2019, 00:18 kumar.rajat20del wrote:
> Hi Spark Users,
>
> repartition and partitionBy seem to be very similar on a DataFrame.
> In which scenario do we use one over the other?
>
> As per my understanding, repartition is a very expensive operation as it needs
>
Hi Spark Users,
repartition and partitionBy seem to be very similar on a DataFrame.
In which scenario do we use one over the other?
As per my understanding, repartition is a very expensive operation as it needs a
full shuffle, so when do we use repartition?
Thanks
Rajat
--
Sent from: http://apache-spark-user-list.1001560
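Since the repartition-versus-partitionBy question recurs in this thread, a small illustration (the paths and the "country" column are made up; spark-shell assumed): repartition controls the in-memory partitioning of the DataFrame, while DataFrameWriter.partitionBy controls the directory layout of the written output.

  import org.apache.spark.sql.functions.col

  // In memory: shuffle the data into 8 partitions keyed by "country".
  val shuffled = df.repartition(8, col("country"))

  // On disk: one sub-directory per value, e.g. .../country=US/part-*.parquet
  shuffled.write.partitionBy("country").parquet("/tmp/by_country")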
Hi, I have a Spark job that produces duplicates when one or more tasks from the
repartition stage fail.
Here is the simplified code.
sparkContext.setCheckpointDir("hdfs://path-to-checkpoint-dir")
val inputRDDs: List[RDD[String]] = List.empty // an RDD per input dir
val updatedRDDs = inp
How do I repartition a Spark DStream Kafka ConsumerRecord RDD? I am getting
uneven sizes of Kafka topics. We want to repartition the input RDD based on
some logic.
But when I try to apply the repartition I am getting "object not serializable
(
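For what it's worth, a common cause of this error is that Kafka's ConsumerRecord itself is not serializable, so it cannot be shuffled by repartition. A hedged sketch of the usual workaround, assuming the spark-streaming-kafka-0-10 direct stream API (topic, group and broker values are placeholders): map out the fields you need before repartitioning.

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  val ssc = new StreamingContext(sc, Seconds(10))
  val kafkaParams = Map[String, Object](
    "bootstrap.servers"  -> "localhost:9092",
    "key.deserializer"   -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id"           -> "example-group")

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](Seq("some-topic"), kafkaParams))

  // ConsumerRecord is not serializable; shuffle plain key/value pairs instead.
  val repartitioned = stream.map(r => (r.key, r.value)).repartition(24)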
Hi Ravi,
RDDs are always immutable, so you cannot change them; instead you create new
ones by transforming one. Repartition is a transformation, so it is lazily
evaluated, hence computed only when you call an action on it.
Thanks.
Vamshi Talla
On Jul 8, 2018, at 12:26 PM, mailto:ryanda
Hi,
Can anyone clarify how repartition works please ?
* I have a DataFrame df which has only one partition:
// Returns 1
df.rdd.getNumPartitions
* I repartitioned it by passing "3" and assigned it to a new DataFrame
newdf
val newdf = df.repartition(3)
I prefer not to do a .cache() due to memory limits. But I did try a
persist() with DISK_ONLY
I did the repartition(), followed by a .count() followed by a persist() of
DISK_ONLY
That didn't change the number of tasks either
On Sun, Jul 1, 2018, 15:50 Alexander Czech
wrote:
> You coul
t; time to process a single record goes up-to 5mins on 1 executor
>> >
>> > I'm trying to increase the partitions that my data is in so that I have
>> at
>> > maximum 1 record per executor (currently it sets 2 tasks, and hence 2
>> > executors... I want it
t; executors... I want it to split it into at least 100 tasks at a time so I
> > get 5 records per task => ~20min per task)
>
> Maybe you can try repartition(100) after broadcast join, the task number
> should change to 100 fo
executor
>
> I'm trying to increase the partitions that my data is in so that I have at
> maximum 1 record per executor (currently it sets 2 tasks, and hence 2
> executors... I want it to split it into at least 100 tasks at a time so I
> get 5 records per task => ~20min per task)
My entire CSV is less than 20KB.
But somewhere in between, I do a broadcast join with 3500 records in another
file.
After the broadcast join I have a lot of processing to do. Overall, the
time to process a single record goes up-to 5mins on 1 executor
I'm trying to increase the partitions that my da
Abdeali Kothari wrote
> I am using Spark 2.3.0 and trying to read a CSV file which has 500
> records.
> When I try to read it, spark says that it has two stages: 10, 11 and then
> they join into stage 12.
What's your CSV size per file? I think Spark optimizer may put many files
into one task when
and then save the file as
parquet.
Stages 10 and 11 have only 2 tasks according to Spark. I have a
maximum of 20 possible executors on my cluster. I would like Spark to use all
20 executors for this task.
*1csv+Repartition*: Right after reading the file, if I do a repartition, it
still takes *2
"mapred.max.split.size", 1024*50)
My question is:
How to achieve the same behavior (to get the desired number of partitions)
when using Spark 1.6 (without repartition method and without any method
that incurs shuffling)?
I look forward for your answers.
Regards,
Florin
The primary role of a sink is storing output tuples. Consider groupByKey and
map/flatMapGroupsWithState instead.
-Chris
From: karthikjay
Sent: Friday, April 20, 2018 4:49:49 PM
To: user@spark.apache.org
Subject: [Structured Streaming] [Kafka] How to repartition
Any help appreciated. please find the question in the link:
https://stackoverflow.com/questions/49951022/spark-structured-streaming-with-kafka-how-to-repartition-the-data-and-distribu
--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com
/ convert
>>>>> json to df
>>>>> JavaRDD rowJavaRDD = df.javaRDD().map... //add some new
>>>>> fields
>>>>> StructType type = df.schema()...; // construct new type for new
>>>>> added fields
>>>>> Dat
foreachRDD(VoidFunction
>>>> stringjavardd->{
>>>> Dataset df = spark.read().json( stringjavardd ); // convert
>>>> json to df
>>>> JavaRDD rowJavaRDD = df.javaRDD().map... //add some new fields
>>>> StructType type = df.schema()...;
type = df.schema()...; // construct new type for new added
>>> fields
>>> Dataset<Row> newdf = ...; // create new dataframe
>>> newdf.repartition(taskNum).write().mode(SaveMode.Append).partitionBy("appname").parquet(savepath);
ionedBy("appname").parquet(savepath); // save to parquet
>> })
>
>
>
> However, if I remove the repartition method of newdf in writing parquet
> stage, the program always throw nullpointerexception error in json convert
> line:
>
> java.lang.NullPointerException
(SaveMode.Append).patitionedBy("appname").parquet(savepath);
> // save to parquet
> })
However, if I remove the repartition method on newdf in the parquet-writing
stage, the program always throws a NullPointerException in the JSON convert
line:
java.lang.NullPointerException
Hi all,
I have 2 Spark jobs: one is the pre-process and the second is the process.
The process job needs to do a calculation for each user in the data.
I want to avoid a shuffle like groupBy, so I am thinking about saving the result
of the pre-process bucketed by user in Parquet, or repartitioning by user
and saving the r
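A hedged sketch of the two options being weighed (table, path and column names invented; spark-shell assumed): bucketBy records the hash layout in the table metadata, which is what lets a later per-user aggregation skip the shuffle, whereas a plain repartition-and-write does not survive the round trip to disk.

  import org.apache.spark.sql.functions.col

  // Option 1: bucket the pre-processed data by user. Requires saveAsTable
  // (a metastore-backed table) so the bucketing metadata is kept.
  preprocessed.write
    .bucketBy(64, "user_id")
    .sortBy("user_id")
    .format("parquet")
    .saveAsTable("preprocessed_by_user")

  // Option 2: repartition by user and write plain files. The files are grouped
  // by user, but the partitioning is not recorded, so a later groupBy("user_id")
  // will generally shuffle again.
  preprocessed.repartition(col("user_id")).write.parquet("/tmp/preprocessed")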
Hi all,
Can somebody shed some light on this, please?
Thanks,
Aakash.
-- Forwarded message --
From: "Aakash Basu"
Date: 15-Jun-2017 2:57 PM
Subject: Repartition vs PartitionBy Help/Understanding needed
To: "user"
Cc:
Hi all,
>
> Everybody is giving a diffe
Hi all,
Everybody explains the difference between coalesce and repartition, but
nowhere have I found the difference between partitionBy and repartition. My
question is: is it better to write a dataset to Parquet partitioned by a
column and then read the respective directories to work on that column
Hello there,
I am trying to understand the difference between the following
repartition() variants...
a. def repartition(partitionExprs: Column*): Dataset[T]
b. def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
c. def repartition(numPartitions: Int): Dataset[T]
My understanding is
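For reference, what the three variants do (a sketch with an assumed DataFrame df and column "date"):

  import org.apache.spark.sql.functions.col

  // a) hash-partition by the expressions, using spark.sql.shuffle.partitions
  //    as the number of partitions
  val a = df.repartition(col("date"))

  // b) hash-partition by the expressions into an explicit number of partitions
  val b = df.repartition(50, col("date"))

  // c) round-robin repartition into an explicit number of partitions
  val c = df.repartition(50)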
e', u'f', u'g', u'h', u'i', u'j', u'k', u'l']
> >>> sc.textFile("outc", use_unicode=False).collect()
> ['a', 'b', 'c', 'd', 'e', 'f', 'g'
is to partition
the data I receive. So I used repartition() to do the above task but this
resulted in duplicating the same set of data across all partitions rather
than dividing (i.e., hashing) it according to the number of partitions
specified. Could anyone please explain a solution? I have shown a s
'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l']
>>> sc.textFile("outc", use_unicode=False).collect()
['a', 'b', 'c', 'd', 'e', 'f', 'g
> d.mcbe...@elsevier.com> wrote:
>
>> I’m using Spark 2.0.
>>
>> I’ve created a dataset from a parquet file and repartition on one of the
>> columns (docId) and persist the repartitioned dataset.
>>
>> val om = ds.repartition($"docId").persist(StorageLevel.MEMORY_AND_DISK)
…that walks through what we do expose and how it is used by the query planner.
Michael
On Tue, Sep 20, 2016 at 11:22 AM, McBeath, Darin W (ELS-STL) <
d.mcbe...@elsevier.com> wrote:
> I’m using Spark 2.0.
>
> I’ve created a dataset from a parquet file and repartition on o
I'm using Spark 2.0.
I've created a dataset from a parquet file, repartitioned on one of the
columns (docId) and persisted the repartitioned dataset.
val om = ds.repartition($"docId").persist(StorageLevel.MEMORY_AND_DISK)
When I try to confirm the partitioner, with
om.rd
, August 9, 2016 12:19 AM
Subject: Re: SparkR error when repartition is called
To: Sun Rui <sunrise_...@163.com>
Cc: User <user@spark.apache.org>
Sun,
I am using Spark in YARN client mode in a 2-node cluster with Hadoop 2.7.2. My
R version is 3.3.1.
I have the followi
Could you give more environment information?
On Aug 9, 2016, at 11:35, Shane Lee wrote:
Hi All,
I am trying out SparkR 2.0 and have run into an issue with repartition.
Here is the R code (essentially a port of the pi-calculating scala example in
the spark package) that can reproduce the behavior:
schema <- structTyp
I can’t reproduce your issue with len=1 in local mode.
Could you give more environment information?
> On Aug 9, 2016, at 11:35, Shane Lee wrote:
>
> Hi All,
>
> I am trying out SparkR 2.0 and have run into an issue with repartition.
>
> Here is the R code (essenti
Hi All,
I am trying out SparkR 2.0 and have run into an issue with repartition.
Here is the R code (essentially a port of the pi-calculating scala example in
the spark package) that can reproduce the behavior:
schema <- structType(structField("input", "integer"),
Hi,
I'm looking for documentation or best practices about choosing a key or
keys for repartitioning a DataFrame or RDD.
Thank you,
MBAREK nihed
--
M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com
<http://tn.linkedin.com/in/nihed>
1 topTags_CSV/part-00016.csv
>
> $
>
> numRowsPerCSVFile = 100
>
> numRows = resultDF.count()
>
> quotient, remainder = divmod(numRows, numRowsPerCSVFile)
>
> numPartitions = (quotient + 1) if remainder > 0 else quotient
>
>
>
> debugStr = ("numRows
1 topTags_CSV/part-00016.csv
$
numRowsPerCSVFile = 100
numRows = resultDF.count()
quotient, remainder = divmod(numRows, numRowsPerCSVFile)
numPartitions = (quotient + 1) if remainder > 0 else quotient
debugStr = ("numRows:{0} quotient:{1} remainder:{2} repartition({3})"
Any insights on this ?
On Fri, Jan 29, 2016 at 1:08 PM, Abhishek Anand
wrote:
> Hi All,
>
> Can someone help me with the following doubts regarding checkpointing :
>
> My code flow is something like follows ->
>
> 1) create direct stream from kafka
> 2) repartition k
Hi All,
Can someone help me with the following doubts regarding checkpointing:
My code flow is something like the following ->
1) create direct stream from kafka
2) repartition kafka stream
3) mapToPair followed by reduceByKey
4) filter
5) reduceByKeyAndWindow without the inverse function
Hi Andy,
This link explains the difference well.
https://bzhangusc.wordpress.com/2015/08/11/repartition-vs-coalesce/
Simply put, the difference is whether it "repartitions" (shuffles) partitions or not.
Actually, coalesce() with shuffling performs exactly like repartition().
On 29 Dec 2015 08
Hi Michael
I’ll try 1.6 and report back.
The Javadoc does not say much about coalesce() or repartition(). When I use
repartition() just before I save my output, everything runs as expected.
I thought coalesce() is an optimized version of repartition() and should be
used whenever we know we are
, use spark.sql.shuffle.partitions.
I recommend you do not do any explicit partitioning or mess with these
values until you find a need for it. If executors are sitting idle, that's
a sign you may need to repartition.
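The setting referred to above can be changed per session; a small sketch (the value 400 is just an example):

  // Number of partitions used for shuffles in DataFrame/SQL operations
  // (joins, aggregations, and repartition(col) without an explicit count).
  spark.conf.set("spark.sql.shuffle.partitions", "400")

  // The RDD-side default, spark.default.parallelism, is a separate setting
  // configured on the SparkConf / at submit time.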
On Tue, Dec 8, 2015 at 9:35 PM, Zhiliang Zhu
wrote:
> Thanks very much for Yong'
Shuffling large amounts of data over the network is expensive, yes. The cost is
lower if you are just using a single node where no networking needs to be
involved to do the repartition (using Spark as a multithreading engine).
In general you need to do performance testing to see if a
Hi All,
I need to optimize an objective function with some linear constraints using a
genetic algorithm. I would like to get as much parallelism for it as possible with
Spark. repartition / shuffle may be used sometimes in it; however, is the
repartition API very costly?
Thanks in advance! Zhiliang
Yes, I am trying to do so, but it will try to repartition the whole data. Can't
we split a large partition (a data-skewed partition) into multiple partitions?
(Any idea on this?)
On Sun, Oct 18, 2015 at 1:55 AM, Adrian Tanase wrote:
> If the dataset allows it you can try to write a custom par
If the dataset allows it, you can try to write a custom partitioner to help
Spark distribute the data more uniformly.
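A minimal sketch of such a partitioner (my own illustration, not from this thread): give each known heavy key its own partition and hash the rest into the remaining ones. Note this isolates heavy keys rather than splitting a single key further; spreading one key across partitions requires salting the key itself.

  import org.apache.spark.Partitioner

  // Assumes String keys and total > heavyKeys.size.
  class HeavyKeyPartitioner(total: Int, heavyKeys: Seq[String]) extends Partitioner {
    private val dedicated = heavyKeys.zipWithIndex.toMap
    override def numPartitions: Int = total
    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[String]
      dedicated.getOrElse(k, {
        val rest = total - heavyKeys.size
        heavyKeys.size + (((k.hashCode % rest) + rest) % rest)  // non-negative mod
      })
    }
  }

  // Usage (pairRDD is a hypothetical RDD[(String, V)]):
  // pairRDD.partitionBy(new HeavyKeyPartitioner(200, Seq("hot-key-1", "hot-key-2")))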
Sent from my iPhone
On 17 Oct 2015, at 16:14, shahid ashraf <sha...@trialx.com> wrote:
yes i know about that,its in case to reduce partitions. the point here is the
data
Yes, I know about that; it's for the case of reducing partitions. The point here
is that the data is skewed to a few partitions.
On Sat, Oct 17, 2015 at 6:27 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:
> You can use coalesce function, if you want to reduce the number of
> partitions. This one
You can use the coalesce function if you want to reduce the number of
partitions. This one minimizes the data shuffle.
-Raghav
On Sat, Oct 17, 2015 at 1:02 PM, shahid qadri
wrote:
> Hi folks
>
> I need to reparation large set of data around(300G) as i see some portions
> have large data(data skew)
Hi folks,
I need to repartition a large set of data (around 300G) as I see some portions
have large data (data skew).
I have pairRDDs [({},{}),({},{}),({},{})].
What is the best way to solve the problem?
"user@spark.apache.org"
Subject: DataFrame repartition not repartitioning
Hello,
I'm trying to load in an Avro file and write it out as Parquet. I would like to
have enough partitions to properly parallelize on. When I do the simple load
and save I get 1
Hello,
I'm trying to load in an Avro file and write it out as Parquet. I would
like to have enough partitions to properly parallelize on. When I do the
simple load and save I get 1 partition out. I thought I would be able to
use repartition like the following:
val avr
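The code is cut off above; a sketch of the general pattern (paths and the partition count are invented, and the built-in spark-avro reader of Spark 2.4+ is assumed to be on the classpath):

  val avro = spark.read.format("avro").load("/data/input.avro")

  // repartition returns a new DataFrame; write that one out.
  avro.repartition(64)
    .write.mode("overwrite")
    .parquet("/data/output_parquet")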
I don't know what you are doing in the rest of the code:
- There are a lot of non-HDFS writes; they come from the rest of your code
and/or repartition(). Repartition involves a shuffle and the creation of
files on disk. I would have said that the problem comes from that, but I
just checked and checkpoint() is supposed to delete shuffle files:
https://forums.databricks.com/questions/277/how-do
preparation for more memory-intensive
>> operations (I'm happy to wait, I just need the job to complete).
>> Repartition seems to cause an OOM for me?
>> Could someone shed light/or speculate/ why this would happen -- I thought
>> we repartition higher to r
However, I'd prefer to
> increase the number of partitions in preparation for more memory-intensive
> operations (I'm happy to wait, I just need the job to complete).
> Repartition seems to cause an OOM for me?
> Could someone shed light/or speculate/ why this would happen -- I thoug
the job to complete).
Repartition seems to cause an OOM for me.
Could someone shed light on, or speculate about, why this would happen? I thought
we repartition to a higher count to relieve memory pressure.
I'm using Spark 1.4.1 on CDH4, if that makes a difference.
This works
val res2 = sqlContext.parquetFile(lst
first
transformation/action on the direct Kafka stream (be it repartition() or
mapPartitions()) made directKafkaStream evaluate; then repartition caused the
data to be shuffled, and then mapPartitions is called on the shuffled data.
On Fri, Sep 4, 2015 at 10:33 PM, Cody Koeninger wrote:
> The answer alre
The answer already given is correct. You shouldn't doubt this, because
you've already seen the shuffle data change accordingly.
On Fri, Sep 4, 2015 at 11:25 AM, Shushant Arora
wrote:
> But Kafka stream has underlyng RDD which consists of offsets reanges only-
> so how does r
But the Kafka stream has an underlying RDD which consists of offset ranges only,
so how does repartition work?
1. First it evaluates the transformation and then repartitions.
2. Or first it repartitions and then transforms. In this case the data should
not be transformed; rather, only the offset ranges should be
Yes, not the offset ranges; the real data will be shuffled when you
use repartition().
Thanks
Saisai
On Fri, Sep 4, 2015 at 12:42 PM, Shushant Arora
wrote:
> 1.Does repartitioning on direct kafka stream shuffles only the offsets or
> exact kafka messages across executors?
>
>
ons in Kafka.
Now only the offset ranges should be shuffled to executors, not the actual Kafka
messages? But I am seeing a very large amount of shuffle data read/write on the
streaming UI. When I remove this repartition, shuffle read/write becomes 0.
) from shutdown hook
15/09/02 21:12:43 INFO TaskSchedulerImpl: Cancelling stage 10
15/09/02 21:12:43 INFO Executor: Executor is trying to kill task 4.0 in
stage 10.0 (TID 64)
15/09/02 21:12:43 INFO TaskSchedulerImpl: Stage 10 was cancelled
15/09/02 21:12:43 INFO DAGScheduler: ShuffleM