Java Generic T makes ClassNotFoundException

2019-06-26 Thread big data
Dear all, I use Spark to deserialize some files and restore them into my own class objects. The Spark code and the class deserialization code (using Apache Commons Lang) look like this: val fis = spark.sparkContext.binaryFiles("/folder/abc*.file") val RDD = fis.map(x => { val content = x._2.toArray() val b = Block.de
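
A minimal sketch of the setup described above, with a hypothetical Block case class standing in for the poster's own Serializable class:

    import org.apache.spark.sql.SparkSession
    import org.apache.commons.lang3.SerializationUtils

    val spark = SparkSession.builder().appName("deserialize-blocks").getOrCreate()

    // Hypothetical stand-in for the poster's class; it must be Serializable
    // and present on the executors' classpath.
    case class Block(id: Long, payload: Array[Byte])

    val fis = spark.sparkContext.binaryFiles("/folder/abc*.file")
    val blocks = fis.map { case (_, stream) =>
      val bytes = stream.toArray()                 // whole file as Array[Byte]
      SerializationUtils.deserialize[Block](bytes) // restore the object
    }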

Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jungtaek Lim
Glad to help, Jacek. I'm happy you're doing a similar thing, which means it could be pretty useful for others as well. It looks like it might be good enough to contribute the state source and sink. I'll sort out my code and submit a PR. Thanks, Jungtaek Lim (HeartSaVioR) On Thu, Jun 27, 2019 at 7:54 AM

Re: Array[Byte] from BinaryFiles can not be deserialized on Spark Yarn mode

2019-06-26 Thread big data
Additional info about this problem: the deserialization code looks like this: public static Block deserializeFrom(byte[] bytes) { try { Block b = SerializationUtils.deserialize(bytes); System.out.println("b="+b); return b; } catch (ClassCastException e) { System.out
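
When this works in local mode but fails on YARN, one common cause (an assumption here, not confirmed by the thread) is that the user jar's classes live in a child classloader on the executors. A sketch of a classloader-aware variant, reusing the hypothetical Block from the sketch above:

    import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectStreamClass}

    def deserializeFrom(bytes: Array[Byte]): Block = {
      // Resolve classes against Block's own classloader rather than the
      // JVM default, so user-jar classes are found on YARN executors.
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
        override def resolveClass(desc: ObjectStreamClass): Class[_] =
          Class.forName(desc.getName, false, classOf[Block].getClassLoader)
      }
      try in.readObject().asInstanceOf[Block]
      finally in.close()
    }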

Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jacek Laskowski
Hi Jungtaek, That's very helpful to have the state source. As a matter of fact I've just this week been working on a similar tool (!) and have been wondering how to recreate the schema of the state key and value. You've helped me a lot. Thanks. Jacek On Wed, 26 Jun 2019, 23:58 Jungtaek Lim, wro

Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jacek Laskowski
Hi, No idea. I've just begun exploring the current state of state management in Spark Structured Streaming. I'd not be surprised if what you're after were not possible. Stateful stream processing in SSS is fairly young. Jacek On Wed, 26 Jun 2019, 21:48 Rong, Jialei, wrote: > Thank you for your

Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Rong, Jialei
Fantastic, thanks!

Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jungtaek Lim
Hi, you could consider the state operator's partition number as the "max parallelism", as parallelism can be reduced by applying coalesce. It would effectively work similarly to key groups. If you're also considering offline query, there's a tool to manipulate state which enables reading and writin
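
A minimal sketch of that idea, assuming a toy rate-source query: the shuffle-partition setting at the query's first start pins the number of state store partitions, while coalesce can still reduce parallelism downstream of the stateful operator:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("state-parallelism").getOrCreate()
    import spark.implicits._

    // Fixed for the lifetime of the checkpoint once the query first runs.
    spark.conf.set("spark.sql.shuffle.partitions", "8")

    val counts = spark.readStream
      .format("rate").load()                 // built-in test source
      .groupBy(($"value" % 10).as("bucket"))
      .count()                               // stateful: 8 state partitions
      .coalesce(4)                           // "max parallelism" reduced to 4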

How to make sure that function is executed on each active executor?

2019-06-26 Thread Parag Chaudhari
Hi, I am working on a use case where I want to perform some action on each active executor of the application once. How can I run some function on each active executor associated with the current Spark application? num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 if num_executor
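
A best-effort sketch (assumption: Spark offers no guaranteed exactly-once-per-executor hook), mirroring the Python snippet above in Scala: oversubscribe a dummy RDD so that every executor very likely receives at least one task:

    import org.apache.spark.SparkEnv
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("per-executor").getOrCreate()
    val sc = spark.sparkContext

    // getExecutorInfos includes the driver, hence the -1.
    val numExecutors = math.max(sc.statusTracker.getExecutorInfos.length - 1, 1)

    sc.parallelize(0 until numExecutors * 4, numExecutors * 4)
      .foreachPartition { _ =>
        // Hypothetical per-executor side effect; guard with a JVM-local
        // flag if it must run at most once per executor.
        println(s"running on executor ${SparkEnv.get.executorId}")
      }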

Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Rong, Jialei
Thank you for your quick reply! Is there any plan to improve this? I asked this question due to some investigation comparing state-of-the-art streaming systems, among which Flink and Dataflow allow changing the parallelism number, and to my knowledge of Spark Streaming, it seems it is also able

Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jacek Laskowski
Hi, It's not allowed to change the number of partitions after your streaming query is started. The reason is that the number of state stores is exactly the number of partitions (perhaps multiplied by the number of stateful operators). I think you'll even get a warning or an exception whe

Change parallelism number in Spark Streaming

2019-06-26 Thread Rong, Jialei
Hi Dear Spark Experts, I'm curious about a question regarding Spark Streaming/Structured Streaming: does it allow changing the parallelism number (the default one, or the one specified in a particular operator) in a stream that has a stateful transform/operator? Will this cause my checkpointed s

How to run spark on GPUs

2019-06-26 Thread Jorge Machado
Hi guys, what is the currently recommended way to use GPUs on Spark? Which scheduler should we use: Mesos or Kubernetes? What are the approaches to follow until https://issues.apache.org/jira/browse/SPARK-24615 is in place? Thanks, Jorge

check is empty efficiently

2019-06-26 Thread SNEHASISH DUTTA
Hi, which is more efficient? This has been defined since 2.4.0: def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan => plan.executeCollect().head.getLong(0) == 0 } or: df.head(1).isEmpty I am checking if a DF is empty and it is taking forever. R
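
A small comparison sketch, assuming an arbitrary df: both forms only materialize at most one row, so if either "takes forever" the cost is likely in computing the first row of the plan itself rather than in the emptiness check:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("isEmpty-check").getOrCreate()
    val df = spark.range(0).toDF("id")   // hypothetical (empty) DataFrame

    val viaHead    = df.head(1).isEmpty  // collects at most one row
    val viaIsEmpty = df.limit(1).groupBy().count()
      .collect().head.getLong(0) == 0    // mirrors the 2.4.0 definition above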

Re: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Nick Pentreath
Generally I would say tens of items is a bit low, while a few hundreds or more starts to make sense. Of course it depends a lot on the specific use case, item catalogue, user experience / platform, etc. On Wed, Jun 26, 2019 at 3:57 PM Steve Pruitt wrote: > I should have mentioned this is a synthetic dataset I cr

RE: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Steve Pruitt
I should have mentioned this is a synthetic dataset I created using some likelihood distributions of the rating values. I am only experimenting / learning. In practice though, the list of items is likely to be at least in the 10s if not 100s. Are even these item numbers too low? Thanks. -S

Re: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Nick Pentreath
If the number of items is indeed 4, then another issue is that the rank of the factors defaults to 10. Setting the "rank" parameter < 4 will help. However, if you only have 4 items, then I would propose that using ALS (or any recommendation model, in fact) is not really necessary. There is not really en
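
A minimal sketch of those settings, with a hypothetical toy ratings DataFrame shaped like the dataset described below (many users, 4 items), also applying the regParam increase suggested in the reply further down:

    import org.apache.spark.ml.recommendation.ALS
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("als-rank").getOrCreate()
    import spark.implicits._

    // Hypothetical toy ratings; three rows shown for brevity.
    val ratings = Seq((0, 0, 120f), (1, 2, 20f), (2, 3, 0f))
      .toDF("user", "item", "rating")

    val als = new ALS()
      .setRank(3)          // below the 4-item count, per the advice above
      .setRegParam(0.1)    // nonzero regularization, per the thread
      .setUserCol("user").setItemCol("item").setRatingCol("rating")
    val model = als.fit(ratings)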

RE: [EXTERNAL] - Re: Problem with the ML ALS algorithm

2019-06-26 Thread Steve Pruitt
Number of users is 1055. Number of items is 4. Rating values are either 120, 20, or 0.

hadoop replication property from spark code not working

2019-06-26 Thread Divya Narayan
Hi, I have a use case for which I want to override the default HDFS replication factor from my Spark code. For this I have set the Hadoop replication like this: val sc = new SparkContext(conf) sc.hadoopConfiguration.set("dfs.replication", "1") Now my Spark job runs as a cron job in some specific
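
One thing worth checking (an assumption, not confirmed in the thread): mutating sc.hadoopConfiguration only changes the driver-side object, whereas a spark.hadoop.* property set when the context is created is copied into the Hadoop configuration used everywhere. A sketch:

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.hadoop.* entries are copied into the Hadoop Configuration
    // that Spark's readers and writers use, on driver and executors alike.
    val conf = new SparkConf()
      .setAppName("replication-override")
      .set("spark.hadoop.dfs.replication", "1")
    val sc = new SparkContext(conf)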

Re: Problem with the ML ALS algorithm

2019-06-26 Thread Nick Pentreath
This means that the matrix that ALS is trying to factor is not positive definite. Try increasing regParam (try 0.1, 1.0 for example). What does the data look like? e.g. number of users, number of items, number of ratings, etc? On Wed, Jun 26, 2019 at 12:06 AM Steve Pruitt wrote: > I get an inex

Array[Byte] from BinaryFiles can not be deserialized on Spark Yarn mode

2019-06-26 Thread big data
I use Apache Commons Lang3's SerializationUtils in the code: SerializationUtils.serialize() to store a customized class as files on disk, and SerializationUtils.deserialize(byte[]) to restore them again. In Spark local mode, all serialized files can be deserialized normally and no error ha
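
A minimal sketch of that round trip, reusing the hypothetical Block class from the first sketch near the top (paths are illustrative):

    import java.nio.file.{Files, Paths}
    import org.apache.commons.lang3.SerializationUtils

    val block = Block(1L, Array[Byte](1, 2, 3))
    val bytes = SerializationUtils.serialize(block)   // Block -> Array[Byte]
    Files.write(Paths.get("/tmp/abc1.file"), bytes)

    val restored = SerializationUtils.deserialize[Block](
      Files.readAllBytes(Paths.get("/tmp/abc1.file")))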