Hi Tyson,
The broadcast variable should remain in the executors' memory and be reused
unless you unpersist it, destroy it, or it goes out of scope.
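For example, a minimal Java sketch (jsc, lookupMap and ids are just illustrative names):

Broadcast<Map<String, String>> lookup = jsc.broadcast(lookupMap);

// each executor keeps one deserialized copy in memory and reuses it across tasks/stages
JavaRDD<String> resolved = ids.map(id -> lookup.value().getOrDefault(id, "unknown"));

lookup.unpersist();   // drops the copies on the executors; it is re-broadcast lazily if used again
lookup.destroy();     // removes all state; the variable cannot be used after this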
Hope this helps.
Thanks
Ankur
On Wed, Jun 10, 2020 at 5:28 PM wrote:
> We have a case where data that is small enough to be broadcasted is joined
> w
Hello,
We have a use case where we need to do a Cartesian join and for some reason
we are not able to get it to work with the Dataset APIs. We have a similar use
case implemented and working with RDDs.
We have two datasets:
- one dataset with 2 string columns, say c1 and c2. It is a small dataset
with ~1 mil
Hi Amit,
Spark keeps the partition that it is working on in memory (and does not
spill to disk even if it is running OOM). Also, since you are getting OOM
when using partitionBy (and not when you just use flatMap), there is likely
one (or a few) dates on which your partition size is bigger than the h
Hi Christoph,
I am not an expert in ML and have not used Spark KMeans, but your problem
seems to be an issue of local minimum vs global minimum. You should run
K-means multiple times with random starting points and also try with
multiple values of K (unless you are already sure).
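If it helps, a rough sketch of that loop with Spark ML's KMeans (the column name and the k/seed values are arbitrary, and trainingData is assumed to be a Dataset with a vector "features" column):

for (int k : new int[]{4, 8, 16}) {
  for (long seed : new long[]{1L, 42L, 2017L}) {
    KMeansModel model = new KMeans().setK(k).setSeed(seed)
        .setFeaturesCol("features").fit(trainingData);
    // within-cluster sum of squared errors: compare across seeds for the same k to
    // avoid a bad local minimum, and across k values to pick k (elbow method)
    System.out.println("k=" + k + " seed=" + seed + " cost=" + model.computeCost(trainingData));
  }
}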
Hope this helps.
You should instead broadcast your list and then use the broadcast variable
in the flatMap function.
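Something along these lines (a hedged sketch; jsc, lines and the filter logic are placeholders):

List<String> keywords = Arrays.asList("a", "b", "c");
Broadcast<List<String>> bKeywords = jsc.broadcast(keywords);

JavaRDD<String> matches = lines.flatMap(line ->
    bKeywords.value().stream()        // read the broadcast copy on the executor
        .filter(line::contains)
        .iterator());                 // Spark 2.x flatMap expects an Iterator; on 1.x return an Iterable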
Thanks
Ankur
On Fri, Apr 14, 2017 at 4:32 AM, Soheila S. wrote:
> Hello all,
> Can someone help me to solve the following fundamental problem?
>
>
> I have a JavaRDD and as a flatMap method, I ca
If I understand your question, you should look at withColumn in the DataFrame
API instead of creating the column inside map:

from pyspark.sql.functions import length, col
df.withColumn("len", length(col("l")))
Thanks
Ankur
On Fri, Apr 14, 2017 at 6:07 AM, issues solution
wrote:
> Hi ,
> how you can create column inside map function
>
>
> like that :
>
> df.map(lambda l : len(l)).
>
> bu
You can use zipWithIndex or the approach Tim suggested, or even the one you
are using, but I believe the issue is that tableA is being materialized
every time you run the new transformations. Are you caching/persisting
tableA? If you do that you should not see this behavior.
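Roughly (a sketch; the source of tableA is hypothetical):

Dataset<Row> tableA = spark.read().parquet("/path/to/tableA");
tableA.persist(StorageLevel.MEMORY_AND_DISK());  // or simply tableA.cache()
tableA.count();                                  // optional: force materialization once

// ... every later transformation that references tableA now reuses the cached data ...

tableA.unpersist();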
Thanks
Ankur
On Fr
Hi Stephen,
If you use aggregate functions or reduceGroups on KeyValueGroupedDataset it
behaves like reduceByKey on RDDs.
Only if you use flatMapGroups or mapGroups does it behave like groupByKey on
RDDs, and if you read the API documentation it warns against using those APIs.
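To make the distinction concrete, a small Java sketch over a Dataset<String> of words (names are illustrative):

KeyValueGroupedDataset<String, String> grouped =
    words.groupByKey((MapFunction<String, String>) w -> w, Encoders.STRING());

// like reduceByKey on RDDs: values are combined pairwise, with partial aggregation
Dataset<Tuple2<String, String>> reduced =
    grouped.reduceGroups((ReduceFunction<String>) (a, b) -> a + "|" + b);

// like groupByKey on RDDs: the whole group is shipped to and iterated in one task
Dataset<Long> groupSizes = grouped.mapGroups(
    (MapGroupsFunction<String, String, Long>) (key, it) -> {
      long n = 0;
      while (it.hasNext()) { it.next(); n++; }
      return n;
    }, Encoders.LONG());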
Hope this helps.
Thanks
Ankur
On F
The fix for this is to make your class Serializable. The reason is that the
closures you have defined in the class need to be serialized and copied
over to all executor nodes.
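A minimal sketch of what that looks like (the class and field names are made up):

public class MyJob implements java.io.Serializable {
  private final String prefix = "id-";

  public JavaRDD<String> run(JavaRDD<String> input) {
    // this lambda references an instance field, so `this` (MyJob) is captured,
    // serialized and shipped to every executor -- hence the class must be Serializable
    return input.map(s -> prefix + s);
  }
}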
Hope this helps.
Thanks
Ankur
On Mon, Mar 6, 2017 at 1:06 PM, Mina Aslani wrote:
> Hi,
>
> I am trying to start with spark and
>> On Sat, 4 Mar 2017 at 2:32 pm, Ankur Srivastava
>> wrote:
>> Adding DEV.
>>
>> Or is there any other way to do subtractByKey using Dataset APIs?
>>
>> Thanks
>> Ankur
>>
>> On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava
Adding DEV.
Or is there any other way to do subtractByKey using Dataset APIs?
Thanks
Ankur
On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava wrote:
> Hi Users,
>
> We are facing an issue with left_outer join using Spark Dataset api in 2.0
> Java API. Below is the code we have
Hi Users,
We are facing an issue with left_outer join using Spark Dataset api in 2.0
Java API. Below is the code we have
Dataset<Row> badIds = filteredDS.groupBy(col("id").alias("bid")).count()
    .filter((FilterFunction<Row>) row -> (Long) row.getAs("count") > 75000);
_logger.info("Id count with over
The one issue with using Neo4j is that you need to persist the whole graph
on one single machine, i.e. you cannot shard the graph. I am not sure what
the size of your graph is, but if it is huge, one way to shard could be to
use the Component Id. You can generate the Component Id by running
ConnectedComponents on the graph.
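For example, a hedged sketch with GraphFrames (the vertices/edges datasets and the checkpoint path are assumed; "component" is the column GraphFrames adds):

GraphFrame g = GraphFrame.apply(vertices, edges);      // vertices need an "id" column
sc.setCheckpointDir("/tmp/graphframes-checkpoints");   // required by connectedComponents
Dataset<Row> withComponent = g.connectedComponents().run();

// "component" can then act as the shard key when splitting the graph across Neo4j instances
withComponent.groupBy("component").count().show();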
function.
Thanks
Ankur
On Fri, Jan 27, 2017 at 12:15 PM, Richard Xin
wrote:
> try
> Row newRow = RowFactory.create(row.getString(0), row.getString(1),
> row.getMap(2));
>
>
>
> On Friday, January 27, 2017 10:52 AM, Ankur Srivastava <
> ankur.srivast...@gmail.com> w
+ DEV Mailing List
On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:
> Hi,
>
> I am trying to map a Dataset with rows which have a map attribute. When I
> try to create a Row with the map attribute I get cast errors. I am able to
> rep
Hi,
I am trying to map a Dataset with rows which have a map attribute. When I
try to create a Row with the map attribute I get cast errors. I am able to
reproduce the issue with the below sample code. The surprising thing is
with same schema I am able to create a dataset from the List of rows.
I
ed to at least return `MapType` instead of `StructType`.
> The stacktrace you showed explicitly says this type mismatch.
>
> // maropu
>
>
>> On Thu, Jan 26, 2017 at 12:07 PM, Ankur Srivastava
>> wrote:
>> Hi,
>>
>> I have a dataset with tuple of ID and
Hi,
I have a dataset with tuple of ID and Timestamp. I want to do a group by on
ID and then create a map with frequency per hour for the ID.
Input:
1| 20160106061005
1| 20160106061515
1| 20160106064010
1| 20160106050402
1| 20160106040101
2| 20160106040101
3| 20160106051451
Expected Output:
1|{20
AM, Steve Loughran
wrote:
>
> On 5 Jan 2017, at 21:10, Ankur Srivastava
> wrote:
>
> Yes I did try it out and it chooses the local file system as my checkpoint
> location starts with s3n://
>
> I am not sure how I can make it load the S3FileSystem.
>
>
> set fs.de
Adding DEV mailing list to see if this is a defect with ConnectedComponent
or if they can recommend any solution.
Thanks
Ankur
On Thu, Jan 5, 2017 at 1:10 PM, Ankur Srivastava wrote:
> Yes I did try it out and it chooses the local file system as my checkpoint
> location starts with s3n://
by chance run just the delete to see if it fails
>
> FileSystem.get(sc.hadoopConfiguration)
> .delete(new Path(somepath), true)
> ------
> *From:* Ankur Srivastava
> *Sent:* Thursday, January 5, 2017 10:05:03 AM
> *To:* Felix Cheung
> *Cc:* user@spa
m s3 from Spark?
>
> _____
> From: Ankur Srivastava
> Sent: Wednesday, January 4, 2017 9:23 PM
> Subject: Re: Spark GraphFrame ConnectedComponents
> To: Felix Cheung
> Cc:
>
>
>
> This is the exact trace from the driver logs
>
> Exception in thread "
parkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
And I am running spark v 1.6.2 and graphframes v 0.3.0-spark1.6-s_2.10
Thanks
Ankur
On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivast
wrote:
> Do you have more of the exception stack?
>
>
> --
> *From:* Ankur Srivastava
> *Sent:* Wednesday, January 4, 2017 4:40:02 PM
> *To:* user@spark.apache.org
> *Subject:* Spark GraphFrame ConnectedComponents
>
> Hi,
>
> I am tr
Hi,
I am trying to use the ConnectedComponents algorithm of GraphFrames, but by
default it needs a checkpoint directory. As I am running my spark cluster
with S3 as the DFS and do not have access to an HDFS file system, I tried using
an s3 directory as the checkpoint directory but I run into the below exception:
Hi
I am working on two different use cases where the basic problem is the same
but the scale is very different.
In case 1 we have two entities that can have a many-to-many relation, and we
want to identify all subgraphs in the full graph and then further prune the
graph to find the best relation. T
to
cores?
Thanks
Ankur
On Tue, Oct 11, 2016 at 11:16 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:
> Hi,
>
> I am upgrading my jobs to Spark 1.6 and am running into shuffle issues. I
> have tried all options and now am falling back to legacy memory model but
> st
Hi,
I am upgrading my jobs to Spark 1.6 and am running into shuffle issues. I
have tried all options and now am falling back to the legacy memory model but
am still running into the same issue.
I have set spark.shuffle.blockTransferService to nio.
16/10/12 06:00:10 INFO MapOutputTrackerMaster: Size of outp
Hi,
I am running a simple job on Spark 1.6 in which I am trying to leftOuterJoin a
big RDD with a smaller one. I am not yet broadcasting the smaller RDD,
but I am still running into FetchFailed errors, with the job finally
getting killed.
I have already partitioned the data to 5000 partition
Hi,
We have a use case where we broadcast ~4GB of data and we are on m3.2xlarge,
so your object size is not an issue. Also, based on your explanation it does
not look like a broadcast issue, as it works when your partition size is
small.
Are you caching any other data? Because broadcast variables use th
Hi,
You are creating a logger instance on the driver and then trying to use that
instance in a transformation function which is executed on the executor.
You should create the logger instance in the transformation function itself,
but then the logs will go to separate files on each worker node.
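A hedged sketch of the second option (the logger name and processing are placeholders):

JavaRDD<String> processed = input.map(record -> {
  // created on the executor, so it writes to that worker's log files
  org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger("worker-side");
  log.info("processing record {}", record);
  return record.trim();
});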
Hope this
Hi Suraj,
Spark uses a lot of ports to communicate between nodes. Probably your
security group is restrictive and does not allow instances to communicate
on all networks. The easiest way to resolve it is to add a rule to allow
all inbound traffic on all ports (0-65535) to instances in the same securit
PairRdd.values is what you need.
Ankur
On Tue, Sep 22, 2015, 11:25 PM Zhang, Jingyu
wrote:
> Hi All,
>
> I want to extract the "value" RDD from PairRDD in Java
>
> Please let me know how can I get it easily.
>
> Thanks
>
> Jingyu
>
>
> This message and its attachments may contain legally privi
Good to know it worked for you.
CC'ed the user group so that the thread reaches closure.
Thanks
Ankur
On Wed, Sep 16, 2015 at 6:13 AM, Thiago Diniz
wrote:
> Nailed it.
>
> Thank you!
>
> 2015-09-15 14:39 GMT-03:00 Ankur Srivastava :
>
>> Hi,
>>
>> The
Hi,
The signatures are perfect. I also tried the same code in eclipse and for some
reason eclipse did not import java.util.Iterator. Once I imported it, it was
fine. It might be the same issue with NetBeans.
Thanks
Ankur
On Tue, Sep 15, 2015 at 10:11 AM, dinizthiagobr
wrote:
> Can't get this one to work a
> …
>
> }
>
> });
>
> return featureScoreRDD;
>
> }
>
>
>
> }
>
>
>
> Thanks again for all your help and advice.
>
>
>
> Regards,
>
Hi Rachana
I didn't get your question fully, but as the error says, you cannot perform
an RDD transformation or action inside another transformation. In your
example you are performing the action "rdd2.values.count()" inside the "map"
transformation. It is not allowed and in any case this will be v
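A hedged sketch of the usual workaround: compute the inner value on the driver first (or broadcast it) and reference only the plain result inside the transformation (rdd1 is a hypothetical RDD here):

final long rdd2Count = rdd2.values().count();                  // runs on the driver, outside any transformation
JavaRDD<String> tagged = rdd1.map(s -> s + ":" + rdd2Count);   // only the long is captured in the closure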
Thanks Ankur,
>
> But I grabbed some keys from the Spark results and ran "nodetool -h
> getendpoints " and it showed the data is coming from at least 2 nodes?
> Regards,
> Alaa
>
> On Thu, Sep 3, 2015 at 12:06 PM, Ankur Srivastava <
> ankur.srivast...@gm
Hi Alaa,
Partitioning when using CassandraRDD depends on the partition key of your
Cassandra table.
If you see only 1 partition in the RDD, it means all the rows you have
selected have the same partition_key in C*.
Thanks
Ankur
On Thu, Sep 3, 2015 at 11:54 AM, Alaa Zubaidi (PDF)
wrote:
> Hi,
>
> I testin
r during the computation. Hard to say without knowing more details.
>
> You could try increasing the timeout for the failed askWithReply by
> increasing "spark.akka.lookupTimeout" (defaults to 30), but that would most
> likely be treating a symptom, not the root cause.
>
>
mode don't use this config.
>
> To debug this, please type "ps auxw | grep
> org.apache.spark.deploy.master.[M]aster" in master machine.
> You can see the Xmx and Xms option.
>
> Wisely Chen
>
>
>
>
>
>
> On Mon, Mar 30, 2015 at 3:55 AM, Ankur
y Chen
>
> 2015-03-28 15:39 GMT+08:00 Ankur Srivastava :
>
>> Hi Wisely,
>> I have 26gb for driver and the master is running on m3.2xlarge machines.
>>
>> I see OOM errors on workers even though they are running with 26gb of memory.
>>
>> Thanks
>>
to master node and
> broadcast to each slave. It is a very common situation that the master node
> doesn't have enough memory.
>
> What is your master node settings?
>
> Wisely Chen
>
> Ankur Srivastava wrote on Saturday, March 28, 2015:
>
> I have increased the "spark.stor
SerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
Thanks
Ankur
On Fri, Mar 27, 2015 at 2:52 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:
> Hi All,
>
> I am running a spark cluster on EC2 instanc
Hi All,
I am running a spark cluster on EC2 instances of type: m3.2xlarge. I have
given 26gb of memory with all 8 cores to my executors. I can see that in
the logs too:
*15/03/27 21:31:06 INFO AppClient$ClientActor: Executor added:
app-20150327213106-/0 on worker-20150327212934-10.x.y.z-40128
roadcasting big
objects of 3-4 gb?
Thanks
Ankur
On Wed, Mar 11, 2015 at 8:58 AM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:
> Thank you everyone!! I have started implementing the join using the
> geohash and using the first 4 alphabets of the HASH as the key.
>
> Can I assig
Thank you everyone!! I have started implementing the join using the geohash,
using the first 4 characters of the hash as the key.
Can I assign a confidence factor in terms of distance based on the number of
matching characters in the hash code?
I will also look at the other options listed here.
Th
Hi,
I am trying to join data based on the latitude and longitude. I have
reference data which has city information with their latitude and longitude.
I have a data source with user information with their latitude and
longitude. I want to find the nearest city to the user's latitude and
longitude.
Thanks a lot Ted!!
On Tue, Mar 3, 2015 at 9:53 AM, Ted Yu wrote:
> If you can use hadoop 2.6.0 binary, you can use s3a
>
> s3a is being polished in the upcoming 2.7.0 release:
> https://issues.apache.org/jira/browse/HADOOP-11571
>
> Cheers
>
> On Tue, Mar 3, 2015 at 9
Hi,
We recently upgraded to the Spark 1.2.1 - Hadoop 2.4 binary. We do not have
any other dependency on hadoop jars, except for reading our source files
from S3.
Since we upgraded to the latest version our reads from S3 have
considerably slowed down. For some jobs we see the read from S3 is s
Li, I cannot tell you the reason for this exception, but I have seen this
kind of error when using the HASH based shuffle manager (which was the default
until v 1.2). Try the SORT shuffle manager.
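Something like (a sketch; this property applies to Spark 1.x, where hash was the default before 1.2):

SparkConf conf = new SparkConf()
    .set("spark.shuffle.manager", "sort");   // switch from the hash-based shuffle manager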
Hopefully that will help
Thanks
Ankur
Does anyone have an idea where I can find the detailed log of that lost
execu
Hi Manoj,
You can set the number of partitions you want your SQL query to use. By
default it is 200, which is why you see that number. You can update it using
the spark.sql.shuffle.partitions property:
spark.sql.shuffle.partitions (default: 200) - Configures the number of
partitions to use when shuffling data for joi
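For example (a hedged sketch; pick a value that matches your data and cluster size):

// per-session, through the Spark 1.x SQLContext
sqlContext.setConf("spark.sql.shuffle.partitions", "400");
// or at submit time: --conf spark.sql.shuffle.partitions=400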
I am running on m3.xlarge instances on AWS with 12 gb worker memory and 10
gb executor memory.
On Sun, Feb 1, 2015, 12:41 PM Arush Kharbanda
wrote:
> What is the machine configuration you are running it on?
>
> On Mon, Feb 2, 2015 at 1:46 AM, Ankur Srivastava <
> ankur.sriva
be some issue it would be in log4j and
not in spark.
Thanks
Arush
On Fri, Jan 30, 2015 at 4:24 AM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:
> Hi,
>
> Whenever I enable DEBUG level logs for my spark cluster, on running a job
> all the executors die with the b
Hi,
I am no expert but have a small application working with Spark and
Cassandra.
I faced these issues when we were deploying our cluster on EC2 instances
with some machines on public network and some on private.
This seems to be a similar issue as you are trying to connect to "
10.34.224.249" w
Hi,
Whenever I enable DEBUG level logs for my spark cluster, on running a job
all the executors die with the below exception. On disabling the DEBUG logs
my jobs move on to the next step.
I am on spark-1.1.0
Is this a known issue with spark?
Thanks
Ankur
2015-01-29 22:27:42,467 [main] INFO org
Hi,
I wanted to understand how the join on two pair RDDs works. Would it result
in shuffling data from both RDDs so that records with the same key end up in
the same partition? If that is the case, would it be better to use the
partitionBy function to partition the RDDs (by the join attribute) at creation
time for less shuffling?
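A hedged sketch of the partitionBy idea (types, names and the partition count are arbitrary):

HashPartitioner partitioner = new HashPartitioner(200);

JavaPairRDD<String, String> left  = rawLeft.partitionBy(partitioner);
JavaPairRDD<String, String> right = rawRight.partitionBy(partitioner);

// both sides now share the same partitioner, so records with the same key are
// already co-located and the join itself does not need another full shuffle
JavaPairRDD<String, Tuple2<String, String>> joined = left.join(right);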
fixed the issue
for us.
Thanks
Ankur
On Mon, Jan 12, 2015 at 9:04 AM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:
> Hi Akhil,
>
> Thank you for the pointers. Below is how we are saving data to Cassandra.
>
> javaFunctions(rddToSave).writerBuilder
nks
> Best Regards
>
> On Mon, Jan 12, 2015 at 7:45 AM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi Akhil, thank you for your response.
>>
>> Actually we are first reading from cassandra and then writing back after
>> doing some proc
> On Sat, Jan 10, 2015 at 8:44 AM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi,
>>
>> We are currently using spark to join data in Cassandra and then write the
>> results back into Cassandra. While reads happen with out any error durin
Hi,
We are currently using spark to join data in Cassandra and then write the
results back into Cassandra. While reads happen without any error, during
the writes we see many exceptions like the ones below. Our environment details are:
- Spark v 1.1.0
- spark-cassandra-connector-java_2.10 v 1.1.0
We are
actually being set, especially if you're on mesos
> (because of https://issues.apache.org/jira/browse/MESOS-123 ) Find the
> pid of the executor process, and cat /proc//limits
>
> set spark.shuffle.consolidateFiles = true
>
> try spark.shuffle.manager = sort
>
>
&
Hello,
We are currently running our data pipeline on spark, which uses Cassandra as
the data source.
We are currently facing an issue with the step where we create an RDD on data
in a Cassandra table and then try to run "flatMapToPair" to transform the
data, but we are running into "Too many open files"
Is this because I am calling a transformation function on an rdd from
inside another transformation function?
Is it not allowed?
Thanks
Ankur
On Oct 21, 2014 1:59 PM, "Ankur Srivastava"
wrote:
> Hi Gerard,
>
> this is the code that may be helpful.
>
> public class Ref
Hi,
I am creating a cassandra java rdd and transforming it using the where
clause.
It works fine when I run it outside the mapValues, but when I put the code
in mapValues I get an error while creating the transformation.
Below is my sample code:
CassandraJavaRDD cassandraRefTable = javaFuncti
titioner based on keys so that you can avoid
> shuffling and optimize join performance.
>
> HTH
>
> Best Regards,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Fri, Oct 17, 2014 at
Hi,
I have an RDD which is my application data and it is huge. I want to join this
with reference data which is also too huge to fit in memory, and thus I do not
want to use a broadcast variable.
What other options do I have to perform such joins?
I am using Cassandra as my data store, so should I just que
ble to access it using the hostnames then you won't be able to access it
> with the IP address either, I believe.
> What are you trying to do here? running your eclipse locally and
> connecting to your ec2 cluster?
>
> Thanks
> Best Regards
>
> On Tue, Oct 7, 2014 a
Hi,
I have started a Spark cluster on EC2 using the Spark Standalone cluster
manager, but spark is trying to identify the worker threads using
hostnames which are not accessible publicly.
So when I try to submit jobs from eclipse it is failing. Is there some way
spark can use IP addresses instead of
with sorting still remains.
So after I have partitioned the RDD, I invoke partitionedRdd.sortByKey(),
but now each partition only has pairs which have the same key.
One thing I wanted to mention is that I am using CassandraJavaRDD for this.
Thanks
- Ankur
On Wed, Oct 1, 2014 at 10:12 PM, Ankur Srivastava
Hi,
I am using a custom partitioner to partition my JavaPairRDD where the key is a
String.
I use the hashCode of a sub-string of the key to derive the partition index,
but I have noticed that my partitions contain keys which have a different
partition index returned by the partitioner.
Another issue I am