Spark 1.6.0 on CDH 5.6.0

2016-03-22 Thread Michel Hubert
Hi,


I'm trying to run a Spark 1.6.0 application on a CDH 5.6.0 cluster.

How do I submit the uber-jar so it's totally self-reliant?

With kind regards,
Mitchel


spark-submit --class TEST --master yarn-cluster ./uber-TEST-1.0-SNAPSHOT.jar


Spark 1.6.1
Version: Cloudera Express 5.6.0

16/03/22 09:16:33 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster 
with FAILED (diag message: User class threw exception: 
java.lang.NoSuchMethodError: 
org.apache.spark.streaming.api.java.JavaPairDStream.mapWithState(Lorg/apache/spark/streaming/StateSpec;)Lorg/apache/spark/streaming/api/java/JavaMapWithStateDStream;)





Is there a way to submit a Spark job via the YARN REST API?

2016-03-22 Thread tony....@tendcloud.com
Hi, All,
We are trying to build a data processing workflow which will call different
Spark jobs, and we are using YARN. Because we want to constrain ACLs for those
Spark jobs, we need to submit the Spark jobs through the YARN REST API, which
lets us pass the application ACL as a parameter. Is there any Spark API which
supports that? If not, are there any third-party solutions?


Thanks and Regards,




阎志涛 (Tony)

北京腾云天下科技有限公司

Email: tony@tendcloud.com
Phone: 13911815695
WeChat: zhitao_yan
QQ: 4707059
Address: Room 602, Aviation Service Building, Building 2, Courtyard 39, Dongzhimenwai Street, Dongcheng District, Beijing
Postal code: 100027

TalkingData.com - Let data speak


Re: Spark 1.6.0 on CDH 5.6.0

2016-03-22 Thread Sebastian YEPES FERNANDEZ
Hello Michel,

I had a similar issue when running my custom-built Spark 1.6.1. In the end I
resolved it by building both Spark and my jar with the JVM that ships with CDH:

export JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera/


Regards and hope this helps,
Sebastian

On Tue, Mar 22, 2016 at 10:13 AM, Michel Hubert  wrote:

> Hi,
>
>
>
>
>
> I’m trying to run a Spark 1.6.0 application on a CDH 5.6.0 cluster.
>
>
>
> How do I submit the uber-jar so it’s totally self-reliant?
>
>
>
> With kind regards,
>
> Mitchel
>
>
>
>
>
> spark-submit --class TEST --master yarn-cluster
> ./uber-TEST-1.0-SNAPSHOT.jar
>
>
>
>
>
> Spark 1.6.1
>
> *Version*: Cloudera Express 5.6.0
>
>
>
> 16/03/22 09:16:33 INFO yarn.ApplicationMaster: Unregistering
> ApplicationMaster with FAILED (diag message: User class threw exception:
> java.lang.NoSuchMethodError:
> org.apache.spark.streaming.api.java.JavaPairDStream.mapWithState(Lorg/apache/spark/streaming/StateSpec;)Lorg/apache/spark/streaming/api/java/JavaMapWithStateDStream;)
>
>
>
>
>
>
>


Issues facing while Running Spark Streaming Job in YARN cluster mode

2016-03-22 Thread Soni spark
Hi,

I am able to run my Spark streaming job in local mode, but when I try to run the
same job on my YARN cluster, it throws errors.

Any help is appreciated in this regard.

Here are my Exception logs:

Exception 1:

java.net.SocketTimeoutException: 48 millis timeout while waiting for
channel to be ready for write. ch :
java.nio.channels.SocketChannel[connected local=/172.16.28.192:50010
remote=/172.16.28.193:46147]
at
org.apache.hadoop.net.SocketIOWithTimeout.waitForIO(SocketIOWithTimeout.java:246)
at
org.apache.hadoop.net.SocketOutputStream.waitForWritable(SocketOutputStream.java:172)
at
org.apache.hadoop.net.SocketOutputStream.transferToFully(SocketOutputStream.java:220)
at
org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:559)
at
org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:728)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:496)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opReadBlock(Receiver.java:116)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:235)
at java.lang.Thread.run(Thread.java:745)


Exception 2:


2016-03-22 12:17:47,838 WARN org.apache.hadoop.hdfs.BlockReaderFactory: I/O
error constructing remote block reader.
java.nio.channels.ClosedByInterruptException
at
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:658)
at
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
at
org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3101)
at
org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:755)
at
org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:670)
at
org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
at
org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
at
org.apache.hadoop.hdfs.DFSInputStream.seekToBlockSource(DFSInputStream.java:1460)
at
org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:773)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:806)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:847)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:84)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:52)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:112)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:265)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-03-22 12:17:47,838 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
Container container_1458629096860_0001_01_01 transitioned from KILLING
to DONE
2016-03-22 12:17:47,841 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
Removing container_1458629096860_0001_01_01 from application
application_1458629096860_0001
2016-03-22 12:17:47,842 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got
event CONTAINER_STOP for appId application_1458629096860_0001
2016-03-22 12:17:47,842 WARN org.apache.hadoop.hdfs.DFSClient: Failed to
connect to /node1:50010 for block, add to deadNodes and continue.
java.nio.channels.ClosedByInterruptException
java.nio.channels.ClosedByInterruptException


Re: [Streaming] Difference between windowed stream and stream with large batch size?

2016-03-22 Thread Jatin Kumar
Hello all,

I am also looking for an answer to the same question. Can someone please
comment on the pros and cons of using a larger batch size versus putting a
window operation on a smaller batch size?

--
Thanks
Jatin

On Wed, Mar 16, 2016 at 2:30 PM, Hao Ren  wrote:

> Any ideas ?
>
> Feel free to ask me more details, if my questions are not clear.
>
> Thank you.
>
> On Mon, Mar 7, 2016 at 3:38 PM, Hao Ren  wrote:
>
>> I want to understand the advantage of using windowed stream.
>>
>> For example,
>>
>> Stream 1:
>> initial duration = 5 s,
>> and then transformed into a stream windowed by (windowLength = 30s,
>> slideInterval = 30s)
>>
>> Stream 2:
>> Duration = 30 s
>>
>> Questions:
>>
>> 1. Is Stream 1 equivalent to Stream 2 on behavior ? Do users observe the
>> same result ?
>> 2. If yes, what is the advantage of one vs. another ? Performance or
>> something else ?
>> 3. Is a stream with large batch reasonable , say 30 mins or even an hour ?
>>
>> Thank you.
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>


Re: Issues facing while Running Spark Streaming Job in YARN cluster mode

2016-03-22 Thread Saisai Shao
I guess in local mode you're using the local FS instead of HDFS, while here the
exception is mainly thrown from HDFS when running on YARN. I think it would be
better to check the status and configuration of HDFS to see whether it is
normal or not.

Thanks
Saisai
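
A quick way to confirm which filesystem the job actually sees, and that HDFS is
reachable, is a sketch like the following from the driver (assuming an active
SparkContext named sc; this is only a connectivity check, not a replacement for
looking at the NameNode/DataNode status):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
println(fs.getUri)                                             // should print the HDFS URI when running on YARN
fs.listStatus(new Path("/")).foreach(s => println(s.getPath))  // simple read check against the root directory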

On Tue, Mar 22, 2016 at 5:46 PM, Soni spark 
wrote:

> Hi ,
>
> I am able to run spark streaming job in local mode, when i try to run the
> same job in my YARN cluster, its throwing errors.
>
> Any help is appreciated in this regard
>
> Here are my Exception logs:
>
> Exception 1:
>
> java.net.SocketTimeoutException: 48 millis timeout while waiting for
> channel to be ready for write. ch :
> java.nio.channels.SocketChannel[connected local=/172.16.28.192:50010
> remote=/172.16.28.193:46147]
> at
> org.apache.hadoop.net.SocketIOWithTimeout.waitForIO(SocketIOWithTimeout.java:246)
> at
> org.apache.hadoop.net.SocketOutputStream.waitForWritable(SocketOutputStream.java:172)
> at
> org.apache.hadoop.net.SocketOutputStream.transferToFully(SocketOutputStream.java:220)
> at
> org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:559)
> at
> org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:728)
> at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:496)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opReadBlock(Receiver.java:116)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
> at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:235)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Exception 2:
>
>
> 2016-03-22 12:17:47,838 WARN org.apache.hadoop.hdfs.BlockReaderFactory:
> I/O error constructing remote block reader.
> java.nio.channels.ClosedByInterruptException
> at
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:658)
> at
> org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
> at
> org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3101)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:755)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:670)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
> at
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> at
> org.apache.hadoop.hdfs.DFSInputStream.seekToBlockSource(DFSInputStream.java:1460)
> at
> org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:773)
> at
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:806)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:847)
> at java.io.DataInputStream.read(DataInputStream.java:100)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:84)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:52)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:112)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
> at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:265)
> at
> org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2016-03-22 12:17:47,838 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
> Container container_1458629096860_0001_01_01 transitioned from KILLING
> to DONE
> 2016-03-22 12:17:47,841 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
> Removing container_1458629096860_0001_01_01 from application
> application_1458629096860_0001
> 2016-03-22 12:17:47,842 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got
> event CONTAINER_STOP for appId application_1458629096860_00

Add org.apache.spark.mllib model .predict() method to models in org.apache.spark.ml?

2016-03-22 Thread James Hammerton
Hi,

The machine learning models in org.apache.spark.mllib have a .predict()
method that can be applied to a Vector to return a prediction.

However, this method does not appear on the new models in org.apache.spark.ml,
where you have to wrap a Vector in a DataFrame to get a prediction. This ties
you into bringing in more of Spark's code as a dependency if you wish to embed
the models in production code outside of Spark.

Also, if you wish to feed inputs in one at a time in that context, it makes the
process a lot slower. Thus it seems to me that the old models are currently more
amenable to being used outside of Spark than the new ones.
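
For concreteness, a small sketch of the contrast being described (mllibModel,
mlModel and sqlContext are placeholder names; the DataFrame round-trip is the
part that drags in the extra dependencies):

import org.apache.spark.mllib.linalg.Vectors

// old mllib API: predict directly on a single Vector
val score = mllibModel.predict(Vectors.dense(0.1, 0.2))

// new ml API: wrap the vector in a one-row DataFrame and call transform
val single = sqlContext.createDataFrame(Seq(Tuple1(Vectors.dense(0.1, 0.2)))).toDF("features")
val prediction = mlModel.transform(single).select("prediction").first().getDouble(0)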

Are there any plans to add the .predict() method back to the models in the
new API?

Regards,

James


Re: Is there a way to submit a Spark job via the YARN REST API?

2016-03-22 Thread Saisai Shao
I'm afraid submitting applications through the YARN REST API is currently not
supported by Spark. However, the YARN AMRMClient is functionally equivalent to
the REST API; which specific features are you referring to?

Thanks
Saisai
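
If the underlying requirement is just to attach view/modify ACLs to each job,
Spark's own ACL settings can be supplied at submission time without going
through the REST API. A minimal sketch (these are the standard Spark security
properties, not something specific to this thread; the user lists are
illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("acl-example")                 // illustrative app name
  .set("spark.acls.enable", "true")
  .set("spark.ui.view.acls", "alice,bob")    // who may view the job
  .set("spark.modify.acls", "alice")         // who may modify/kill the job

Whether this covers the use case depends on whether YARN-level application ACLs
are required specifically.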

On Tue, Mar 22, 2016 at 5:27 PM, tony@tendcloud.com <
tony@tendcloud.com> wrote:

> Hi, All,
> We are trying to build a data processing workflow which will call
> different spark jobs and we are using YARN. Because we want to constraint
> ACL for those spark jobs, so we need to submit spark job to use Yarn REST
> API( which we can pass application acl as parameters. So is there any Spark
> API which can support that?   If no, is there any third party solutions for
> that?
>
>
> Thanks and Regards,
>
>
> --
> 阎志涛(Tony)
>
> 北京腾云天下科技有限公司
> -
> ---
> 邮箱:tony@tendcloud.com
> 电话:13911815695
> 微信: zhitao_yan
> QQ : 4707059
> 地址:北京市东城区东直门外大街39号院2号楼航空服务大厦602室
> 邮编:100027
>
> 
> TalkingData.com  - 让数据说话
>


trying to implement mini-batch GD in pyspark

2016-03-22 Thread sethirot
Hello,
I want to be able to update an existing model with new data without the need to
run batch GD again on all the data. I would rather use the native MLlib
functions, and without the streaming module.

The way I thought about doing this is to use the *initialWeights* input
argument to load my previously found weights and use them to train a new batch
with a new RDD.

1) What I struggle to understand is that if *includeIntercept = False* then
the initial weights length is exactly the length of my input vectors.
However, if *includeIntercept = True* then it seems to me that I would need
to *increase* the weights vector by *+1* for the algorithm to update the
intercept term.
There is, however, no such option. This seems strange, taking into account
that the intercept should be considered a regular weight (at least
mathematically).

2) The *StreamingLogisticRegressionWithSGD* function seems to do exactly
that. It uses an *update* function which itself uses
*LogisticRegressionWithSGD.train* with the already-found weights, but
without the intercept weight.
This, to me, seems to give erroneous results.

3) The way I want to implement it, then, is to use a pre-processing step to
extend the LabeledPoint feature vector by one more feature that gets a
value of 1 for all samples. This in effect forces the intercept to be
considered a regular weight, and then I could use
*LogisticRegressionWithSGD.train* with *includeIntercept = False*.
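
A minimal sketch of the preprocessing in point 3 (the thread is about PySpark,
but the same pattern applies; shown here against the Scala MLlib API, where
data, previousWeights, numIterations, stepSize and miniBatchFraction are
placeholders, and previousWeights must be one element longer than the original
feature vectors):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// append a constant 1.0 feature so the intercept is learned as an ordinary weight
val augmented = data.map { lp =>
  LabeledPoint(lp.label, Vectors.dense(lp.features.toArray :+ 1.0))
}

// train with intercept handling left off, seeding from the previous weights
val model = LogisticRegressionWithSGD.train(
  augmented, numIterations, stepSize, miniBatchFraction, previousWeights)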

Is there anything wrong in my logic regarding points 1 and 2?
Also, is point 3 a correct approach?

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/trying-to-implement-mini-batch-GD-in-pyspark-tp26559.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Find all invoices more than 6 months from csv file

2016-03-22 Thread James Hammerton
On 21 March 2016 at 17:57, Mich Talebzadeh 
wrote:

>
> Hi,
>
> For test purposes I am ready a simple csv file as follows:
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
> df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment
> date: string, Net: string, VAT: string, Total: string]
>
> For this work I am interested in column "Payment Date" > 6 months old from
> today
>
> Data is stored in the following format for that column
>
> scala> df.select("Payment date").take(2)
> res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])
>
> stored as 'dd/MM/'
>
> The current time I get as
>
> scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
> 'dd/MM/') ").collect.apply(0).getString(0)
> today: String = 21/03/2016
>
>
> So I want to filter the csv file
>
> scala>  df.filter(col("Payment date") < lit(today)).show(2)
> +--++-+-+-+
> |Invoice Number|Payment date|  Net|  VAT|Total|
> +--++-+-+-+
> |   360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
> |   361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
> +--++-+-+-+
>
>
> However, I want to use datediff() function here not just < today!
>
>
Could you not compute the date of the 6-month cut-off point and use
that in place of today?

Looking at the API I see add_months(), date_add() and date_sub() methods.
The first adds a number of months to a start date (would adding a negative
number of months to the current date work?), while the latter two add or
subtract a specified number of days to/from a date. These are available from
1.5.0 onwards.
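
A minimal sketch of that idea in the DataFrame API (Spark 1.5+), assuming
"Payment date" has already been converted to a proper DateType column:

import org.apache.spark.sql.functions._

val cutoff = add_months(current_date(), -6)            // six months before today
val oldInvoices = df.filter(col("Payment date") < cutoff)
oldInvoices.show(5)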

Alternatively outside of the SQL api (e.g. in a UDF) you could use
something like:

val c = Calendar.getInstance()
c.setTime(new Date(System.currentTimeMillis()))
c.add(Calendar.MONTH, -6)
val date: Date = c.getTime


Regards,

James




>
> Obviously one can store the file as a table and use SQL on it. However, I
> want to see if there are other ways using fp.
>
> Thanks
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: Work out date column in CSV more than 6 months old (datediff or something)

2016-03-22 Thread Mich Talebzadeh
Thanks Silvio.

The problem I have is that somehow string comparison does not work.

Case in point

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")
val current_date = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
'dd/MM/') ").collect.apply(0).getString(0)
df.filter(lit(current_date) < col("Payment date"))
  .select(lit(current_date).alias("current_date"), col("Payment date").alias("PaymentDate"))
  .show(5)

It selects all the rows that are less than today's date (they are old).

++---+
|current_date|PaymentDate|
++---+
|  22/03/2016| 24/02/2014|
|  22/03/2016| 24/03/2014|
|  22/03/2016| 31/03/2015|
|  22/03/2016| 28/04/2014|
|  22/03/2016| 26/05/2014|
++---+

I don't know why this comparison is failing. Maybe it is comparing only the
first two (leftmost) characters?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 March 2016 at 00:26, Silvio Fiorito 
wrote:

> There’s a months_between function you could use, as well:
>
> df.filter(months_between(current_date, $”Payment Date”) > 6).show
>
> From: Mich Talebzadeh 
> Date: Monday, March 21, 2016 at 5:53 PM
> To: "user @spark" 
> Subject: Work out date column in CSV more than 6 months old (datediff or
> something)
>
> Hi,
>
> For test purposes I am reading in a simple csv file as follows:
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
> df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment
> date: string, Net: string, VAT: string, Total: string]
>
> For this work I am interested in column "Payment Date" > 6 months old from
> today
>
> Data is stored in the following format for that column
>
> scala> df.select("Payment date").take(2)
> res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])
>
> stored as 'dd/MM/'
>
> The current time I get as
>
> scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
> 'dd/MM/') ").collect.apply(0).getString(0)
> today: String = 21/03/2016
>
>
> So I want to filter the csv file
>
> scala>  df.filter(col("Payment date") < lit(today)).show(2)
> +--++-+-+-+
> |Invoice Number|Payment date|  Net|  VAT|Total|
> +--++-+-+-+
> |   360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
> |   361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
> +--++-+-+-+
>
>
> However, I want to use datediff() function here not just < today!
>
>
> Obviously one can store the file as a table and use SQL on it. However, I
> want to see if there are other ways using fp.
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: ALS setIntermediateRDDStorageLevel

2016-03-22 Thread Sean Owen
No, it's been there since 1.1 and still is there:
setIntermediateRDDStorageLevel. Double-check your code.
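
A minimal example, with `ratings` assumed to be an RDD[Rating]; note that the
setter lives on the ALS instance itself, not on the ALS.train helper methods:

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.storage.StorageLevel

val als = new ALS()
  .setRank(10)
  .setIterations(10)
  .setIntermediateRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)

val model = als.run(ratings)   // ratings: RDD[Rating]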

On Mon, Mar 21, 2016 at 10:09 PM, Roberto Pagliari
 wrote:
> According to this thread
>
> http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-question-td15420.html
>
> There should be a function to set intermediate storage level in ALS.
> However, I’m getting method not found with Spark 1.6. Is it still available?
> If so, can I get to see a minimal example?
>
> Thank you,
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Best way to store Avro Objects as Parquet using SPARK

2016-03-22 Thread Manivannan Selvadurai
I should have phrased it differently: an Avro schema has additional
properties like required, etc. Right now the JSON data that I have gets
stored as optional fields in the Parquet file. Is there a way to model the
Parquet file schema so that it stays close to the Avro schema? I tried
sqc.read.schema(avroSchema).jsonRDD(jsonRDD).toDF(), but it has some issues
with longType data. I use the code below to convert the Avro schema to a
Spark-specific schema.

  def getSparkSchemaForAvro(sqc: SQLContext, avroSchema: Schema): StructType = {
    // write an empty Avro file that carries only the schema...
    val dummyFile = File.createTempFile("avroSchema_dummy", "avro")
    val datumWriter = new GenericDatumWriter[wuser]()
    datumWriter.setSchema(avroSchema)
    val writer = new DataFileWriter(datumWriter).create(wuser.getClassSchema, dummyFile)
    writer.flush()
    writer.close()
    // ...then let spark-avro read it back and expose the equivalent Spark schema
    val df = sqc.read.format("com.databricks.spark.avro").load(dummyFile.getAbsolutePath)
    df.schema
  }


   So the requirement is: how do I validate the incoming data against the
Avro schema and handle the bad records, in addition to storing the data in
Parquet format with a schema matching the Avro schema that I have?


The approach I have taken is:

Try converting the JSON to the Avro object and return a (json, valid) tuple
of String and Boolean, then filter out the valid records and write the JSON
data directly as Parquet files. These Parquet files have fields with type

message root {
  optional group FIELD_FOO {
optional binary string (UTF8);
  }
   .
   .
   .
}

Similarly, filter out the invalid records as corrupt data.

This causes two scans over the RDDs:
1) filtering valid data
2) filtering invalid data
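
A sketch of that two-pass split (jsonRdd and isValidAgainstAvro are
placeholders for the input RDD and the Avro-conversion check); caching the
tagged RDD at least keeps the two filters from recomputing the validation:

// tag each record once, cache, then split into valid and invalid subsets
val tagged  = jsonRdd.map(json => (json, isValidAgainstAvro(json))).cache()
val valid   = tagged.filter { case (_, ok) => ok }.map { case (json, _) => json }
val invalid = tagged.filter { case (_, ok) => !ok }.map { case (json, _) => json }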


 If there is a better approach, please guide me.



On Mon, Mar 21, 2016 at 11:07 PM, Michael Armbrust 
wrote:

> But when tired using Spark streamng I could not find a way to store the
>> data with the avro schema information. The closest that I got was to create
>> a Dataframe using the json RDDs and store them as parquet. Here the parquet
>> files had a spark specific schema in their footer.
>>
>
> Does this cause a problem?  This is just extra information that we use to
> store metadata that parquet doesn't directly support, but I would still
> expect other systems to be able to read it.
>


Re: Work out date column in CSV more than 6 months old (datediff or something)

2016-03-22 Thread James Hammerton
On 22 March 2016 at 10:57, Mich Talebzadeh 
wrote:

> Thanks Silvio.
>
> The problem I have is that somehow string comparison does not work.
>
> Case in point
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
> val current_date = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
> 'dd/MM/') ").collect.apply(0).getString(0)
> df.filter(*lit(current_date) < col("Payment 
> date"*)).select(lit(current_date).alias("current_date"),
> col("Payment date").alias("PaymentDate")).show(5)
>
>
This is doing a string comparison not a date comparison (assuming "Payment
date" is of type String).

E.g.

scala> "22/03/2016" < "24/02/2015"
>
> res4: Boolean = true
>
>
>> scala> "22/03/2016" < "04/02/2015"
>
> res5: Boolean = false
>
>
This is the correct result for a string comparison but it's not the
comparison you want.

I think you need to convert the "Payment date" with "to_date" and compare
against that.

E.g. something like: df.filter(current_date() < to_date(col("Payment date")))

Regards,

James
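
Putting the pieces together, a minimal sketch (Spark 1.5+), assuming "Payment
date" holds 'dd/MM/yyyy' strings; to_date applied directly to such a string
will not parse it, so the string is converted via unix_timestamp first:

import org.apache.spark.sql.functions._

// turn the 'dd/MM/yyyy' strings into a proper date column
val paymentDate = unix_timestamp(col("Payment date"), "dd/MM/yyyy").cast("timestamp").cast("date")

// then the 6-month test can use months_between, as Silvio suggested
df.filter(months_between(current_date(), paymentDate) > 6).show(5)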



> It selects all the rows that are less than today's date (they are old).
>
> ++---+
> |current_date|PaymentDate|
> ++---+
> |  22/03/2016| 24/02/2014|
> |  22/03/2016| 24/03/2014|
> |  22/03/2016| 31/03/2015|
> |  22/03/2016| 28/04/2014|
> |  22/03/2016| 26/05/2014|
> ++---+
>
> I don't know why this comparison is failing. May be it is comparing the
> first two leftmost characters?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 March 2016 at 00:26, Silvio Fiorito 
> wrote:
>
>> There’s a months_between function you could use, as well:
>>
>> df.filter(months_between(current_date, $”Payment Date”) > 6).show
>>
>> From: Mich Talebzadeh 
>> Date: Monday, March 21, 2016 at 5:53 PM
>> To: "user @spark" 
>> Subject: Work out date column in CSV more than 6 months old (datediff or
>> something)
>>
>> Hi,
>>
>> For test purposes I am reading in a simple csv file as follows:
>>
>> val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header", "true").load("/data/stg/table2")
>> df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment
>> date: string, Net: string, VAT: string, Total: string]
>>
>> For this work I am interested in column "Payment Date" > 6 months old
>> from today
>>
>> Data is stored in the following format for that column
>>
>> scala> df.select("Payment date").take(2)
>> res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])
>>
>> stored as 'dd/MM/'
>>
>> The current time I get as
>>
>> scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
>> 'dd/MM/') ").collect.apply(0).getString(0)
>> today: String = 21/03/2016
>>
>>
>> So I want to filter the csv file
>>
>> scala>  df.filter(col("Payment date") < lit(today)).show(2)
>> +--++-+-+-+
>> |Invoice Number|Payment date|  Net|  VAT|Total|
>> +--++-+-+-+
>> |   360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
>> |   361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
>> +--++-+-+-+
>>
>>
>> However, I want to use datediff() function here not just < today!
>>
>>
>> Obviously one can store the file as a table and use SQL on it. However, I
>> want to see if there are other ways using fp.
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>


Re: sliding Top N window

2016-03-22 Thread Jatin Kumar
Hello Yakubovich,

I have been looking into a similar problem. @Lars please note that he wants
to maintain the top N products over a sliding window, whereas the
Count-Min Sketch algorithm is useful if we want to maintain a global top N
products list. Please correct me if I am wrong here.

I tried using Count-Min Sketch and realized that it doesn't suit my use case,
as I also wanted to maintain the top N over the last H hours. Count-Min Sketch
has no notion of time, so in my understanding you cannot use it for that.

Yakubovich, you can try doing something like this:

val stream = 
// I am assuming that each entry is a comma separated list of product ids
// and product ID is a string (doesn't really matter though)
stream
  .flatMap(record => record.split(","))
  .map(pid => (pid, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(S1), Seconds(S2))
  .foreachRDD(rdd => {
    // `rdd` here is of type (pid, count) and has frequency of each PID over
    // a sliding window of S1 seconds which moves by S2 seconds every time.

    implicit val order = new scala.Ordering[(String, Long)] {
      override def compare(a1: (String, Long), a2: (String, Long)): Boolean =
        a1._2 > a2._2
    }

    val topNPidTuples = rdd.top(N)
    // do whatever you want here.
  })
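
One practical note on the sketch above: the invertible form of
reduceByKeyAndWindow used here (the one taking an inverse function) requires
checkpointing to be enabled on the StreamingContext, roughly:

// assuming `ssc` is the StreamingContext behind `stream`; the directory is illustrative
ssc.checkpoint("hdfs:///tmp/topn-checkpoint")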



--
Thanks
Jatin

On Tue, Mar 22, 2016 at 12:11 PM, Rishi Mishra 
wrote:

> Hi Alexy,
> We are also trying to solve similar problems using approximation. Would
> like to hear more about your usage.  We can discuss this offline without
> boring others.  :)
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson 
> wrote:
>
>> Hi,
>>
>> If you can accept approximate top N results, there is a neat solution
>> for this problem: Use an approximate Map structure called
>> Count-Min Sketch, in combination with a list of the M top items, where
>> M > N. When you encounter an item not in the top M, you look up its
>> count in the Count-Min Sketch do determine whether it qualifies.
>>
>> You will need to break down your event stream into time windows with a
>> certain time unit, e.g. minutes or hours, and keep one Count-Min
>> Sketch for each unit. The CMSs can be added, so you aggregate them to
>> form your sliding windows. You also keep a top M (aka "heavy hitters")
>> list for each window.
>>
>> The data structures required are surprisingly small, and will likely
>> fit in memory on a single machine, if it can handle the traffic
>> volume, so you might not need Spark at all. If you choose to use Spark
>> in order to benefit from windowing, be aware that Spark lumps events
>> in micro batches based on processing time, not event time.
>>
>> I made a presentation on approximate counting a couple of years ago.
>> Slides and video here:
>>
>> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
>> .
>> You can also search for presentation by Ted Dunning and Mikio Braun,
>> who have held good presentations on the subject.
>>
>> There are AFAIK two open source implementations of Count-Min Sketch,
>> one of them in Algebird.
>>
>> Let me know if anything is unclear.
>>
>> Good luck, and let us know how it goes.
>>
>> Regards,
>>
>>
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> +46 70 7687109
>>
>>
>> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
>>  wrote:
>> > Good day,
>> >
>> > I have a following task: a stream of “page vies” coming to kafka topic.
>> Each
>> > view contains list of product Ids from a visited page. The task: to
>> have in
>> > “real time” Top N product.
>> >
>> > I am interested in some solution that would require minimum intermediate
>> > writes … So  need to build a sliding window for top N product, where the
>> > product counters dynamically changes and window should present the TOP
>> > product for the specified period of time.
>> >
>> > I believe there is no way to avoid maintaining all product counters
>> counters
>> > in memory/storage.  But at least I would like to do all logic, all
>> > calculation on a fly, in memory, not spilling multiple RDD from memory
>> to
>> > disk.
>> >
>> > So I believe I see one way of doing it:
>> >Take, msg from kafka take and line up, all elementary action
>> (increase by
>> > 1 the counter for the product PID )
>> >   Each action will be implemented as a call to HTable.increment()  // or
>> > easier, with incrementColumnValue()…
>> >   After each increment I can apply my own operation “offer” would
>> provide
>> > that only top N products with counters are kept in another Hbase table
>> (also
>> > with atomic operations).
>> >  But there is another stream of events: decreasing product counters when
>> > view expires the legth of sliding window….
>> >
>> > So my question: does anybody know/have and can share the piece code/
>> know
>> > how: how to implement “sliding Top N window” better.
>> > If nothing will be offered, I will share what I will do myself.
>> >
>> >

Issue while applying filters/conditions in DataFrame in Spark

2016-03-22 Thread Hafsa Asif
Hello everyone,
I am trying to get the benefits of DataFrames (to perform all SQL-based
operations like WHERE clauses, joins, etc.) as mentioned in
https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html.

I am using Aerospike and the Spark (1.4.1) Java client in the Spring Framework.
My scenario is as below:
(My apparent guide is:
http://www.programcreek.com/java-api-examples/index.php?source_dir=deep-examples-master/deep-aerospike/)

1. I have a DeepSparkContext.
2. Preparing DataFrame from Aerospike in this way:

public DataFrame createDataFrame(String setName, String[] outputColumns, String userId) {

    DataFrame frame = null;

    ExtractorConfig<Cells> setCells = new ExtractorConfig<>(Cells.class);
    setCells.putValue(ExtractorConstants.HOST, configurationHandler.aerospikeHost())
            .putValue(ExtractorConstants.PORT, configurationHandler.aerospikePort())
            .putValue(ExtractorConstants.NAMESPACE, configurationHandler.getAerospikeNamespace())
            .putValue(ExtractorConstants.SET, setName)
            .putValue(ExtractorConstants.INPUT_COLUMNS, outputColumns);

    setCells.setExtractorImplClass(AerospikeCellExtractor.class);

    try {
        frame = deepSparkContext.createJavaSchemaRDD(setCells);
    } catch (UnsupportedDataTypeException undt) {
        log.error(undt.getMessage());
    }

    return frame;
}
3. In another method, I am processing the DataFrame in this way:

   public void getActivePush(String userId, Integer status) {

       String[] activePushColumns = new String[] {"clientId", "userId", "status"};
       DataFrame activePushFrame =
           createDataFrame(configurationHandler.getAerospikePushActivationSet(),
                           activePushColumns, userId);
       activePushFrame.registerTempTable("activePush");
       DataFrame messagesFiltered = deepSparkContext.sql("SELECT * FROM activePush");
       log.debug("first : " + messagesFiltered.first());
   }

The above method successfully gives the first element.
4. But if I use:
DataFrame messagesFiltered = deepSparkContext.sql("SELECT * FROM activePush
WHERE userId = 'user1'");
it gives me the following exception:
[2016-03-22 13:40:15.413] boot - 10493 ERROR [main] --- SpringApplication:
Application startup failed
org.springframework.beans.factory.BeanCreationException: Error creating bean
with name 'MyAnalyzer': Invocation of init method failed; nested exception
is java.lang.NullPointerException
at
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:408)
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1564)
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:539)
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476)
at
org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:303)
at
org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at
org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:299)
at
org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:194)
at
org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:755)
at
org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:762)
at
org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:480)
at
org.springframework.boot.SpringApplication.refresh(SpringApplication.java:690)
at
org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
springframework.boot.SpringApplication.run(SpringApplication.java:970)
at
org.springframework.boot.SpringApplication.run(SpringApplication.java:959)
at
com.matchinguu.analytics.AnalyticalEngineApplication.main(AnalyticalEngineApplication.java:14)
Caused by: java.lang.NullPointerException

Please guide me on how to apply WHERE clause conditions/filters on a
DataFrame. I also plan to create multiple DataFrames and apply JOIN
operations between them, so it would be good if you could give me some good
examples of that as well.
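
For reference, the same operations expressed directly through the DataFrame API
rather than embedded SQL (shown in Scala for brevity; otherFrame is a
placeholder for a second frame sharing a userId column, and the Java API has
equivalent filter/join methods):

import org.apache.spark.sql.functions.col

// WHERE clause as a DataFrame filter
val filtered = activePushFrame.filter(col("userId") === "user1")

// join two frames on a shared column
val joined = activePushFrame.join(otherFrame, "userId")
joined.show(5)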




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-wi

Re: sliding Top N window

2016-03-22 Thread Jatin Kumar
I am sorry, the signature of compare() is different. It should be:

    implicit val order = new scala.Ordering[(String, Long)] {
      override def compare(a1: (String, Long), a2: (String, Long)): Int = {
        a1._2.compareTo(a2._2)
      }
    }


--
Thanks
Jatin

On Tue, Mar 22, 2016 at 5:53 PM, Jatin Kumar 
wrote:

> Hello Yakubovich,
>
> I have been looking into a similar problem. @Lars please note that he
> wants to maintain the top N products over a sliding window, whereas the
> CountMinSketh algorithm is useful if we want to maintain global top N
> products list. Please correct me if I am wrong here.
>
> I tried using CountMinSketch and realized that it doesn't suit my use case
> as I also wanted to maintain top N over last H hours. CountMinSketch has no
> notion of time, so in my understanding you cannot use that.
>
> Yakubovich, you can try doing something like this:
>
> val stream = 
> // I am assuming that each entry is a comma separated list of product ids
> // and product ID is a string (doesn't really matter though)
> stream
>  .flatMap(record => record.split(","))
>  .map(pid => (pid, 1L))
>  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(S1), Seconds(S2))
>  .foreachRDD(rdd => {
>// `rdd` here is of type (pid, count) and has frequency of each PID over
>// a sliding window of S1 seconds which moves by S2 seconds every time.
>
>implicit val order = new scala.Ordering[(String, Long)] {
>  override def compare(a1: (String, Long), a2: (String, Long)): Boolean = 
> a1._2 > a2._2
>}
>
>val topNPidTuples = rdd.top(N)
>// do whatever you want here.
>  })
>
>
>
> --
> Thanks
> Jatin
>
> On Tue, Mar 22, 2016 at 12:11 PM, Rishi Mishra 
> wrote:
>
>> Hi Alexy,
>> We are also trying to solve similar problems using approximation. Would
>> like to hear more about your usage.  We can discuss this offline without
>> boring others.  :)
>>
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>> On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson 
>> wrote:
>>
>>> Hi,
>>>
>>> If you can accept approximate top N results, there is a neat solution
>>> for this problem: Use an approximate Map structure called
>>> Count-Min Sketch, in combination with a list of the M top items, where
>>> M > N. When you encounter an item not in the top M, you look up its
>>> count in the Count-Min Sketch do determine whether it qualifies.
>>>
>>> You will need to break down your event stream into time windows with a
>>> certain time unit, e.g. minutes or hours, and keep one Count-Min
>>> Sketch for each unit. The CMSs can be added, so you aggregate them to
>>> form your sliding windows. You also keep a top M (aka "heavy hitters")
>>> list for each window.
>>>
>>> The data structures required are surprisingly small, and will likely
>>> fit in memory on a single machine, if it can handle the traffic
>>> volume, so you might not need Spark at all. If you choose to use Spark
>>> in order to benefit from windowing, be aware that Spark lumps events
>>> in micro batches based on processing time, not event time.
>>>
>>> I made a presentation on approximate counting a couple of years ago.
>>> Slides and video here:
>>>
>>> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
>>> .
>>> You can also search for presentation by Ted Dunning and Mikio Braun,
>>> who have held good presentations on the subject.
>>>
>>> There are AFAIK two open source implementations of Count-Min Sketch,
>>> one of them in Algebird.
>>>
>>> Let me know if anything is unclear.
>>>
>>> Good luck, and let us know how it goes.
>>>
>>> Regards,
>>>
>>>
>>>
>>> Lars Albertsson
>>> Data engineering consultant
>>> www.mapflat.com
>>> +46 70 7687109
>>>
>>>
>>> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
>>>  wrote:
>>> > Good day,
>>> >
>>> > I have a following task: a stream of “page vies” coming to kafka
>>> topic. Each
>>> > view contains list of product Ids from a visited page. The task: to
>>> have in
>>> > “real time” Top N product.
>>> >
>>> > I am interested in some solution that would require minimum
>>> intermediate
>>> > writes … So  need to build a sliding window for top N product, where
>>> the
>>> > product counters dynamically changes and window should present the TOP
>>> > product for the specified period of time.
>>> >
>>> > I believe there is no way to avoid maintaining all product counters
>>> counters
>>> > in memory/storage.  But at least I would like to do all logic, all
>>> > calculation on a fly, in memory, not spilling multiple RDD from memory
>>> to
>>> > disk.
>>> >
>>> > So I believe I see one way of doing it:
>>> >Take, msg from kafka take and line up, all elementary action
>>> (increase by
>>> > 1 the counter for the product PID )
>>> >   Each action will be implemented as a call to HTable.increment()  //
>>> or
>>> > easier, with incrementColumnValue()…
>>> >   After each increment I can apply my own operat

Re: Issue while applying filters/conditions in DataFrame in Spark

2016-03-22 Thread Ted Yu
bq. Caused by: java.lang.NullPointerException

Can you show the remaining stack trace ?

Thanks

On Tue, Mar 22, 2016 at 5:43 AM, Hafsa Asif 
wrote:

> Hello everyone,
> I am trying to get benefits of DataFrames (to perform all SQL BASED
> operations like 'Where Clause', Joining etc.) as mentioned in
>
> https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html
> .
>
> I am using, Aerospike and Spark (1.4.1) Java Client in Spring Framework .
>  My scenario as below:
> (My apparent guide is :
>
> http://www.programcreek.com/java-api-examples/index.php?source_dir=deep-examples-master/deep-aerospike/
> )
>
> 1. I have a DeepSparkContext.
> 2. Preparing DataFrame from Aerospike in this way:
>
> public DataFrame createDataFrame (String setName, String[] outputColumns,
> String userId ){
>
> DataFrame frame = null;
>
> ExtractorConfig setCells = new ExtractorConfig<>(Cells.class);
> setCells.putValue(ExtractorConstants.HOST,
> configurationHandler.aerospikeHost())
> .putValue(ExtractorConstants.PORT,
> configurationHandler.aerospikePort())
> .putValue(ExtractorConstants.NAMESPACE,
> configurationHandler.getAerospikeNamespace())
> .putValue(ExtractorConstants.SET, setName)
> .putValue(ExtractorConstants.INPUT_COLUMNS, outputColumns)
>
>
> setCells.setExtractorImplClass(AerospikeCellExtractor.class);
>
> try {
>
> frame= deepSparkContext.createJavaSchemaRDD(setCells);
> } catch (UnsupportedDataTypeException undt){
> log.error(undt.getMessage());
> }
>
> return frame;
> }
> 3. In another method, I am processing DataFrame in this way:
>public void getActivePush (String userId, Integer status){
>
> String[] activePushColumns = new String[] {"clientId", "userId",
> "status"};
> DataFrame activePushFrame =
> createDataFrame(configurationHandler.getAerospikePushActivationSet(),
> activePushColumns, userId);
> activePushFrame.registerTempTable("activePush");
> DataFrame messagesFiltered = deepSparkContext.sql("SELECT * FROM
> activePush");
> log.debug("first : " + messagesFiltered.first());
> }
>
> The above method gives successfully the first element.
> 4. But, If I use :
> DataFrame messagesFiltered = deepSparkContext.sql("SELECT * FROM activePush
> WHERE userId = 'user1'");
> It gives me the following exception:
> [2016-03-22 13:40:15.413] boot - 10493 ERROR [main] --- SpringApplication:
> Application startup failed
> org.springframework.beans.factory.BeanCreationException: Error creating
> bean
> with name 'MyAnalyzer': Invocation of init method failed; nested exception
> is java.lang.NullPointerException
> at
>
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
> at
>
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:408)
> at
>
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1564)
> at
>
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:539)
> at
>
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476)
> at
>
> org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:303)
> at
>
> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
> at
>
> org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:299)
> at
>
> org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:194)
> at
>
> org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:755)
> at
>
> org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:762)
> at
>
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:480)
> at
>
> org.springframework.boot.SpringApplication.refresh(SpringApplication.java:690)
> at
> org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
>
> springframework.boot.SpringApplication.run(SpringApplication.java:970)
> at
> org.springframework.boot.SpringApplication.run(SpringApplication.java:959)
> at
>
> com.matchinguu.analytics.AnalyticalEngineApplication.main(AnalyticalEngineApplication.java:14)
> Caused by: java.lang.NullP

Re: Issue while applying filters/conditions in DataFrame in Spark

2016-03-22 Thread Hafsa Asif
Yes, I know it is because of a NullPointerException, but I could not understand
why.
The complete stack trace is:
[2016-03-22 13:40:14.894] boot - 10493  WARN [main] ---
AnnotationConfigApplicationContext: Exception encountered during context
initialization - cancelling refresh attempt:
org.springframework.beans.factory.BeanCreationException: Error creating bean
with name 'MyAnalyzer': Invocation of init method failed; nested exception
is java.lang.NullPointerException
[2016-03-22 13:40:14.983] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
[2016-03-22 13:40:14.986] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
[2016-03-22 13:40:14.989] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/api,null}
[2016-03-22 13:40:14.994] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/,null}
[2016-03-22 13:40:15.001] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/static,null}
[2016-03-22 13:40:15.002] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
[2016-03-22 13:40:15.002] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
[2016-03-22 13:40:15.003] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/executors/json,null}
[2016-03-22 13:40:15.015] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/executors,null}
[2016-03-22 13:40:15.018] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/environment/json,null}
[2016-03-22 13:40:15.019] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/environment,null}
[2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
[2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
[2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/storage/json,null}
[2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/storage,null}
[2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
[2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
[2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
[2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
[2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/stages/json,null}
[2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/stages,null}
[2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
[2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
[2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
[2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
stopped o.s.j.s.ServletContextHandler{/jobs,null}
[2016-03-22 13:40:15.077] boot - 10493  INFO [main] --- SparkUI: Stopped
Spark web UI at http://192.168.116.155:4040
[2016-03-22 13:40:15.081] boot - 10493  INFO [main] --- DAGScheduler:
Stopping DAGScheduler

[2016-03-22 13:40:15.413] boot - 10493 ERROR [main] --- SpringApplication:
Application startup failed
org.springframework.beans.factory.BeanCreationException: Error creating bean
with name 'MyAnalyzer': Invocation of init method failed; nested exception
is java.lang.NullPointerException
at
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:408)
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1564)
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:539)
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476)
   

Re: Issue while applying filters/conditions in DataFrame in Spark

2016-03-22 Thread Hafsa Asif
Even if I use this query, it still gives a NullPointerException:
"SELECT clientId FROM activePush"



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-wihle-applying-filters-conditions-in-DataFrame-in-Spark-tp26560p26562.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issue while applying filters/conditions in DataFrame in Spark

2016-03-22 Thread Ted Yu
The NullPointerException came from Spring.

Which version of Spring do you use?

Thanks 

> On Mar 22, 2016, at 6:08 AM, Hafsa Asif  wrote:
> 
> yes I know it is because of NullPointerEception, but could not understand
> why? 
> The complete stack trace is :
> [2016-03-22 13:40:14.894] boot - 10493  WARN [main] ---
> AnnotationConfigApplicationContext: Exception encountered during context
> initialization - cancelling refresh attempt:
> org.springframework.beans.factory.BeanCreationException: Error creating bean
> with name 'MyAnalyzer': Invocation of init method failed; nested exception
> is java.lang.NullPointerException
> [2016-03-22 13:40:14.983] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
> [2016-03-22 13:40:14.986] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
> [2016-03-22 13:40:14.989] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/api,null}
> [2016-03-22 13:40:14.994] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/,null}
> [2016-03-22 13:40:15.001] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/static,null}
> [2016-03-22 13:40:15.002] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
> [2016-03-22 13:40:15.002] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
> [2016-03-22 13:40:15.003] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/executors/json,null}
> [2016-03-22 13:40:15.015] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/executors,null}
> [2016-03-22 13:40:15.018] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/environment/json,null}
> [2016-03-22 13:40:15.019] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/environment,null}
> [2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
> [2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
> [2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/storage/json,null}
> [2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/storage,null}
> [2016-03-22 13:40:15.024] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
> [2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
> [2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
> [2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
> [2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/stages/json,null}
> [2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/stages,null}
> [2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
> [2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
> [2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
> [2016-03-22 13:40:15.025] boot - 10493  INFO [main] --- ContextHandler:
> stopped o.s.j.s.ServletContextHandler{/jobs,null}
> [2016-03-22 13:40:15.077] boot - 10493  INFO [main] --- SparkUI: Stopped
> Spark web UI at http://192.168.116.155:4040
> [2016-03-22 13:40:15.081] boot - 10493  INFO [main] --- DAGScheduler:
> Stopping DAGScheduler
> 
> [2016-03-22 13:40:15.413] boot - 10493 ERROR [main] --- SpringApplication:
> Application startup failed
> org.springframework.beans.factory.BeanCreationException: Error creating bean
> with name 'MyAnalyzer': Invocation of init method failed; nested exception
> is java.lang.NullPointerException
>at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
>at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:408)
>at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1564)
>at
> org.spring

Spark schema evolution

2016-03-22 Thread gtinside
Hi ,

I have a table sourced from 2 parquet files, with a few extra columns present in
only one of the parquet files. Simple queries work fine, but queries with a
predicate on an extra column don't work and I get "column not found".

Column resp_party_type exists in just one parquet file.

a) Query that works:
select resp_party_type  from operational_analytics 

b) Query that doesn't work (complains about the missing column
resp_party_type):
select category as Events, resp_party as Team, count(*) as Total from
operational_analytics where application = 'PeopleMover' and resp_party_type
= 'Team' group by category, resp_party

Query Plan for (b)
== Physical Plan ==
TungstenAggregate(key=[category#30986,resp_party#31006],
functions=[(count(1),mode=Final,isDistinct=false)],
output=[Events#36266,Team#36267,Total#36268L])
 TungstenExchange hashpartitioning(category#30986,resp_party#31006)
  TungstenAggregate(key=[category#30986,resp_party#31006],
functions=[(count(1),mode=Partial,isDistinct=false)],
output=[category#30986,resp_party#31006,currentCount#36272L])
   Project [category#30986,resp_party#31006]
Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 =
Team))
 Scan
ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]


I have set spark.sql.parquet.mergeSchema = true and
spark.sql.parquet.filterPushdown = true. When I set
spark.sql.parquet.filterPushdown = false, Query (b) starts working. Execution
plan after setting filterPushdown = false for Query (b):

== Physical Plan ==
TungstenAggregate(key=[category#30986,resp_party#31006],
functions=[(count(1),mode=Final,isDistinct=false)],
output=[Events#36313,Team#36314,Total#36315L])
 TungstenExchange hashpartitioning(category#30986,resp_party#31006)
  TungstenAggregate(key=[category#30986,resp_party#31006],
functions=[(count(1),mode=Partial,isDistinct=false)],
output=[category#30986,resp_party#31006,currentCount#36319L])
   Project [category#30986,resp_party#31006]
Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 =
Team))
 Scan
ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-schema-evolution-tp26563.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issue while applying filters/conditions in DataFrame in Spark

2016-03-22 Thread Hafsa Asif
springBootVersion = '1.2.8.RELEASE'
springDIVersion = '0.5.4.RELEASE'
thriftGradleVersion = '0.3.1'


Other Gradle configs:
 compile "org.apache.thrift:libthrift:0.9.3"
compile 'org.slf4j:slf4j-api:1.7.14'

compile 'org.apache.kafka:kafka_2.11:0.9.0.0'
compile 'org.apache.kafka:kafka-clients:0.9.0.0'

compile 'javax.el:javax.el-api:3.0.1-b04'

compile 'org.apache.spark:spark-core_2.11:1.4.1'

compile 'org.springframework.data:spring-data-mongodb:1.8.4.RELEASE'

compile 'com.stratio.deep:deep-core:0.8.0'
compile 'com.stratio.deep:deep-commons:0.8.0'

compile 'org.json:json:20140107'
compile 'com.aerospike:aerospike-client:3.1.7'
compile 'org.springframework.data:spring-data-keyvalue:1.0.1.RELEASE'
compile 'com.stratio.deep:deep-aerospike:0.8.0'

compile 'org.apache.spark:spark-sql_2.11:1.4.1'
compile 'org.apache.spark:spark-mllib_2.11:1.4.1'

testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('com.sun.jdmk:jmxtools:1.2.1')



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-wihle-applying-filters-conditions-in-DataFrame-in-Spark-tp26560p26564.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



new object store driver for Spark

2016-03-22 Thread Gil Vernik
We recently released an object store connector for Spark. 
https://github.com/SparkTC/stocator
Currently this connector contains a driver for Swift-based object stores 
(like SoftLayer or any other Swift cluster), but it can easily support 
additional object stores.
There is a pending patch to support the Amazon S3 object store. 

The major highlight is that this connector doesn't create any temporary 
files, and so it achieves very fast response times when Spark persists data 
in the object store.
The new connector supports speculative mode and covers various failure 
scenarios (like two Spark tasks writing into the same object, partially 
corrupted data due to runtime exceptions in the Spark master, etc.). It also 
covers https://issues.apache.org/jira/browse/SPARK-10063 and other known 
issues.

The detailed algorithm for fault tolerance will be released very soon. For 
now, those who are interested can view the implementation in the code itself.

https://github.com/SparkTC/stocator contains all the details on how to set it 
up and use it with Spark.

A series of tests showed that the new connector obtains 70% improvements 
for write operations from Spark to Swift and about 30% improvements for 
read operations from Swift into Spark (compared to the existing driver 
that Spark uses to integrate with objects stored in Swift). 

There is an ongoing work to add more coverage and fix some known bugs / 
limitations.

All the best
Gil




Re: ALS setIntermediateRDDStorageLevel

2016-03-22 Thread Roberto Pagliari
I have and it's under class ALS private

On 22/03/2016 10:58, "Sean Owen"  wrote:

>No, it's been there since 1.1 and still is there:
>setIntermediateRDDStorageLevel. Double-check your code.
>
>On Mon, Mar 21, 2016 at 10:09 PM, Roberto Pagliari
> wrote:
>> According to this thread
>>
>> 
>>http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-question-td
>>15420.html
>>
>> There should be a function to set intermediate storage level in ALS.
>> However, I'm getting method not found with Spark 1.6. Is it still
>>available?
>> If so, can I get to see a minimal example?
>>
>> Thank you,
>>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Serialization issue with Spark

2016-03-22 Thread Hafsa Asif
Hello,
I am facing a Spark serialization issue in Spark (1.4.1 - Java client) with the
Spring Framework. It is known that Spark needs serialization and requires
every class involved to implement java.io.Serializable. But in the
documentation link: http://spark.apache.org/docs/latest/tuning.html, it is
mentioned that this is not a good approach and that it is better to use Kryo.
I am using Kryo in Spark configuration like this:
  public @Bean DeepSparkContext sparkContext(){
DeepSparkConfig conf = new DeepSparkConfig();
conf.setAppName(this.environment.getProperty("APP_NAME"))
.setMaster(master)
.set("spark.executor.memory",
this.environment.getProperty("SPARK_EXECUTOR_MEMORY"))
.set("spark.cores.max",
this.environment.getProperty("SPARK_CORES_MAX"))
.set("spark.default.parallelism",
this.environment.getProperty("SPARK_DEFAULT_PARALLELISM"));
conf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
return new DeepSparkContext(conf);
}

but I am still getting the exception in Spark that 'Task is not serializable'. I also
do not want to make the Spark context 'static'.
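
For what it's worth, a frequent cause of 'Task is not serializable' - independent of the
serializer - is a closure that captures the enclosing non-serializable bean. A minimal Scala
sketch of the pattern and a typical workaround; MyAnalyzer and threshold are made-up names:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class MyAnalyzer(sc: SparkContext) {     // not Serializable; holds the SparkContext
  val threshold = 10

  def bad(rdd: RDD[Int]) =
    rdd.filter(x => x > threshold)       // refers to a field, so the closure captures `this` -> Task not serializable

  def good(rdd: RDD[Int]) = {
    val t = threshold                    // copy the needed value into a local val
    rdd.filter(x => x > t)               // the closure now captures only an Int
  }
}

Note that spark.serializer (Kryo) affects how shuffled and cached data is serialized; task
closures are still Java-serialized, so switching to Kryo does not by itself remove this error.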




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ALS setIntermediateRDDStorageLevel

2016-03-22 Thread Sean Owen
https://github.com/apache/spark/blob/branch-1.6/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L185
?
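
For reference, a minimal sketch of how that setter is typically chained on the RDD-based ALS
(assuming `ratings` is an existing RDD[Rating]; the rank/iterations values are arbitrary):

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def train(ratings: RDD[Rating]) = {
  val als = new ALS()
    .setRank(10)
    .setIterations(10)
    .setIntermediateRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)
  als.run(ratings)
}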

On Tue, Mar 22, 2016 at 2:53 PM, Roberto Pagliari
 wrote:
> I have and it's under class ALS private
>
> On 22/03/2016 10:58, "Sean Owen"  wrote:
>
>>No, it's been there since 1.1 and still is there:
>>setIntermediateRDDStorageLevel. Double-check your code.
>>
>>On Mon, Mar 21, 2016 at 10:09 PM, Roberto Pagliari
>> wrote:
>>> According to this thread
>>>
>>>
>>>http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-question-td
>>>15420.html
>>>
>>> There should be a function to set intermediate storage level in ALS.
>>> However, I'm getting method not found with Spark 1.6. Is it still
>>>available?
>>> If so, can I get to see a minimal example?
>>>
>>> Thank you,
>>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Serialization issue with Spark

2016-03-22 Thread Ted Yu
Can you show code snippet and the exception for 'Task is not serializable' ?

Please see related JIRA:
  SPARK-10251
whose pull request contains code for registering classes with Kryo.
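
A minimal Scala sketch of that registration (the case classes are placeholders):

import org.apache.spark.SparkConf

case class MyEvent(id: String, ts: Long)
case class MyResult(id: String, score: Double)

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optional, but avoids Kryo writing full class names into the serialized data
  .registerKryoClasses(Array(classOf[MyEvent], classOf[MyResult]))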

Cheers

On Tue, Mar 22, 2016 at 7:00 AM, Hafsa Asif 
wrote:

> Hello,
> I am facing Spark serialization issue in Spark (1.4.1 - Java Client) with
> Spring Framework. It is known that Spark needs serialization and it
> requires
> every class need to be implemented with java.io.Serializable. But, in the
> documentation link: http://spark.apache.org/docs/latest/tuning.html, it is
> mentioned that it is not a good approach and better to use Kryo.
> I am using Kryo in Spark configuration like this:
>   public @Bean DeepSparkContext sparkContext(){
> DeepSparkConfig conf = new DeepSparkConfig();
> conf.setAppName(this.environment.getProperty("APP_NAME"))
> .setMaster(master)
> .set("spark.executor.memory",
> this.environment.getProperty("SPARK_EXECUTOR_MEMORY"))
> .set("spark.cores.max",
> this.environment.getProperty("SPARK_CORES_MAX"))
> .set("spark.default.parallelism",
> this.environment.getProperty("SPARK_DEFAULT_PARALLELISM"));
> conf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer");
> return new DeepSparkContext(conf);
> }
>
> but still getting exception in Spark that 'Task is not serializable'. I
> also
> donot want to make spark contect 'static'.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark schema evolution

2016-03-22 Thread Chris Miller
With Avro you solve this by using a default value for the new field...
maybe Parquet is the same?


--
Chris Miller

On Tue, Mar 22, 2016 at 9:34 PM, gtinside  wrote:

> Hi ,
>
> I have a table sourced from* 2 parquet files* with few extra columns in one
> of the parquet file. Simple * queries works fine but queries with predicate
> on extra column doesn't work and I get column not found
>
> *Column resp_party_type exist in just one parquet file*
>
> a) Query that work :
> select resp_party_type  from operational_analytics
>
> b) Query that doesn't work : (complains about missing column
> *resp_party_type *)
> select category as Events, resp_party as Team, count(*) as Total from
> operational_analytics where application = 'PeopleMover' and resp_party_type
> = 'Team' group by category, resp_party
>
> *Query Plan for (b)*
> == Physical Plan ==
> TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Final,isDistinct=false)],
> output=[Events#36266,Team#36267,Total#36268L])
>  TungstenExchange hashpartitioning(category#30986,resp_party#31006)
>   TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[category#30986,resp_party#31006,currentCount#36272L])
>Project [category#30986,resp_party#31006]
> Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 =
> Team))
>  Scan
>
> ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]
>
>
> I have set spark.sql.parquet.mergeSchema = true and
> spark.sql.parquet.filterPushdown = true. When I set
> spark.sql.parquet.filterPushdown = false Query (b) starts working,
> execution
> plan after setting the filterPushdown = false for Query(b)
>
> == Physical Plan ==
> TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Final,isDistinct=false)],
> output=[Events#36313,Team#36314,Total#36315L])
>  TungstenExchange hashpartitioning(category#30986,resp_party#31006)
>   TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[category#30986,resp_party#31006,currentCount#36319L])
>Project [category#30986,resp_party#31006]
> Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 =
> Team))
>  Scan
>
> ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-schema-evolution-tp26563.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Re: Is there a way to submit spark job to your by YARN REST API?

2016-03-22 Thread tony....@tendcloud.com
Hi, Saisai,
Thanks a lot for your reply. We want a way to programmatically control which user 
submits the Spark jobs, so that we can enforce access control and keep our data safe. 
Is there any good way to do that? 



阎志涛(Tony)

北京腾云天下科技有限公司

邮箱:tony@tendcloud.com
电话:13911815695
微信: zhitao_yan
QQ : 4707059
地址:北京市东城区东直门外大街39号院2号楼航空服务大厦602室
邮编:100027

TalkingData.com - 让数据说话
 
From: Saisai Shao
Date: 2016-03-22 18:03
To: tony@tendcloud.com
CC: user
Subject: Re: Is there a way to submit spark job to your by YARN REST API?
I'm afraid submitting an application through the YARN REST API is currently not 
supported by Spark. However, the YARN AMRMClient is functionally equivalent to the 
REST API; I'm not sure which specific features you are referring to.

Thanks
Saisai 

On Tue, Mar 22, 2016 at 5:27 PM, tony@tendcloud.com 
 wrote:
Hi, All,
We are trying to build a data processing workflow which will call different 
spark jobs and we are using YARN. Because we want to constraint ACL for those 
spark jobs, so we need to submit spark job to use Yarn REST API( which we can 
pass application acl as parameters. So is there any Spark API which can support 
that?   If no, is there any third party solutions for that?


Thanks and Regards,




阎志涛(Tony)

北京腾云天下科技有限公司

邮箱:tony@tendcloud.com
电话:13911815695
微信: zhitao_yan
QQ : 4707059
地址:北京市东城区东直门外大街39号院2号楼航空服务大厦602室
邮编:100027

TalkingData.com - 让数据说话



Re: new object store driver for Spark

2016-03-22 Thread Benjamin Kim
Hi Gil,

Currently, our company uses S3 heavily for data storage. Can you further 
explain the benefits of this in relation to S3 when the pending patch does come 
out? Also, I have heard of Swift from others. Can you explain to me the pros 
and cons of Swift compared to HDFS? It can be just a brief summary if you like 
or just guide me to material that will help me get a better understanding.

Thanks,
Ben

> On Mar 22, 2016, at 6:35 AM, Gil Vernik  wrote:
> 
> We recently released an object store connector for Spark. 
> https://github.com/SparkTC/stocator 
> Currently this connector contains driver for the Swift based object store ( 
> like SoftLayer or any other Swift cluster ), but it can easily support 
> additional object stores.
> There is a pending patch to support Amazon S3 object store. 
> 
> The major highlight is that this connector doesn't create any temporary files 
>  and so it achieves very fast response times when Spark persist data in the 
> object store.
> The new connector supports speculate mode and covers various failure 
> scenarios ( like two Spark tasks writing into same object, partial corrupted 
> data due to run time exceptions in Spark master, etc ).  It also covers 
> https://issues.apache.org/jira/browse/SPARK-10063 
> and other known issues.
> 
> The detail algorithm for fault tolerance will be released very soon. For now, 
> those who interested, can view the implementation in the code itself.
> 
>  https://github.com/SparkTC/stocator 
> contains all the details how to setup 
> and use with Spark.
> 
> A series of tests showed that the new connector obtains 70% improvements for 
> write operations from Spark to Swift and about 30% improvements for read 
> operations from Swift into Spark ( comparing to the existing driver that 
> Spark uses to integrate with objects stored in Swift). 
> 
> There is an ongoing work to add more coverage and fix some known bugs / 
> limitations.
> 
> All the best
> Gil
> 



Direct Kafka input stream and window(…) function

2016-03-22 Thread Martin Soch

Hi all,

I am using the direct Kafka input stream in my Spark app. When I use the 
window(...) function in the chain, it causes the processing pipeline 
to stop - when I open the Spark UI I can see that the streaming batches 
are being queued and the pipeline reports that it is still processing one of 
the first batches.


To be more correct: the issue happens only when the windows overlap (if 
sliding_interval < window_length). Otherwise the system behaves as expected.


Derivatives of the window(..) function - like reduceByKeyAndWindow(..), etc. - 
also work as expected; the pipeline doesn't stop. The same applies when 
using a different type of stream.


Is it some known limitation of window(..) function when used with 
direct-Kafka-input-stream ?


Java pseudo code:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream s;
s.window(Durations.seconds(10)).print();  // the pipeline will stop

Thanks
Martin

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Handling Missing Values in MLLIB Decision Tree

2016-03-22 Thread Joseph Bradley
It does not currently handle surrogate splits.  You will need to preprocess
your data to remove or fill in missing values.  I'd recommend using the
DataFrame API for that since it comes with a number of na methods.
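
A small sketch of that na handling (column names are made up; `df` is an existing DataFrame):

val dropped = df.na.drop(Seq("age", "income"))                 // remove rows with nulls in these columns
val filled  = df.na.fill(Map("age" -> 0.0, "income" -> 0.0))   // or substitute default values instead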
Joseph

On Thu, Mar 17, 2016 at 9:51 PM, Abir Chakraborty 
wrote:

> Hello,
>
>
>
> Can MLLIB Decision Tree (DT) handle missing values by having surrogate
> split (as it is currently being done in “rpart” library in R)?
>
>
>
> Thanks,
>
> Abir
> --
>
> *Principal Data Scientist, Data Science Group, Innovation Labs*
>
> *[24]**7 **Inc. - *The Intuitive Consumer Experience Company™ *|* *We
> make life simple for consumers to connect with companies to get things done*
>
> Mobile: +91-9880755850 *|* e-mail: abi...@247-inc.com
>   Prestige Tech Platina, Kadubeesanahalli, Marathahalli Outer Ring Road
> *|* Bangalore 560087 *|* India *|* www.247-inc.com
>
>
>


RE: Rename Several Aggregated Columns

2016-03-22 Thread Andres.Fernandez
Thank you! Yes, that's the way to go, taking care to select them in the proper order first. I added a select before the toDF and it does the trick.

From: Sunitha Kambhampati [mailto:skambha...@gmail.com]
Sent: Friday, March 18, 2016 5:46 PM
To: Fernandez, Andres
Cc: user@spark.apache.org
Subject: Re: Rename Several Aggregated Columns


One way is to rename the columns using the toDF

For eg:


val df = Seq((1, 2),(1,4),(2,3) ).toDF("a","b")
df.printSchema()

val renamedf = df.groupBy('a).agg(sum('b)).toDF("mycola", "mycolb")
renamedf.printSchema()
Best regards,
Sunitha

On Mar 18, 2016, at 9:10 AM, 
andres.fernan...@wellsfargo.com wrote:

Good morning. I have a dataframe and would like to group by on two fields, and 
perform a sum aggregation on more than 500 fields, though I would like to keep 
the same name for the 500 hundred fields (instead of sum(Field)). I do have the 
field names in an array. Could anybody help with this question please?



Re: Direct Kafka input stream and window(…) function

2016-03-22 Thread Cody Koeninger
I definitely have direct stream jobs that use window() without
problems... Can you post a minimal code example that reproduces the
problem?

Using print() will confuse the issue, since print() will try to only
use the first partition.

Use foreachRDD { rdd => rdd.foreach(println) }

or something comparable
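
A minimal Scala sketch of that, assuming `stream` is the direct Kafka DStream and a
30-second window sliding every 10 seconds:

import org.apache.spark.streaming.Seconds

stream.window(Seconds(30), Seconds(10))
  .foreachRDD { rdd =>
    rdd.foreach(record => println(record))   // touches every partition, unlike print()
  }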

On Tue, Mar 22, 2016 at 10:14 AM, Martin Soch  wrote:
> Hi all,
>
> I am using direct-Kafka-input-stream in my Spark app. When I use window(...)
> function in the chain it will cause the processing pipeline to stop - when I
> open the Spark-UI I can see that the streaming batches are being queued and
> the pipeline reports to process one of the first batches.
>
> To be more correct: the issue happens only when the windows overlap (if
> sliding_interval < window_length). Otherwise the system behaves as expected.
>
> Derivations of window(..) function - like reduceByKeyAndWindow(..), etc.
> works also as expected - pipeline doesn't stop. The same applies when using
> different type of stream.
>
> Is it some known limitation of window(..) function when used with
> direct-Kafka-input-stream ?
>
> Java pseudo code:
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream s;
> s.window(Durations.seconds(10)).print();  // the pipeline will stop
>
> Thanks
> Martin
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Problem using saveAsNewAPIHadoopFile API

2016-03-22 Thread vetal king
We are using Spark 1.4 for Spark Streaming. Kafka is data source for the
Spark Stream.

Records are published on Kafka every second. Our requirement is to store
records published on Kafka in a single folder per minute. The stream will
read records every five seconds. For instance, records published between 12:00
PM and 12:01 PM are stored in folder "1200"; those between 12:01 PM and 12:02 PM
in folder "1201"; and so on.

The code I wrote is as follows

// First, group records in the RDD by the target folder (date)
stream.foreachRDD(rddWithinStream -> {
    JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
        rddWithinStream.mapToPair(t -> {
            return new Tuple2<String, String>(targetHadoopFolder, t._2());
        }).groupByKey();
    // All records grouped by the folders they will be stored in

    // Create an RDD for each target folder.
    for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
        JavaPairRDD<String, Iterable<String>> rddByKey =
            rddGroupedByDirectory.filter(groupedTuples -> {
                return groupedTuples._1().equals(hadoopFolder);
            });

        // And store it in Hadoop
        rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
            String.class, TextOutputFormat.class);
    }
});

Since the stream processes data every five seconds, saveAsNewAPIHadoopFile
gets invoked multiple times in a minute. This causes the "Part-0" file to
be overwritten every time.

I was expecting that in the directory specified by the "directory" parameter,
saveAsNewAPIHadoopFile would keep creating part-N files even when I have a
single worker node.

Any help/alternatives are greatly appreciated.

Thanks.


Re: Problem using saveAsNewAPIHadoopFile API

2016-03-22 Thread Cody Koeninger
If you want 1 minute granularity, why not use a 1 minute batch time?

Also, HDFS is not a great match for this kind of thing, because of the
small files issue.

On Tue, Mar 22, 2016 at 12:26 PM, vetal king  wrote:
> We are using Spark 1.4 for Spark Streaming. Kafka is data source for the
> Spark Stream.
>
> Records are published on Kafka every second. Our requirement is to store
> records published on Kafka in a single folder per minute. The stream will
> read records every five seconds. For instance records published during 1200
> PM and 1201PM are stored in folder "1200"; between 1201PM and 1202PM in
> folder "1201" and so on.
>
> The code I wrote is as follows
>
> //First Group records in RDD by date
> stream.foreachRDD (rddWithinStream -> {
> JavaPairRDD> rddGroupedByDirectory =
> rddWithinStream.mapToPair(t -> {
> return new Tuple2 (targetHadoopFolder, t._2());
> }).groupByKey();
> // All records grouped by folders they will be stored in
>
>
> // Create RDD for each target folder.
> for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
> JavaPairRDD > rddByKey =
> rddGroupedByDirectory.filter(groupedTuples -> {
> return groupedTuples._1().equals(hadoopFolder);
> });
>
> // And store it in Hadoop
> rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class,
> TextOutputFormat.class);
> }
>
> Since the Stream processes data every five seconds, saveAsNewAPIHadoopFile
> gets invoked multiple times in a minute. This causes "Part-0" file to be
> overwritten every time.
>
> I was expecting that in the directory specified by "directory" parameter,
> saveAsNewAPIHadoopFile will keep creating part-N file even when I've a
> sinlge worker node.
>
> Any help/alternatives are greatly appreciated.
>
> Thanks.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem using saveAsNewAPIHadoopFile API

2016-03-22 Thread vetal king
Hi Cody,

Thanks for your reply.

Five seconds batch and one minute publishing interval is just a representative
example. What we want is to group data over a certain frequency. That
frequency is configurable. One way we think it can be achieved is that a
"directory" will be created per this frequency, and in this directory we
will create folders when the stream receives data. Something like

rddByKey.saveAsNewAPIHadoopFile(directory + "-" + <timestamp in
milliseconds OR some random number>, String.class, String.class,
TextOutputFormat.class).

But I think it will be too much of a nested directory structure, and it
sounds too inefficient as well, since there will be a lot of small files.

Shridhar




On Tue, Mar 22, 2016 at 11:00 PM, Cody Koeninger  wrote:

> If you want 1 minute granularity, why not use a 1 minute batch time?
>
> Also, HDFS is not a great match for this kind of thing, because of the
> small files issue.
>
> On Tue, Mar 22, 2016 at 12:26 PM, vetal king  wrote:
> > We are using Spark 1.4 for Spark Streaming. Kafka is data source for the
> > Spark Stream.
> >
> > Records are published on Kafka every second. Our requirement is to store
> > records published on Kafka in a single folder per minute. The stream will
> > read records every five seconds. For instance records published during
> 1200
> > PM and 1201PM are stored in folder "1200"; between 1201PM and 1202PM in
> > folder "1201" and so on.
> >
> > The code I wrote is as follows
> >
> > //First Group records in RDD by date
> > stream.foreachRDD (rddWithinStream -> {
> > JavaPairRDD> rddGroupedByDirectory =
> > rddWithinStream.mapToPair(t -> {
> > return new Tuple2 (targetHadoopFolder, t._2());
> > }).groupByKey();
> > // All records grouped by folders they will be stored in
> >
> >
> > // Create RDD for each target folder.
> > for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
> > JavaPairRDD > rddByKey =
> > rddGroupedByDirectory.filter(groupedTuples -> {
> > return groupedTuples._1().equals(hadoopFolder);
> > });
> >
> > // And store it in Hadoop
> > rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
> String.class,
> > TextOutputFormat.class);
> > }
> >
> > Since the Stream processes data every five seconds,
> saveAsNewAPIHadoopFile
> > gets invoked multiple times in a minute. This causes "Part-0" file
> to be
> > overwritten every time.
> >
> > I was expecting that in the directory specified by "directory" parameter,
> > saveAsNewAPIHadoopFile will keep creating part-N file even when I've
> a
> > sinlge worker node.
> >
> > Any help/alternatives are greatly appreciated.
> >
> > Thanks.
>


Expose spark pre-computed data via thrift server

2016-03-22 Thread rjtokenring
Hi, I'd like to submit a possible use case and have some guidance on the
overall architecture.
I have 2 different datasources (a relational PostgreSQL and a Cassandra
cluster) and I'd like to provide users the ability to query data 'joining'
the 2 worlds.
So, an idea that comes to my mind is: pre-process data and create 2
dataframes, 1 for PG and 1 for cassandra and register dataframes as tables
in Hive. Then enable thrift server and connect from an external application
via hive JDBC.
In this way, a 3rd party user can perform its own queries on both the DBs,
joining as per need.
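
A rough sketch of that setup on Spark 1.x, assuming an existing HiveContext named
sqlContext, with the PostgreSQL JDBC driver and the spark-cassandra-connector on the
classpath; all connection details, keyspace and table names below are placeholders:

import org.apache.spark.storage.StorageLevel

val pgDF = sqlContext.read.format("jdbc")
  .options(Map("url" -> "jdbc:postgresql://pg-host:5432/mydb", "dbtable" -> "public.table1"))
  .load()

val casDF = sqlContext.read.format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_ks", "table" -> "table2"))
  .load()

pgDF.registerTempTable("DB1")
casDF.registerTempTable("DB2")

// optional: cache so thrift-server queries don't go back to the source databases every time
pgDF.persist(StorageLevel.MEMORY_AND_DISK)
casDF.persist(StorageLevel.MEMORY_AND_DISK)

Note that temp tables registered this way are only visible to a thrift server started from
the same context (e.g. HiveThriftServer2.startWithContext); otherwise the data would have to
be saved as real tables.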
From a mock-up code, this seems to work, but I'm a bit concerned about how
Spark handles such a use case.
Let's say:
-> PG DB ->> DATAFRAME 1 ->> registered as Hive table DB1
-> CASANDRA DB ->> DATAFRAME 2 ->> registered as Hive table DB2

What happens when a user via thrift server submit a query like 'select ...
from DB1 JOIN DB2 ON ... WHERE ...'?
Are connections to both DBs kept open or are they reopened as needed (i.e.,
is there a way to set up a 'connection pool'/'connection cache')?
Do I have to persist (memory + disk) these dataframes in order not to
overload the databases?
Is Spark's embedded thrift server robust enough for such use cases? Is there
any production use of this component?

Thanks to everybody!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Expose-spark-pre-computed-data-via-thrift-server-tp26567.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Additional classpaths / java options

2016-03-22 Thread Ashic Mahtab
Hello,

Is it possible to specify additional class paths / java options "in
addition to" those specified in spark-defaults.conf? I see that if I specify
spark.executor.extraJavaOptions or spark.executor.extraClassPaths in defaults,
and then specify --conf
"spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///some.properties"
or --conf "spark.executor.extraClassPaths=/usr/lib/something/*" when
spark-submitting, then the spark-submit-passed value completely replaces anything
in the defaults. Is there a way to add some java options and class paths by default,
and have the ones passed to spark-submit be used in addition to the defaults?

Thanks,
Ashic



Re: Problem using saveAsNewAPIHadoopFile API

2016-03-22 Thread Sebastian Piu
As you said, create a folder for each different minute, you can use the
rdd.time also as a timestamp.

Also you might want to have a look at the window function for the batching
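
A small Scala sketch of that idea, assuming `stream` is a DStream of (String, String)
pairs and `baseDir`/`bucketMillis` come from configuration:

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.streaming.Time

stream.foreachRDD { (rdd, time: Time) =>
  val bucket = (time.milliseconds / bucketMillis) * bucketMillis   // truncate to the configured frequency
  val path   = s"$baseDir/$bucket/${time.milliseconds}"            // unique sub-folder per batch, nothing gets overwritten
  rdd.saveAsNewAPIHadoopFile(path, classOf[String], classOf[String],
    classOf[TextOutputFormat[String, String]])
}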


On Tue, 22 Mar 2016, 17:43 vetal king,  wrote:

> Hi Cody,
>
> Thanks for your reply.
>
> Five seconds batch and one min publishing interval is just a
> representative example. What we want is, to group data over a certain
> frequency. That frequency is configurable. One way we think it can be
> achieved is "directory"  will be created per this frequency, and in this
> directory we will create folders at when the stream receives data.
> Something like
>
> rddByKey.saveAsNewAPIHadoopFile(directory + "-" +  milliseconds OR some random number>, String.class, String.class,
> TextOutputFormat.class).
>
> But I think it will be too much of nested directory structure, and it
> sounds too inefficient as well. since there will be a lot of small files.
>
> Shridhar
>
>
>
>
> On Tue, Mar 22, 2016 at 11:00 PM, Cody Koeninger 
> wrote:
>
>> If you want 1 minute granularity, why not use a 1 minute batch time?
>>
>> Also, HDFS is not a great match for this kind of thing, because of the
>> small files issue.
>>
>> On Tue, Mar 22, 2016 at 12:26 PM, vetal king 
>> wrote:
>> > We are using Spark 1.4 for Spark Streaming. Kafka is data source for the
>> > Spark Stream.
>> >
>> > Records are published on Kafka every second. Our requirement is to store
>> > records published on Kafka in a single folder per minute. The stream
>> will
>> > read records every five seconds. For instance records published during
>> 1200
>> > PM and 1201PM are stored in folder "1200"; between 1201PM and 1202PM in
>> > folder "1201" and so on.
>> >
>> > The code I wrote is as follows
>> >
>> > //First Group records in RDD by date
>> > stream.foreachRDD (rddWithinStream -> {
>> > JavaPairRDD> rddGroupedByDirectory =
>> > rddWithinStream.mapToPair(t -> {
>> > return new Tuple2 (targetHadoopFolder, t._2());
>> > }).groupByKey();
>> > // All records grouped by folders they will be stored in
>> >
>> >
>> > // Create RDD for each target folder.
>> > for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>> > JavaPairRDD > rddByKey =
>> > rddGroupedByDirectory.filter(groupedTuples -> {
>> > return groupedTuples._1().equals(hadoopFolder);
>> > });
>> >
>> > // And store it in Hadoop
>> > rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>> String.class,
>> > TextOutputFormat.class);
>> > }
>> >
>> > Since the Stream processes data every five seconds,
>> saveAsNewAPIHadoopFile
>> > gets invoked multiple times in a minute. This causes "Part-0" file
>> to be
>> > overwritten every time.
>> >
>> > I was expecting that in the directory specified by "directory"
>> parameter,
>> > saveAsNewAPIHadoopFile will keep creating part-N file even when
>> I've a
>> > sinlge worker node.
>> >
>> > Any help/alternatives are greatly appreciated.
>> >
>> > Thanks.
>>
>
>


Re: Spark Metrics Framework?

2016-03-22 Thread Mike Sukmanowsky
The Source class is private to the spark package, and any new Sources added to
the metrics registry must be of type Source. So unless I'm mistaken, we can't
define a custom source. I linked to 1.4.1 code, but the same is true in 1.6.1.

On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito 
wrote:

> You could use the metric sources and sinks described here:
> http://spark.apache.org/docs/latest/monitoring.html#metrics
>
> If you want to push the metrics to another system you can define a custom
> sink. You can also extend the metrics by defining a custom source.
>
> From: Mike Sukmanowsky 
> Date: Monday, March 21, 2016 at 11:54 AM
> To: "user@spark.apache.org" 
> Subject: Spark Metrics Framework?
>
> We make extensive use of the elasticsearch-hadoop library for
> Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be
> very handy to have access to some of the many metrics
> 
> that the library makes available when running in map reduce mode. The 
> library's
> author noted
> 
> that Spark doesn't offer any kind of a similar metrics API where by these
> metrics could be reported or aggregated on.
>
> Are there any plans to bring a metrics framework similar to Hadoop's
> Counter system to Spark or is there an alternative means for us to grab
> metrics exposed when using Hadoop APIs to load/save RDDs?
>
> Thanks,
> Mike
>


Re: Spark Metrics Framework?

2016-03-22 Thread Ted Yu
See related thread:

http://search-hadoop.com/m/q3RTtuwg442GBwKh

On Tue, Mar 22, 2016 at 12:13 PM, Mike Sukmanowsky <
mike.sukmanow...@gmail.com> wrote:

> The Source class is private
> 
> to the spark package and any new Sources added to the metrics registry must
> be of type Source
> .
> So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1
> code, but the same is true in 1.6.1.
>
> On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito 
> wrote:
>
>> You could use the metric sources and sinks described here:
>> http://spark.apache.org/docs/latest/monitoring.html#metrics
>>
>> If you want to push the metrics to another system you can define a custom
>> sink. You can also extend the metrics by defining a custom source.
>>
>> From: Mike Sukmanowsky 
>> Date: Monday, March 21, 2016 at 11:54 AM
>> To: "user@spark.apache.org" 
>> Subject: Spark Metrics Framework?
>>
>> We make extensive use of the elasticsearch-hadoop library for
>> Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be
>> very handy to have access to some of the many metrics
>> 
>> that the library makes available when running in map reduce mode. The 
>> library's
>> author noted
>> 
>> that Spark doesn't offer any kind of a similar metrics API where by these
>> metrics could be reported or aggregated on.
>>
>> Are there any plans to bring a metrics framework similar to Hadoop's
>> Counter system to Spark or is there an alternative means for us to grab
>> metrics exposed when using Hadoop APIs to load/save RDDs?
>>
>> Thanks,
>> Mike
>>
>


Re: Spark Metrics Framework?

2016-03-22 Thread Silvio Fiorito
Hi Mike,

It’s been a while since I worked on a custom Source but I think all you need to 
do is make your Source in the org.apache.spark package.
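
A sketch of that workaround for Spark 1.x, where the Source trait is private[spark]:
declare the class inside an org.apache.spark package so it can see the trait (all names
below are placeholders):

package org.apache.spark.metrics.source

import com.codahale.metrics.{Counter, MetricRegistry}
import org.apache.spark.SparkEnv

class MyAppSource extends Source {
  override val sourceName: String = "myApp"
  override val metricRegistry: MetricRegistry = new MetricRegistry
  val recordsProcessed: Counter = metricRegistry.counter(MetricRegistry.name("recordsProcessed"))
}

object MyAppSource {
  // register with the running metrics system so the configured sinks report it
  def register(): MyAppSource = {
    val source = new MyAppSource
    SparkEnv.get.metricsSystem.registerSource(source)
    source
  }
}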

Thanks,
Silvio

From: Mike Sukmanowsky 
mailto:mike.sukmanow...@gmail.com>>
Date: Tuesday, March 22, 2016 at 3:13 PM
To: Silvio Fiorito 
mailto:silvio.fior...@granturing.com>>, 
"user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: Re: Spark Metrics Framework?

The Source class is 
private
 to the spark package and any new Sources added to the metrics registry must be 
of type 
Source.
 So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1 
code, but the same is true in 1.6.1.

On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito 
mailto:silvio.fior...@granturing.com>> wrote:
You could use the metric sources and sinks described here: 
http://spark.apache.org/docs/latest/monitoring.html#metrics

If you want to push the metrics to another system you can define a custom sink. 
You can also extend the metrics by defining a custom source.

From: Mike Sukmanowsky 
mailto:mike.sukmanow...@gmail.com>>
Date: Monday, March 21, 2016 at 11:54 AM
To: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: Spark Metrics Framework?

We make extensive use of the elasticsearch-hadoop library for Hadoop/Spark. In 
trying to troubleshoot our Spark applications, it'd be very handy to have 
access to some of the many 
metrics
 that the library makes available when running in map reduce mode. The 
library's author 
noted 
that Spark doesn't offer any kind of a similar metrics API where by these 
metrics could be reported or aggregated on.

Are there any plans to bring a metrics framework similar to Hadoop's Counter 
system to Spark or is there an alternative means for us to grab metrics exposed 
when using Hadoop APIs to load/save RDDs?

Thanks,
Mike


Re: sliding Top N window

2016-03-22 Thread Lars Albertsson
@Jatin, I touched that case briefly in the linked presentation.

You will have to decide on a time slot size, and then aggregate slots
to form windows. E.g. if you select a time slot of an hour, you build
a CMS and a heavy hitter list for the current hour slot, and start new
ones at 00 minutes. In order to form e.g. a 12 hour window, the
12-hour CMS is calculated as the sum of the 12 hour slot CMSs, and the
12-hour heavy hitters is the union of the hour slot heavy hitters.

Since the data structures are small, one can afford using small time
slots. One can also keep a long history with different combinations of
time windows by pushing out CMSs and heavy hitters to e.g. Kafka, and
have different stream processors that aggregate different time windows
and push results to Kafka or to lookup tables.
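
A conceptual Scala sketch of that slot aggregation (deliberately not tied to Algebird or
any concrete CMS implementation): sketches built with the same width, depth and hash seeds
add element-wise, and a window's heavy-hitter candidates are the union of the slots' lists:

// toy representation: a CMS is just a depth x width matrix of counters
case class Sketch(counts: Array[Array[Long]]) {
  def merge(other: Sketch): Sketch =
    Sketch(counts.zip(other.counts).map { case (row, otherRow) =>
      row.zip(otherRow).map { case (a, b) => a + b }
    })
}

// one (sketch, heavyHitters) pair per time slot, e.g. per hour
def window(slots: Seq[(Sketch, Set[String])]): (Sketch, Set[String]) = {
  val windowSketch = slots.map(_._1).reduce(_ merge _)
  val candidates   = slots.map(_._2).reduce(_ ++ _)   // re-rank these against windowSketch to get the top N
  (windowSketch, candidates)
}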


Lars Albertsson
Data engineering consultant
www.mapflat.com
+46 70 7687109


On Tue, Mar 22, 2016 at 1:23 PM, Jatin Kumar  wrote:
> Hello Yakubovich,
>
> I have been looking into a similar problem. @Lars please note that he wants
> to maintain the top N products over a sliding window, whereas the
> CountMinSketch algorithm is useful if we want to maintain global top N
> products list. Please correct me if I am wrong here.
>
> I tried using CountMinSketch and realized that it doesn't suit my use case
> as I also wanted to maintain top N over last H hours. CountMinSketch has no
> notion of time, so in my understanding you cannot use that.
>
> Yakubovich, you can try doing something like this:
>
> val stream = 
> // I am assuming that each entry is a comma separated list of product ids
> // and product ID is a string (doesn't really matter though)
> stream
>  .flatMap(record => record.split(","))
>  .map(pid => (pid, 1L))
>  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(S1), Seconds(S2))
>  .foreachRDD(rdd => {
>// `rdd` here is of type (pid, count) and has frequency of each PID over
>// a sliding window of S1 seconds which moves by S2 seconds every time.
>
>implicit val order = new scala.Ordering[(String, Long)] {
>  override def compare(a1: (String, Long), a2: (String, Long)): Int =
> a1._2.compare(a2._2)
>}
>
>val topNPidTuples = rdd.top(N)
>// do whatever you want here.
>  })
>
>
>
> --
> Thanks
> Jatin
>
> On Tue, Mar 22, 2016 at 12:11 PM, Rishi Mishra 
> wrote:
>>
>> Hi Alexy,
>> We are also trying to solve similar problems using approximation. Would
>> like to hear more about your usage.  We can discuss this offline without
>> boring others.  :)
>>
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>> On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson 
>> wrote:
>>>
>>> Hi,
>>>
>>> If you can accept approximate top N results, there is a neat solution
>>> for this problem: Use an approximate Map structure called
>>> Count-Min Sketch, in combination with a list of the M top items, where
>>> M > N. When you encounter an item not in the top M, you look up its
>>> count in the Count-Min Sketch do determine whether it qualifies.
>>>
>>> You will need to break down your event stream into time windows with a
>>> certain time unit, e.g. minutes or hours, and keep one Count-Min
>>> Sketch for each unit. The CMSs can be added, so you aggregate them to
>>> form your sliding windows. You also keep a top M (aka "heavy hitters")
>>> list for each window.
>>>
>>> The data structures required are surprisingly small, and will likely
>>> fit in memory on a single machine, if it can handle the traffic
>>> volume, so you might not need Spark at all. If you choose to use Spark
>>> in order to benefit from windowing, be aware that Spark lumps events
>>> in micro batches based on processing time, not event time.
>>>
>>> I made a presentation on approximate counting a couple of years ago.
>>> Slides and video here:
>>>
>>> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105.
>>> You can also search for presentation by Ted Dunning and Mikio Braun,
>>> who have held good presentations on the subject.
>>>
>>> There are AFAIK two open source implementations of Count-Min Sketch,
>>> one of them in Algebird.
>>>
>>> Let me know if anything is unclear.
>>>
>>> Good luck, and let us know how it goes.
>>>
>>> Regards,
>>>
>>>
>>>
>>> Lars Albertsson
>>> Data engineering consultant
>>> www.mapflat.com
>>> +46 70 7687109
>>>
>>>
>>> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
>>>  wrote:
>>> > Good day,
>>> >
>>> > I have the following task: a stream of “page views” coming to a Kafka topic.
>>> > Each
>>> > view contains list of product Ids from a visited page. The task: to
>>> > have in
>>> > “real time” Top N product.
>>> >
>>> > I am interested in some solution that would require minimum
>>> > intermediate
>>> > writes … So  need to build a sliding window for top N product, where
>>> > the
>>> > product counters dynamically changes and window should present the TOP
>>> > product for the specified period of time.

Mapping csv columns and reformatting date string

2016-03-22 Thread Mich Talebzadeh
Hi,

I have the following CSV load

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")

I have defined this UDF

def ChangeDate(word : String) : String = {
   return
word.substring(6,10)+"-"+word.substring(3,5)+"-"+word.substring(0,2)
}

I use the following mapping

scala> df.map(x => (x(1).toString,
x(1).toString.substring(6,10)+"-"+x(1).toString.substring(3,5)+"-"+x(1).toString.substring(0,2))).take(1)
res20: Array[(String, String)] = Array((10/02/2014,2014-02-10))

Now, rather than using that long-winded substring, can I use some variation of
that UDF?

This does not work

scala> df.map(x => (x(1).toString, changeDate(x(1).toString))
 | )
:22: error: not found: value changeDate
  df.map(x => (x(1).toString, changeDate(x(1).toString))

Any ideas from experts?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: Mapping csv columns and reformatting date string

2016-03-22 Thread Mich Talebzadeh
I was just being too lazy; I should define it as a custom UDF.

def ChangeDate(word : String) : String = {
   return
word.substring(6,10)+"-"+word.substring(3,5)+"-"+word.substring(0,2)
}

Register it as custom UDF


sqlContext.udf.register("ChangeDate", ChangeDate(_:String))

And use it in mapping

scala> df.map(x => (x(1).toString, ChangeDate(x(1).toString))).take(1)
res40: Array[(String, String)] = Array((10/02/2014,2014-02-10))
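
A side note: the earlier "not found: value changeDate" error comes from case sensitivity (the
method is named ChangeDate), and udf.register is only needed when the function is called inside
a SQL string. For DataFrame expressions, a small sketch that reuses the same ChangeDate
definition (the column index is just for illustration):

import org.apache.spark.sql.functions.udf

val changeDateUdf = udf(ChangeDate _)                  // wrap the plain Scala function
df.select(changeDateUdf(df(df.columns(1)))).show(1)    // apply it to the second column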

Cheers

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 22 March 2016 at 22:10, Mich Talebzadeh 
wrote:

> Hi,
>
> I have the following CSV load
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
>
> I have defined this UDF
>
> def ChangeDate(word : String) : String = {
>return
> word.substring(6,10)+"-"+word.substring(3,5)+"-"+word.substring(0,2)
> }
>
> I use the following mapping
>
> scala> df.map(x => (x(1).toString,
> x(1).toString.substring(6,10)+"-"+x(1).toString.substring(3,5)+"-"+x(1).toString.substring(0,2))).take(1)
> res20: Array[(String, String)] = Array((10/02/2014,2014-02-10))
>
> Now rather than using that longwinded substring can I use some variation
> of that UDF.
>
> This does not work
>
> scala> df.map(x => (x(1).toString, changeDate(x(1).toString))
>  | )
> :22: error: not found: value changeDate
>   df.map(x => (x(1).toString, changeDate(x(1).toString))
>
> Any ideas from experts?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: Spark schema evolution

2016-03-22 Thread Michael Armbrust
Which version of Spark?  This sounds like a bug (that might be fixed).

On Tue, Mar 22, 2016 at 6:34 AM, gtinside  wrote:

> Hi ,
>
> I have a table sourced from* 2 parquet files* with few extra columns in one
> of the parquet file. Simple * queries works fine but queries with predicate
> on extra column doesn't work and I get column not found
>
> *Column resp_party_type exist in just one parquet file*
>
> a) Query that work :
> select resp_party_type  from operational_analytics
>
> b) Query that doesn't work : (complains about missing column
> *resp_party_type *)
> select category as Events, resp_party as Team, count(*) as Total from
> operational_analytics where application = 'PeopleMover' and resp_party_type
> = 'Team' group by category, resp_party
>
> *Query Plan for (b)*
> == Physical Plan ==
> TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Final,isDistinct=false)],
> output=[Events#36266,Team#36267,Total#36268L])
>  TungstenExchange hashpartitioning(category#30986,resp_party#31006)
>   TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[category#30986,resp_party#31006,currentCount#36272L])
>Project [category#30986,resp_party#31006]
> Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 =
> Team))
>  Scan
>
> ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]
>
>
> I have set spark.sql.parquet.mergeSchema = true and
> spark.sql.parquet.filterPushdown = true. When I set
> spark.sql.parquet.filterPushdown = false Query (b) starts working,
> execution
> plan after setting the filterPushdown = false for Query(b)
>
> == Physical Plan ==
> TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Final,isDistinct=false)],
> output=[Events#36313,Team#36314,Total#36315L])
>  TungstenExchange hashpartitioning(category#30986,resp_party#31006)
>   TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[category#30986,resp_party#31006,currentCount#36319L])
>Project [category#30986,resp_party#31006]
> Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 =
> Team))
>  Scan
>
> ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-schema-evolution-tp26563.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Using Spark to retrieve a HDFS file protected by Kerberos

2016-03-22 Thread Nkechi Achara
I am having issues setting up my spark environment to read from a
kerberized HDFS file location.

At the moment I have tried to do the following:

def ugiDoAs[T](ugi: Option[UserGroupInformation])(code: => T) = ugi match {
  case None => code
  case Some(u) => u.doAs(new PrivilegedExceptionAction[T] {
    override def run(): T = code
  })
}

val sparkConf =
  defaultSparkConf.setAppName("file-test").setMaster("yarn-client")

val sc = ugiDoAs(ugi) { new SparkContext(sparkConf) }

val file = sc.textFile("path")

It fails at the point of creating the Spark Context, with the following
error:

Exception in thread "main"
org.apache.hadoop.security.AccessControlException: SIMPLE authentication is
not enabled. Available:[TOKEN, KERBEROS] at
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at
org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53) at
org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterMetrics(ApplicationClientProtocolPBClientImpl.java:155)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497) at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)


Has anyone got a simple example on how to allow spark to connect to a
kerberized HDFS location?

I know that Spark needs to be in YARN mode to make this work, but the login
method does not seem to be working in this respect. I know that the
UserGroupInformation (ugi) object is valid, as I have used it in the same object
to connect to ZK and HBase.
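
One commonly suggested approach (a sketch, not a verified fix for this exact trace) is to
log in from a keytab before the SparkContext is created, so the YARN/HDFS clients pick up
the Kerberos credentials from the current login user; principal, keytab and HDFS paths
below are placeholders. With spark-submit on YARN, the --principal and --keytab options
serve the same purpose.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkContext

val hadoopConf = new Configuration()
hadoopConf.set("hadoop.security.authentication", "kerberos")
UserGroupInformation.setConfiguration(hadoopConf)
UserGroupInformation.loginUserFromKeytab("someuser@EXAMPLE.COM", "/etc/security/keytabs/someuser.keytab")

val sc = new SparkContext(sparkConf)                    // the sparkConf defined above
val file = sc.textFile("hdfs://namenode:8020/some/path")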


Cached Parquet file paths problem

2016-03-22 Thread Piotr Smoliński
Hi,

After migrating from Spark 1.5.2 to 1.6.1 I faced a strange issue. I have a
partitioned Parquet directory. Each partition (month) is the subject of an
incremental ETL that takes the current Avro files and replaces the
corresponding Parquet files.

Now there is a problem that appeared in 1.6.x:
I have a couple of derived data frames. After ETL finishes all RDDs and
DataFrames are
properly recreated, but for some reason the originally captured file paths
are retained.
Of course due to the override some paths are gone.

As a result I am getting exceptions as shown below. As I mentioned it all
worked flawlessly
in Spark 1.5.x, i.e. after ETL the engine nicely read the new directory
structure.

Is there any setting to restore the previous behaviour?

Regards,
Piotr

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7
in stage 32.0 failed 1 times, most recent failure: Lost task 7.0 in stage
32.0 (TID 386, localhost): java.io.FileNotFoundException: File does not
exist:
hdfs://demo.sample/apps/demo/transactions/month=2015-09-01/part-r-00026-792365f9-d1f5-4a70-a3d4-e0b87f6ee087.gz.parquet
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
at
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:157)
at
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
at
org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
at
org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
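
A couple of workarounds may be worth trying. The sketch below assumes the data is read through sqlContext.read.parquet (paths and table names are placeholders); it is not a confirmed fix for the 1.6.x behaviour described above.

// 1) Turn off Parquet metadata caching so file footers/paths are re-resolved per query.
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "false")

// 2) Re-create the DataFrame (and anything derived from it) after the ETL overwrite,
//    instead of reusing DataFrames captured before the files were replaced.
val transactions = sqlContext.read.parquet("hdfs://demo.sample/apps/demo/transactions")
transactions.registerTempTable("transactions")

// 3) If the table is registered through a HiveContext, its cached file listing can be
//    refreshed explicitly (refreshTable is available on HiveContext in 1.x):
// hiveContext.refreshTable("transactions")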


Re: pyspark sql convert long to timestamp?

2016-03-22 Thread Andy Davidson
Thanks

createdAt is a long

from_unixtime(createdAt / 1000, 'yyyy-MM-dd HH:mm:ss z') as fromUnix,

Worked


From:  Akhil Das 
Date:  Monday, March 21, 2016 at 11:56 PM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: pyspark sql convert long to timestamp?

> Have a look at the from_unixtime() functions.
> https://spark.apache.org/docs/1.5.0/api/python/_modules/pyspark/sql/functions.html#from_unixtime
> 
> Thanks
> Best Regards
> 
> On Tue, Mar 22, 2016 at 4:49 AM, Andy Davidson 
> wrote:
>> I have a col in a data frame that is of type long. Any idea how I
>> create a column whose type is timestamp?
>> 
>> The long is unix epoch in ms
>> 
>> Thanks
>> 
>> Andy
> 




MLlib for Spark

2016-03-22 Thread Abid Muslim Malik
Dear all,

Is there any library/framework for Spark that uses accelerators for machine
learning algorithms?

Thanks,

-- 
Abid M. Malik
**
"I have learned silence from the talkative, toleration from the intolerant,
and kindness from the unkind"---Gibran
"Success is not for the chosen few, but for the few who choose" --- John
Maxwell
"Being a good person does not depend on your religion or status in life,
your race or skin color, political views or culture. IT DEPENDS ON HOW GOOD
YOU TREAT OTHERS"--- Abid
"The Universe is talking to us, and the language of the Universe is
mathematics."Abid


DataFrame vs RDD

2016-03-22 Thread asethia
Hi,

I am new to Spark, would like to know any guidelines when to use Data Frame
vs. RDD. 

Thanks,
As





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-vs-RDD-tp26570.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame vs RDD

2016-03-22 Thread Jeff Zhang
Please check the official doc

http://spark.apache.org/docs/latest/


On Wed, Mar 23, 2016 at 10:08 AM, asethia  wrote:

> Hi,
>
> I am new to Spark, would like to know any guidelines when to use Data Frame
> vs. RDD.
>
> Thanks,
> As
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-vs-RDD-tp26570.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: DataFrame vs RDD

2016-03-22 Thread Vinay Kashyap
A DataFrame is essentially an RDD with a schema associated with it.
If the data you are transforming has a defined schema, it is always
advisable to use DataFrames, as there are efficient supporting APIs
for them.
It is neatly explained in the official docs.

Thanks and regards
Vinay Kashyap
On Wed, Mar 23, 2016 at 7:56 AM Jeff Zhang  wrote:

> Please check the official doc
>
> http://spark.apache.org/docs/latest/
>
>
> On Wed, Mar 23, 2016 at 10:08 AM, asethia  wrote:
>
>> Hi,
>>
>> I am new to Spark, would like to know any guidelines when to use Data
>> Frame
>> vs. RDD.
>>
>> Thanks,
>> As
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-vs-RDD-tp26570.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: sliding Top N window

2016-03-22 Thread Jatin Kumar
Lars, can you please point to an example?
On Mar 23, 2016 2:16 AM, "Lars Albertsson"  wrote:

> @Jatin, I touched that case briefly in the linked presentation.
>
> You will have to decide on a time slot size, and then aggregate slots
> to form windows. E.g. if you select a time slot of an hour, you build
> a CMS and a heavy hitter list for the current hour slot, and start new
> ones at 00 minutes. In order to form e.g. a 12 hour window, the
> 12-hour CMS is calculated as the sum of the 12 hour slot CMSs, and the
> 12-hour heavy hitters is the union of the hour slot heavy hitters.
>
> Since the data structures are small, one can afford using small time
> slots. One can also keep a long history with different combinations of
> time windows by pushing out CMSs and heavy hitters to e.g. Kafka, and
> have different stream processors that aggregate different time windows
> and push results to Kafka or to lookup tables.
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
>
>
> On Tue, Mar 22, 2016 at 1:23 PM, Jatin Kumar 
> wrote:
> > Hello Yakubovich,
> >
> > I have been looking into a similar problem. @Lars please note that he
> wants
> > to maintain the top N products over a sliding window, whereas the
> > CountMinSketch algorithm is useful if we want to maintain a global top N
> > products list. Please correct me if I am wrong here.
> >
> > I tried using CountMinSketch and realized that it doesn't suit my use
> case
> > as I also wanted to maintain top N over last H hours. CountMinSketch has
> no
> > notion of time, so in my understanding you cannot use that.
> >
> > Yakubovich, you can try doing something like this:
> >
> > val stream = 
> > // I am assuming that each entry is a comma separated list of product ids
> > // and product ID is a string (doesn't really matter though)
> > stream
> >   .flatMap(record => record.split(","))
> >   .map(pid => (pid, 1L))
> >   .reduceByKeyAndWindow(_ + _, _ - _, Seconds(S1), Seconds(S2))
> >   .foreachRDD(rdd => {
> >     // `rdd` here is of type (pid, count) and has the frequency of each PID
> >     // over a sliding window of S1 seconds which moves by S2 seconds every time.
> >
> >     // Order the (pid, count) pairs by count.
> >     implicit val order: Ordering[(String, Long)] = Ordering.by(_._2)
> >
> >     val topNPidTuples = rdd.top(N)
> >     // do whatever you want here.
> >   })
> >
> >
> >
> > --
> > Thanks
> > Jatin
> >
> > On Tue, Mar 22, 2016 at 12:11 PM, Rishi Mishra 
> > wrote:
> >>
> >> Hi Alexy,
> >> We are also trying to solve similar problems using approximation. Would
> >> like to hear more about your usage.  We can discuss this offline without
> >> boring others.  :)
> >>
> >> Regards,
> >> Rishitesh Mishra,
> >> SnappyData . (http://www.snappydata.io/)
> >>
> >> https://in.linkedin.com/in/rishiteshmishra
> >>
> >> On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson 
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> If you can accept approximate top N results, there is a neat solution
> >>> for this problem: Use an approximate Map structure called
> >>> Count-Min Sketch, in combination with a list of the M top items, where
> >>> M > N. When you encounter an item not in the top M, you look up its
> >>> count in the Count-Min Sketch to determine whether it qualifies.
> >>>
> >>> You will need to break down your event stream into time windows with a
> >>> certain time unit, e.g. minutes or hours, and keep one Count-Min
> >>> Sketch for each unit. The CMSs can be added, so you aggregate them to
> >>> form your sliding windows. You also keep a top M (aka "heavy hitters")
> >>> list for each window.
> >>>
> >>> The data structures required are surprisingly small, and will likely
> >>> fit in memory on a single machine, if it can handle the traffic
> >>> volume, so you might not need Spark at all. If you choose to use Spark
> >>> in order to benefit from windowing, be aware that Spark lumps events
> >>> in micro batches based on processing time, not event time.
> >>>
> >>> I made a presentation on approximate counting a couple of years ago.
> >>> Slides and video here:
> >>>
> >>>
> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
> .
> >>> You can also search for presentation by Ted Dunning and Mikio Braun,
> >>> who have held good presentations on the subject.
> >>>
> >>> There are AFAIK two open source implementations of Count-Min Sketch,
> >>> one of them in Algebird.
> >>>
> >>> Let me know if anything is unclear.
> >>>
> >>> Good luck, and let us know how it goes.
> >>>
> >>> Regards,
> >>>
> >>>
> >>>
> >>> Lars Albertsson
> >>> Data engineering consultant
> >>> www.mapflat.com
> >>> +46 70 7687109
> >>>
> >>>
> >>> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
> >>>  wrote:
> >>> > Good day,
> >>> >
> >>> > I have the following task: a stream of “page views” coming to a kafka topic.
> >>> > Each view contains a list of product Ids from a visite
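
To make the slot-aggregation idea above concrete, here is a rough sketch that uses exact per-slot count maps in place of a real Count-Min Sketch plus heavy-hitter list (a CMS monoid would merge the same way); the slot size, window length and sample data are assumptions.

// One "slot" = counts for one time unit (e.g. an hour); stand-in for a CMS + heavy hitters.
type Slot = Map[String, Long]

// Merging two slots is just summing counts; with a CMS this would be the monoid add.
def merge(a: Slot, b: Slot): Slot =
  (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap

// A window (e.g. 12 hours) is the merge of its slots; top N falls out of the merged counts.
def topN(slots: Seq[Slot], n: Int): Seq[(String, Long)] =
  slots.reduceOption(merge).getOrElse(Map.empty).toSeq.sortBy(-_._2).take(n)

// Example: three hourly slots, top 2 over the combined window.
val hourlySlots = Seq(
  Map("p1" -> 10L, "p2" -> 3L),
  Map("p1" -> 4L, "p3" -> 9L),
  Map("p2" -> 8L)
)
println(topN(hourlySlots, 2))  // List((p1,14), (p2,11))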

Re: DataFrame vs RDD

2016-03-22 Thread Arun Sethia
Thanks Vinay.

Is it fair to say that creating an RDD and creating a DataFrame from Cassandra
use Spark SQL, with the help of the Spark-Cassandra Connector API?

On Tue, Mar 22, 2016 at 9:32 PM, Vinay Kashyap  wrote:

> A DataFrame is essentially an RDD with a schema associated with it.
> If the data you are transforming has a defined schema, it is always
> advisable to use DataFrames, as there are efficient supporting APIs
> for them.
> It is neatly explained in the official docs.
>
> Thanks and regards
> Vinay Kashyap
>
> On Wed, Mar 23, 2016 at 7:56 AM Jeff Zhang  wrote:
>
>> Please check the official doc
>>
>> http://spark.apache.org/docs/latest/
>>
>>
>> On Wed, Mar 23, 2016 at 10:08 AM, asethia  wrote:
>>
>>> Hi,
>>>
>>> I am new to Spark, would like to know any guidelines when to use Data
>>> Frame
>>> vs. RDD.
>>>
>>> Thanks,
>>> As
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-vs-RDD-tp26570.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


[Critical] Issue with cached RDDs created from hadoop sequence files

2016-03-22 Thread Thamme Gowda N.
Hi spark experts,

I am facing issues with cached RDDs. I noticed that few entries
get duplicated for n times when the RDD is cached.

I asked a question on Stackoverflow with my code snippet to reproduce it.

I really appreciate  if you can visit
http://stackoverflow.com/q/36168827/1506477
and answer my question / give your comments.

Or at the least confirm that it is a bug.

Thanks in advance for your help!

--
Thamme


Re: DataFrame vs RDD

2016-03-22 Thread asethia
Creating an RDD is done via the SparkContext, whereas creating a DataFrame is
done via the SQLContext...

so a DataFrame is part of Spark SQL, whereas an RDD is Spark core



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-vs-RDD-tp26570p26573.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame vs RDD

2016-03-22 Thread Vinay Kashyap
As mentioned earlier, since a DataFrame has a schema associated with it, it
makes sense that it is created from the SQLContext. So your statement holds
true with that understanding.
On Wed, Mar 23, 2016 at 8:28 AM asethia  wrote:

> Creating an RDD is done via the SparkContext, whereas creating a DataFrame is
> done via the SQLContext...
>
> so a DataFrame is part of Spark SQL, whereas an RDD is Spark core
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-vs-RDD-tp26570p26573.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
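
To make the distinction concrete, a minimal sketch for Spark 1.6; the column names and data here are made up.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("rdd-vs-df").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// RDD: a distributed collection of objects; Spark knows nothing about their structure.
val rdd = sc.parallelize(Seq(("alice", 29), ("bob", 31)))
val adultsRdd = rdd.filter { case (_, age) => age >= 30 }

// DataFrame: the same data with a schema, so Spark SQL can optimise the query plan.
val df = rdd.toDF("name", "age")
val adultsDf = df.filter($"age" >= 30).select("name")

adultsDf.show()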


Re: Re: Is there a way to submit spark job to your by YARN REST API?

2016-03-22 Thread Saisai Shao
My question is: is this ACL control provided by Yarn, or do you have an
in-house facility to handle this?

If you're referring to ContainerLaunchContext#setApplicationACLs, I think
current Spark on Yarn doesn't support this. At the feature level, this is
doable in the current yarn/client code, with no need to use the REST API.


setApplicationACLs(Map<ApplicationAccessType, String> acls)
(see https://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/yarn/api/records/ApplicationAccessType.html)



On Tue, Mar 22, 2016 at 10:28 PM, tony@tendcloud.com <
tony@tendcloud.com> wrote:

> Hi, Saisai,
> Thanks a lot for your reply. We want a way in which we can programmatically
> control the users who submit spark jobs, so that we can enforce security
> controls for data safety. So is there any good way to do that?
>
> --
> 阎志涛 (Tony)
>
> 北京腾云天下科技有限公司
> --------
> Email: tony@tendcloud.com
> Phone: 13911815695
> WeChat: zhitao_yan
> QQ: 4707059
> Address: 北京市东城区东直门外大街39号院2号楼航空服务大厦602室
> Postal code: 100027
>
> 
> TalkingData.com - 让数据说话 (Let the data speak)
>
>
> *From:* Saisai Shao 
> *Date:* 2016-03-22 18:03
> *To:* tony@tendcloud.com
> *CC:* user 
> *Subject:* Re: Is there a way to submit spark job to your by YARN REST
> API?
> I'm afraid currently it is not supported by Spark to submit application
> through Yarn REST API. However Yarn AMRMClient is functionally equal to
> REST API, not sure which specific features are you referring?
>
> Thanks
> Saisai
>
> On Tue, Mar 22, 2016 at 5:27 PM, tony@tendcloud.com <
> tony@tendcloud.com> wrote:
>
>> Hi, All,
>> We are trying to build a data processing workflow which will call
>> different spark jobs and we are using YARN. Because we want to constraint
>> ACL for those spark jobs, so we need to submit spark job to use Yarn REST
>> API( which we can pass application acl as parameters. So is there any Spark
>> API which can support that?   If no, is there any third party solutions for
>> that?
>>
>>
>> Thanks and Regards,
>>
>>
>> --
>> 阎志涛 (Tony)
>>
>> 北京腾云天下科技有限公司
>> --------
>> Email: tony@tendcloud.com
>> Phone: 13911815695
>> WeChat: zhitao_yan
>> QQ: 4707059
>> Address: 北京市东城区东直门外大街39号院2号楼航空服务大厦602室
>> Postal code: 100027
>>
>> 
>> TalkingData.com - 让数据说话 (Let the data speak)
>>
>
>
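
For reference, a rough sketch of what attaching application ACLs looks like when the AM container launch context is built directly against the YARN client API; the group names are placeholders, and as noted above this is not something spark-submit currently exposes.

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerLaunchContext}
import org.apache.hadoop.yarn.util.Records

// Build the launch context for the application master and attach view/modify ACLs.
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
val acls = Map[ApplicationAccessType, String](
  ApplicationAccessType.VIEW_APP   -> "analytics_viewers",  // placeholder group
  ApplicationAccessType.MODIFY_APP -> "analytics_admins"    // placeholder group
)
amContainer.setApplicationACLs(acls.asJava)
// The context is then set on the ApplicationSubmissionContext before submitting
// the application through YarnClient.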


Does anyone install netlib-java on AWS EMR Spark?

2016-03-22 Thread greg huang
Hi All,

   I want to enable the netlib-java feature for the Spark ML module on AWS
EMR. But the cluster comes with Spark installed by default, unless I install
and configure the whole cluster myself. Does anyone have an idea how to
enable netlib-java on the standard EMR Spark cluster?

Regards,
   Greg
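
One commonly suggested route, sketched below, is to bundle netlib-java with your own application jar rather than rebuilding Spark. The versions are assumptions to be aligned with your EMR Spark release, and a native BLAS (e.g. OpenBLAS or ATLAS) still has to be present on the EMR nodes for the native backends to kick in.

// build.sbt (sketch)
libraryDependencies ++= Seq(
  "org.apache.spark"        %% "spark-mllib" % "1.6.0" % "provided",
  // Pulls in netlib-java's native_system / native_ref backends; the "all" artifact is pom-packaged.
  "com.github.fommil.netlib" % "all"         % "1.1.2" pomOnly()
)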


Re: [Critical] Issue with cached RDDs created from hadoop sequence files

2016-03-22 Thread Jeff Zhang
Looks like a spark bug. I can reproduce it for sequence file, but it works
for text file.

On Wed, Mar 23, 2016 at 10:56 AM, Thamme Gowda N.  wrote:

> Hi spark experts,
>
> I am facing issues with cached RDDs. I noticed that few entries
> get duplicated for n times when the RDD is cached.
>
> I asked a question on Stackoverflow with my code snippet to reproduce it.
>
> I really appreciate  if you can visit
> http://stackoverflow.com/q/36168827/1506477
> and answer my question / give your comments.
>
> Or at the least confirm that it is a bug.
>
> Thanks in advance for your help!
>
> --
> Thamme
>



-- 
Best Regards

Jeff Zhang


Re: [Critical] Issue with cached RDDs created from hadoop sequence files

2016-03-22 Thread Jeff Zhang
I think I got the root cause: you can use Text.toString() to solve this
issue. Because the Text object is shared, the last record is displayed
multiple times.

On Wed, Mar 23, 2016 at 11:37 AM, Jeff Zhang  wrote:

> Looks like a spark bug. I can reproduce it for sequence file, but it works
> for text file.
>
> On Wed, Mar 23, 2016 at 10:56 AM, Thamme Gowda N. 
> wrote:
>
>> Hi spark experts,
>>
>> I am facing issues with cached RDDs. I noticed that few entries
>> get duplicated for n times when the RDD is cached.
>>
>> I asked a question on Stackoverflow with my code snippet to reproduce it.
>>
>> I really appreciate  if you can visit
>> http://stackoverflow.com/q/36168827/1506477
>> and answer my question / give your comments.
>>
>> Or at the least confirm that it is a bug.
>>
>> Thanks in advance for your help!
>>
>> --
>> Thamme
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Best Regards

Jeff Zhang


Re: [Critical] Issue with cached RDDs created from hadoop sequence files

2016-03-22 Thread Jeff Zhang
Zhan's reply on stackoverflow is correct.



Please refer to the comments in sequenceFile.

/**
 * Get an RDD for a Hadoop SequenceFile with given key and value types.
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a map function.
 */



On Wed, Mar 23, 2016 at 11:58 AM, Jeff Zhang  wrote:

> I think I got the root cause: you can use Text.toString() to solve this
> issue. Because the Text object is shared, the last record is displayed
> multiple times.
>
> On Wed, Mar 23, 2016 at 11:37 AM, Jeff Zhang  wrote:
>
>> Looks like a spark bug. I can reproduce it for sequence file, but it
>> works for text file.
>>
>> On Wed, Mar 23, 2016 at 10:56 AM, Thamme Gowda N. 
>> wrote:
>>
>>> Hi spark experts,
>>>
>>> I am facing issues with cached RDDs. I noticed that few entries
>>> get duplicated for n times when the RDD is cached.
>>>
>>> I asked a question on Stackoverflow with my code snippet to reproduce it.
>>>
>>> I really appreciate  if you can visit
>>> http://stackoverflow.com/q/36168827/1506477
>>> and answer my question / give your comments.
>>>
>>> Or at the least confirm that it is a bug.
>>>
>>> Thanks in advance for your help!
>>>
>>> --
>>> Thamme
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Best Regards

Jeff Zhang


Re: [Critical] Issue with cached RDDs created from hadoop sequence files

2016-03-22 Thread Thamme Gowda N.
Hi Jeff,

Yes, you are absolutely right.
It is because of the RecordReader reusing the Writable Instance. I did not
anticipate this as it worked for text files.

Thank you so much for doing this.
 Your answer is accepted!


Best,
Thamme



--
*Thamme Gowda N. *
Grad Student at usc.edu
Twitter: @thammegowda
Website : http://scf.usc.edu/~tnarayan/

On Tue, Mar 22, 2016 at 9:00 PM, Jeff Zhang  wrote:

> Zhan's reply on stackoverflow is correct.
>
>
>
> Please refer to the comments in sequenceFile.
>
> /**
>  * Get an RDD for a Hadoop SequenceFile with given key and value types.
>  *
>  * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
>  * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
>  * operation will create many references to the same object.
>  * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
>  * copy them using a map function.
>  */
>
>
>
> On Wed, Mar 23, 2016 at 11:58 AM, Jeff Zhang  wrote:
>
>> I think I got the root cause: you can use Text.toString() to solve this
>> issue. Because the Text object is shared, the last record is displayed
>> multiple times.
>>
>> On Wed, Mar 23, 2016 at 11:37 AM, Jeff Zhang  wrote:
>>
>>> Looks like a spark bug. I can reproduce it for sequence file, but it
>>> works for text file.
>>>
>>> On Wed, Mar 23, 2016 at 10:56 AM, Thamme Gowda N. 
>>> wrote:
>>>
 Hi spark experts,

 I am facing issues with cached RDDs. I noticed that few entries
 get duplicated for n times when the RDD is cached.

 I asked a question on Stackoverflow with my code snippet to reproduce
 it.

 I really appreciate  if you can visit
 http://stackoverflow.com/q/36168827/1506477
 and answer my question / give your comments.

 Or at the least confirm that it is a bug.

 Thanks in advance for your help!

 --
 Thamme

>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
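
A minimal sketch of the fix discussed in this thread; the path is a placeholder and the key/value types are assumed to be Text.

import org.apache.hadoop.io.Text
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("seqfile-cache").setMaster("local[*]"))

// Hadoop's RecordReader reuses the same Text instances, so materialise copies
// (here via toString) before caching; caching the raw Writables makes every
// cached entry point at the last record read.
val rdd = sc.sequenceFile("/data/input.seq", classOf[Text], classOf[Text])
  .map { case (k, v) => (k.toString, v.toString) }
  .cache()

println(rdd.count())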


find the matching and get the value

2016-03-22 Thread Divya Gehlot
Hi,
I am using Spark1.5.2
My requirement is as below

df.withColumn("NoOfDays",lit(datediff(df("Start_date"),df("end_date"


Now I have to add one more column where my datediff(Start_date, end_date)
should match the map keys.

Map looks like MyMap(1->1D,2->2D,3->3M,4->4W)

I want to do something like this

> val
> condition= MyMap.contains(lit(datediff(df("END_DATE"),df("START_DATE"
> val geId =MyMap(datediff(df("END_DATE"),df("START_DATE")))
> df.withColumn("AddColumn",when(cond,lit(getId)))


Is it possible?

What am I missing here?
I am a beginner in Scala and Spark.

Would really appreciate the help.

Thanks,
Divya
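
One possible approach, as a sketch: wrap the map lookup in a UDF so it can be applied to the datediff column. It assumes df is the DataFrame from the question and keeps its column names; note that datediff(end, start) returns the number of days from start to end.

import org.apache.spark.sql.functions.{datediff, udf}

val myMap = Map(1 -> "1D", 2 -> "2D", 3 -> "3M", 4 -> "4W")

// The UDF does the map lookup; days without a mapping come back as null.
val lookupId = udf((days: Int) => myMap.get(days).orNull)

val result = df
  .withColumn("NoOfDays", datediff(df("end_date"), df("Start_date")))
  .withColumn("AddColumn", lookupId(datediff(df("end_date"), df("Start_date"))))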


Re: Problem using saveAsNewAPIHadoopFile API

2016-03-22 Thread Surendra , Manchikanti
Hi Vetal,

You may try MultipleOutputFormat instead of TextOutputFormat in
saveAsNewAPIHadoopFile().

Regards,
Surendra M

-- Surendra Manchikanti

On Tue, Mar 22, 2016 at 10:26 AM, vetal king  wrote:

> We are using Spark 1.4 for Spark Streaming. Kafka is data source for the
> Spark Stream.
>
> Records are published on Kafka every second. Our requirement is to store
> records published on Kafka in a single folder per minute. The stream will
> read records every five seconds. For instance records published during 1200
> PM and 1201PM are stored in folder "1200"; between 1201PM and 1202PM in
> folder "1201" and so on.
>
> The code I wrote is as follows
>
> //First Group records in RDD by date
> stream.foreachRDD (rddWithinStream -> {
> JavaPairRDD> rddGroupedByDirectory = 
> rddWithinStream.mapToPair(t -> {
> return new Tuple2 (targetHadoopFolder, t._2());
> }).groupByKey();
> // All records grouped by folders they will be stored in
>
>
> // Create RDD for each target folder.
> for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
> JavaPairRDD > rddByKey = 
> rddGroupedByDirectory.filter(groupedTuples -> {
> return groupedTuples._1().equals(hadoopFolder);
> });
>
> // And store it in Hadoop
> rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class, 
> TextOutputFormat.class);
> }
>
> Since the Stream processes data every five seconds, saveAsNewAPIHadoopFile
> gets invoked multiple times in a minute. This causes "Part-0" file to
> be overwritten every time.
>
> I was expecting that in the directory specified by "directory" parameter,
> saveAsNewAPIHadoopFile will keep creating part-N files even when I have a
> single worker node.
>
> Any help/alternatives are greatly appreciated.
>
> Thanks.
>
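
For reference, a sketch of the MultipleTextOutputFormat pattern Surendra is alluding to (old mapred API via saveAsHadoopFile). It assumes the stream has already been keyed by the target minute folder; the base path and names are placeholders, and writing each micro-batch under its own base directory keeps earlier part files from being overwritten.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Routes each record into <basePath>/<minuteFolder>/<leaf file>.
class PerMinuteOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
  // Only the value is written to the file; the key is used for routing.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
}

// stream: DStream[(String, String)] keyed by the minute folder, e.g. ("1200", record).
stream.foreachRDD { (rdd, batchTime) =>
  if (!rdd.isEmpty()) {
    // A distinct base path per micro-batch avoids "output directory already exists"
    // errors and prevents part files from being overwritten across batches.
    rdd.saveAsHadoopFile(
      s"/data/out/batch-${batchTime.milliseconds}",
      classOf[String], classOf[String], classOf[PerMinuteOutputFormat])
  }
}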