Re: Spark Streaming reads from stdin or output from command line utility

2015-06-12 Thread Gerard Maas
Would using socketTextStream and `yourApp | nc -lk ` work? I'm not sure how
resilient the socket receiver is, though. I've been playing with it
for a little demo and I don't yet understand its reconnection behavior.

Although I would think that putting an elastic buffer in between would be
a good idea, to decouple the producer from the consumer. Kafka would be my first
choice.
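
For reference, a minimal Scala sketch of the socket approach (the host, port and
batch interval below are placeholders, and it assumes `yourApp | nc -lk 9999` is
already running on that host):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StdinViaSocket")
val ssc = new StreamingContext(conf, Seconds(1))

// Each line the utility pipes through nc becomes one element of the DStream.
val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD { rdd =>
  // process each micro-batch of piped lines here
  println(s"received ${rdd.count()} lines")
}

ssc.start()
ssc.awaitTermination()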

-kr, Gerard.

On Fri, Jun 12, 2015 at 8:46 AM, Heath Guo  wrote:

>  Yes, it is a lot of data, and the utility I'm working with prints out
> an infinite real-time data stream. Thanks.
>
>
>   From: Tathagata Das 
> Date: Thursday, June 11, 2015 at 11:43 PM
>
> To: Heath Guo 
> Cc: user 
> Subject: Re: Spark Streaming reads from stdin or output from command line
> utility
>
>   Is it a lot of data that is expected to come through stdin? I mean is
> it even worth parallelizing the computation using something like Spark
> Streaming?
>
> On Thu, Jun 11, 2015 at 9:56 PM, Heath Guo  wrote:
>
>>   Thanks for your reply! In my use case, it would be a stream from only
>> one stdin. Also, I'm working with Scala.
>> It would be great if you could talk about the multi-stdin case as well!
>> Thanks.
>>
>>   From: Tathagata Das 
>> Date: Thursday, June 11, 2015 at 8:11 PM
>> To: Heath Guo 
>> Cc: user 
>> Subject: Re: Spark Streaming reads from stdin or output from command
>> line utility
>>
>>Are you going to receive data from one stdin from one machine, or
>> many stdins on many machines?
>>
>>
>> On Thu, Jun 11, 2015 at 7:25 PM, foobar  wrote:
>>
>>> Hi, I'm new to Spark Streaming, and I want to create an application where
>>> Spark Streaming could create a DStream from stdin. Basically I have a
>>> command
>>> line utility that generates stream data, and I'd like to pipe that data into
>>> a DStream. What's the best way to do that? I thought rdd.pipe() could help,
>>> but it seems that requires an RDD in the first place, which does not
>>> apply.
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html
>>> 
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Reading Really Big File Stream from HDFS

2015-06-12 Thread Saisai Shao
Using sc.textFile will also read the file from HDFS line by line through an
iterator, so it doesn't need to fit entirely into memory; even with a small
amount of memory it can still work.
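
A minimal sketch of that approach (the path and partition count are placeholders,
and it assumes a splittable file; a plain gzip file would still end up in a
single partition):

val lines = sc.textFile("hdfs:///data/big-file.txt", minPartitions = 400)

// Each partition is streamed through an iterator, so no single executor
// needs to hold the whole file in memory at once.
println(lines.count())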

2015-06-12 13:19 GMT+08:00 SLiZn Liu :

> Hmm, you have a good point. So should I load the file by `sc.textFile()`
> and specify a high number of partitions, and the file is then split into
> partitions in memory across the cluster?
>
> On Thu, Jun 11, 2015 at 9:27 PM ayan guha  wrote:
>
>> Why do you need to use a stream in this use case? 50G need not be in
>> memory. Give it a try with a high number of partitions.
>> On 11 Jun 2015 23:09, "SLiZn Liu"  wrote:
>>
>>> Hi Spark Users,
>>>
>>> I'm trying to load a literally big file (50GB when compressed as gzip,
>>> stored in HDFS) by receiving a DStream using `ssc.textFileStream`, as
>>> this file cannot fit in my memory. However, it looks like no RDD will
>>> be received until I copy this big file to a prior-specified location on
>>> HDFS. Ideally, I'd like to read this file a small number of lines at a
>>> time, but receiving a file stream requires additional writing to HDFS. Any
>>> idea how to achieve this?
>>>
>>> BEST REGARDS,
>>> Todd Leo
>>>
>>


Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng Lian

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your last 
comment saying "The OOM or lose heartbeat was occurred on slave node", 
because from the log files you attached at first, those OOMs actually 
happen on the driver side (the Thrift server log only contains log lines from 
the driver side). Did you see OOMs in the executor stderr output? I ask this 
because a large portion of users are still using 1.3, and we may want to 
deliver a fix if there are indeed bugs that cause unexpected OOMs.


Cheng

On 6/12/15 3:14 PM, 姜超才 wrote:

Hi Lian,

Today I updated my Spark to v1.4. This issue is resolved.

Thanks,
SuperJ

- Original Message -
*From:* "姜超才"
*To:* "Cheng Lian" , "Hester wang" ,
*Subject:* 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/11 08:56:28 (Thu)

No problem in local mode. I can get all rows.

Select * from foo;

The OOM or lost heartbeat occurred on a slave node.

Thanks,

SuperJ


- Original Message -
*From:* "Cheng Lian"
*To:* "姜超才" , "Hester wang" ,
*Subject:* Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't 
have access to a cluster for now) but couldn't reproduce this issue. 
Your program just executed smoothly... :-/


Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode works 
for you?  Will investigate this with a cluster when I get chance.


Cheng

On 6/10/15 5:19 PM, 姜超才 wrote:
When set "spark.sql.thriftServer.incrementalCollect" and set driver 
memory to 7G, Things seems stable and simple:
It can quickly run through the query line, but when traversal the 
result set ( while rs.hasNext ), it can quickly get the OOM: java 
heap space. See attachment.


/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true 
--conf spark.shuffle.manager=sort --conf 
"spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit" --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path 
/usr/local/hive/lib/classes12.jar --conf 
spark.sql.thriftServer.incrementalCollect=true


Thanks,

SuperJ


- Original Message -
*From:* "Cheng Lian"
*To:* "姜超才" , "Hester wang" ,
*Subject:* Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind sending me a 
compressed copy (don't cc user@spark.apache.org)?


Cheng

On 6/10/15 4:23 PM, 姜超才 wrote:

Hi Lian,

Thanks for your quick response.

I forgot to mention that I have tuned driver memory from 2G to 4G, 
which seems to give a minor improvement. The failure when fetching 1,400,000 
rows changed from "OOM: GC overhead limit exceeded" to "lost worker 
heartbeat after 120s".


I will try to set "spark.sql.thriftServer.incrementalCollect" and 
continue to increase driver memory to 7G, and will send the result to you.


Thanks,

SuperJ


- Original Message -
*From:* "Cheng Lian"
*To:* "Hester wang" ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:15:47 (Wed)

Hi Xiaohan,

Would you please try to set 
"spark.sql.thriftServer.incrementalCollect" to "true" and increase the 
driver memory size? In this way, HiveThriftServer2 uses 
RDD.toLocalIterator rather than RDD.collect().iterator to return the 
result set. The key difference is that RDD.toLocalIterator retrieves 
a single partition at a time, thus avoiding holding the whole result 
set on the driver side. The memory issue happens on the driver side rather 
than the executor side, so tuning executor memory size doesn't help.
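
A rough sketch of the difference on a toy RDD (not the HiveThriftServer2
internals themselves):

val rdd = sc.parallelize(1 to 1000000, numSlices = 100)

// collect() materializes all partitions on the driver at once.
val allAtOnce = rdd.collect().iterator

// toLocalIterator pulls one partition at a time, so the driver only needs
// enough memory for the largest single partition.
val incremental = rdd.toLocalIterator
incremental.take(10).foreach(println)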


Cheng

On 6/10/15 3:46 PM, Hester wang wrote:

Hi Lian,


I met a SparkSQL problem. I really appreciate it if you could give 
me some help! Below is the detailed description of the problem, for 
more information, attached are the original code and the log that 
you may need.


Problem:
I want to query my table, which is stored in Hive, through the Spark SQL 
JDBC interface, and fetch more than 1,000,000 rows, but I get an OOM.

sql = "select * from TEMP_ADMIN_150601_01 limit XXX ";

My Env:
5 Nodes = One master + 4 workers,  1000M Network Switch ,  Redhat 6.5
Each node: 8G RAM, 500G Harddisk
Java 1.6, Scala 2.10.4, Hadoop 2.6, Spark 1.3.0, Hive 0.13

Data:
A table wit

Optimizing Streaming from Websphere MQ

2015-06-12 Thread Chaudhary, Umesh
Hi,
I have created a Custom Receiver in Java which receives data from WebSphere MQ, 
and I am only writing the received records to HDFS.

I have referred to many forums for optimizing the speed of a Spark Streaming 
application. Here I am listing a few:


* Spark Official

* VIrdata

* TD's Slide (A bit Old but Useful)

I applied mainly these points to my application:


* setting the batch interval to 1 sec

* setting "spark.streaming.blockInterval" = 200ms

* inputStream.repartition(3)

But that did not improve the actual speed (records/sec) of my receiver, which is at most 
5-10 records/sec. This is way less than I expected.
Am I missing something?
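
For reference, a rough Scala sketch of how those settings map to code (the
receiver class and HDFS path are placeholders, not the actual application):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("MQToHDFS")
  .set("spark.streaming.blockInterval", "200") // 200 ms; blocks (and hence tasks) per batch = batch interval / block interval

val ssc = new StreamingContext(conf, Seconds(1)) // 1 second batch interval

// val mqStream = ssc.receiverStream(new MyMQReceiver(...)) // custom receiver, not shown
// mqStream.repartition(3).saveAsTextFiles("hdfs:///tmp/mq/records")

With a single receiver, though, the receiver itself is usually the bottleneck;
repartitioning only spreads out the processing after ingestion.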

Regards,
Umesh Chaudhary

This message, including any attachments, is the property of Sears Holdings 
Corporation and/or one of its subsidiaries. It is confidential and may contain 
proprietary or legally privileged information. If you are not the intended 
recipient, please delete it without reading the contents. Thank you.


Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng Lian
Thanks for the extra details and explanations, Chaocai. I will try to 
reproduce this when I get a chance.


Cheng

On 6/12/15 3:44 PM, 姜超才 wrote:
I said "OOM occurred on slave node", because I monitored memory 
utilization during the query task, on driver, very few memory was 
ocupied. And i remember i have ever seen the OOM stderr log on slave 
node.


But recently there seems no OOM log on slave node.
Follow the cmd 、data 、env and the code I gave you, the OOM can 100% 
repro on cluster mode.


Thanks,

SuperJ


- Original Message -
*From:* "Cheng Lian"
*To:* "姜超才" , "Hester wang" ,
*Subject:* Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your 
last comment saying "The OOM or lose heartbeat was occurred on slave 
node". Because from the log files you attached at first, those OOM 
actually happens on driver side (Thrift server log only contains log 
lines from driver side). Did you see OOM from executor stderr output? 
I ask this because there are still a large portion of users are using 
1.3, and we may want deliver a fix if there does exist bugs that 
causes unexpected OOM.


Cheng

On 6/12/15 3:14 PM, 姜超才 wrote:

Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
*From:* "姜超才"
*To:* "Cheng Lian" , "Hester wang" ,
*Subject:* 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
*From:* "Cheng Lian"
*To:* "姜超才" , "Hester wang" ,
*Subject:* Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't 
have access to a cluster for now) but couldn't reproduce this issue. 
Your program just executed smoothly... :-/


Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode 
works for you?  Will investigate this with a cluster when I get chance.


Cheng

On 6/10/15 5:19 PM, 姜超才 wrote:
When set "spark.sql.thriftServer.incrementalCollect" and set driver 
memory to 7G, Things seems stable and simple:
It can quickly run through the query line, but when traversal the 
result set ( while rs.hasNext ), it can quickly get the OOM: java 
heap space. See attachment.


/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true 
--conf spark.shuffle.manager=sort --conf 
"spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit" --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path 
/usr/local/hive/lib/classes12.jar --conf 
spark.sql.thriftServer.incrementalCollect=true


Thanks,

SuperJ


- Original Message -
*From:* "Cheng Lian"
*To:* "姜超才" , "Hester wang" ,
*Subject:* Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind to send me a 
compressed copy (don't cc user@spark.apache.org)?


Cheng

On 6/10/15 4:23 PM, 姜超才 wrote:

Hi Lian,

Thanks for your quick response.

I forgot mention that I have tuned driver memory from 2G to 4G, 
seems got minor improvement, The dead way when fetching 1,400,000 
rows changed from "OOM::GC overhead limit exceeded" to " lost 
worker heartbeat after 120s".


I will try  to set "spark.sql.thriftServer.incrementalCollect" and 
continue increase driver memory to 7G, and will send the result to 
you.


Thanks,

SuperJ


- Original Message -
*From:* "Cheng Lian"
*To:* "Hester wang" ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:15:47 (Wed)

Hi Xiaohan,

Would you please try to set 
"spark.sql.thriftServer.incrementalCollect" to "true" and 
increasing driver memory size? In this way, HiveThriftServer2 uses 
RDD.toLocalIterator rather than RDD.collect().iterator to return 
the result set. The key difference is that RDD.toLocalIterator 
retrieves a single partition at a time, thus avoid holding the 
whole result set on driver side. The memory issue happens on driver 
side rather than executor side, so tuning exe

How to use Window Operations with kafka Direct-API?

2015-06-12 Thread ZIGEN
Hi, I'm using Spark Streaming (1.3.1).
I want to get exactly-once messaging from Kafka and use window operations on the
DStream.

When using window operations (e.g. DStream#reduceByKeyAndWindow) with the Kafka
Direct API,
a java.lang.ClassCastException occurs as follows.

--- stacktrace --
java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
at
org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
at
org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)

 
--- my source ---

JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
jssc.checkpoint("checkpoint");

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);

JavaPairDStream<String, List<String>> windowDs =
    pairDS.reduceByKeyAndWindow(new Function2<List<String>, List<String>, List<String>>() {
        @Override
        public List<String> call(List<String> list1, List<String> list2) throws Exception {
            ...
        }
    }, windowDuration, slideDuration);

windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {

    @Override
    public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {

        OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // ClassCastException occurred here

        KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
        for (OffsetRange offsets : offsetsList) {

            TopicAndPartition topicAndPartition = new
                TopicAndPartition(offsets.topic(), offsets.partition());

            HashMap<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
            map.put(topicAndPartition, offsets.untilOffset());
            kc.setConsumerOffsets("group1", toScalaMap(map));
        }

        return null;
    }
});

Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-tp23293.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use Window Operations with kafka Direct-API?

2015-06-12 Thread Saisai Shao
I think you cannot use offsetRanges in that way. When you transform a
DirectKafkaInputDStream into a WindowedDStream, the KafkaRDD is internally
changed into a normal RDD, but offsetRanges is an attribute specific to
KafkaRDD, so when you cast a normal RDD to HasOffsetRanges you will
get this exception.

You can only do something like:

directKafkaInputDStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ...
}

Apply foreachRDD directly on the DirectKafkaInputDStream.
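
If you still need window operations, one pattern (a minimal Scala sketch; the
stream name, durations and commit logic are placeholders) is to capture the
offsets on the direct stream first, e.g. in a transform, and only then window:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array.empty[OffsetRange]

val tagged = directKafkaInputDStream.transform { rdd =>
  // Still a KafkaRDD here, so the cast succeeds.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}

tagged
  .map { case (k, _) => (k, 1L) }
  .reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))
  .foreachRDD { rdd =>
    // use the window result, then commit `offsetRanges`; note that a window
    // spans several batches, so you may need to accumulate ranges per batch
  }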







2015-06-12 16:10 GMT+08:00 ZIGEN :

> Hi, I'm using Spark Streaming(1.3.1).
> I want to get exactly-once messaging from Kafka and use Window operations
> of
> DStraem,
>
> When Window operations(eg DStream#reduceByKeyAndWindow) with kafka
> Direct-API
> java.lang.ClassCastException occurs as follows.
>
> --- stacktrace --
> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
> be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
> at
>
> org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
> at
>
> org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
> at
>
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
> at
>
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
> at
>
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
> at
>
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at
>
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
> at
>
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
> at
>
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
> at
>
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
>
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)
>
>
> --- my source ---
>
> JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
> jssc.checkpoint("checkpoint");
>
> JavaPairInputDStream messages =
> KafkaUtils.createDirectStream
>  (jssc, String.class, String.class, StringDecoder.class,
> StringDecoder.class, kafkaParams, topicsSet);
>
> JavaPairDStream> pairDS = messages.mapToPair(...);
>
> JavaPairDStream> windowDs =
> pairDS.reduceByKeyAndWindow(new Function2, List,
> List>() {
> @Override
> public List call(List list1, List list2)
> throws
> Exception {
> ...
> }
> }, windowDuration, slideDuration);
>
> windowDs.foreachRDD(new Function>,
> Void>() {
>
> @Override
> public Void call(JavaPairRDD> rdd) throws
> Exception
> {
>
>
> OffsetRange[] offsetsList = ((HasOffsetRanges)
> rdd.rdd()).offsetRanges();
> // ClassCastException occurred
>
> KafkaCluster kc = new
> KafkaCluster(toScalaMap(kafkaParams));
> for (OffsetRange offsets : offsetsList) {
>
> TopicAndPartition topicAndPartition = new
> TopicAndPartition(offsets.topic(), offsets.partition());
>
> HashMap map = new
> HashMap Object>();
> map.put(topicAndPartition, offsets.untilOffset());
> kc.setConsumerOffsets("group1", toScalaMap(map));
> }
>
> return null;
> }
> });
>
> Thanks!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-tp23293.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng, Hao
Not sure if Spark Core will provide an API to fetch the records one by one from the 
block manager, instead of pulling them all into driver memory.

From: Cheng Lian [mailto:l...@databricks.com]
Sent: Friday, June 12, 2015 3:51 PM
To: 姜超才; Hester wang; user@spark.apache.org
Subject: Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 
1,000,000 rows.

Thanks for the extra details and explanations Chaocai, will try to reproduce 
this when I got chance.

Cheng
On 6/12/15 3:44 PM, 姜超才 wrote:
I said "OOM occurred on slave node", because I monitored memory utilization 
during the query task, on driver, very few memory was ocupied. And i remember i 
have ever seen the OOM stderr log on slave node.

But recently there seems no OOM log on slave node.
Follow the cmd 、data 、env and the code I gave you, the OOM can 100% repro on 
cluster mode.

Thanks,

SuperJ


- Original Message -
From: "Cheng Lian"
To: "姜超才" , "Hester wang" ,
Subject: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your last comment 
saying "The OOM or lose heartbeat was occurred on slave node". Because from the 
log files you attached at first, those OOM actually happens on driver side 
(Thrift server log only contains log lines from driver side). Did you see OOM 
from executor stderr output? I ask this because there are still a large portion 
of users are using 1.3, and we may want deliver a fix if there does exist bugs 
that causes unexpected OOM.

Cheng
On 6/12/15 3:14 PM, 姜超才 wrote:
Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
From: "姜超才"
To: "Cheng Lian" , "Hester wang" ,
Subject: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
From: "Cheng Lian"
To: "姜超才" , "Hester wang" ,
Subject: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't have 
access to a cluster for now) but couldn't reproduce this issue. Your program 
just executed smoothly... :-/

Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode works for you? 
 Will investigate this with a cluster when I get chance.

Cheng
On 6/10/15 5:19 PM, 姜超才 wrote:
When set "spark.sql.thriftServer.incrementalCollect" and set driver memory to 
7G, Things seems stable and simple:
It can quickly run through the query line, but when traversal the result set ( 
while rs.hasNext ), it can quickly get the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true --conf 
spark.shuffle.manager=sort --conf 
"spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit" --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar 
--conf spark.sql.thriftServer.incrementalCollect=true

Thanks,

SuperJ


- Original Message -
From: "Cheng Lian"
To: "姜超才" , "Hester wang" ,
Subject: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind to send me a compressed 
copy (don't cc user@spark.apache.org)?

Cheng
On 6/10/15 4:23 PM, 姜超才 wrote:
Hi Lian,

Thanks for your quick response.






I forgot mention that I have tuned driver memory from 2G to 4G, 
seems got minor improvement, The dead way when fetching 1,400,000 rows changed 
from "OOM::GC overhead limit exceeded" to " lost worker heartbeat after 120s".


  I will try  to set 
"spark.sql.thriftServer.incrementalCollect" and continue increase driver memory 
to 7G, and will send the result to you.

Thanks,

SuperJ


- Original Message -
From: "Cheng Lian"
To: "Hester wang" ,
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/

RE: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng, Hao
Not sure if Spark RDD will provide an API to fetch the records one by one from the 
final result set, instead of pulling them all (or a whole partition's data) 
and fitting them in driver memory.

Seems like a big change.

From: Cheng Lian [mailto:l...@databricks.com]
Sent: Friday, June 12, 2015 3:51 PM
To: 姜超才; Hester wang; user@spark.apache.org
Subject: Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 
1,000,000 rows.

Thanks for the extra details and explanations Chaocai, will try to reproduce 
this when I got chance.

Cheng
On 6/12/15 3:44 PM, 姜超才 wrote:
I said "OOM occurred on slave node", because I monitored memory utilization 
during the query task, on driver, very few memory was ocupied. And i remember i 
have ever seen the OOM stderr log on slave node.

But recently there seems no OOM log on slave node.
Follow the cmd 、data 、env and the code I gave you, the OOM can 100% repro on 
cluster mode.

Thanks,

SuperJ


- Original Message -
From: "Cheng Lian"
To: "姜超才" , "Hester wang" ,
Subject: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your last comment 
saying "The OOM or lose heartbeat was occurred on slave node". Because from the 
log files you attached at first, those OOM actually happens on driver side 
(Thrift server log only contains log lines from driver side). Did you see OOM 
from executor stderr output? I ask this because there are still a large portion 
of users are using 1.3, and we may want deliver a fix if there does exist bugs 
that causes unexpected OOM.

Cheng
On 6/12/15 3:14 PM, 姜超才 wrote:
Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
From: "姜超才"
To: "Cheng Lian" , "Hester wang" ,
Subject: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
From: "Cheng Lian"
To: "姜超才" , "Hester wang" ,
Subject: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't have 
access to a cluster for now) but couldn't reproduce this issue. Your program 
just executed smoothly... :-/

Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode works for you? 
 Will investigate this with a cluster when I get chance.

Cheng
On 6/10/15 5:19 PM, 姜超才 wrote:
When set "spark.sql.thriftServer.incrementalCollect" and set driver memory to 
7G, Things seems stable and simple:
It can quickly run through the query line, but when traversal the result set ( 
while rs.hasNext ), it can quickly get the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true --conf 
spark.shuffle.manager=sort --conf 
"spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit" --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar 
--conf spark.sql.thriftServer.incrementalCollect=true

Thanks,

SuperJ


- Original Message -
From: "Cheng Lian"
To: "姜超才" , "Hester wang" ,
Subject: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind to send me a compressed 
copy (don't cc user@spark.apache.org)?

Cheng
On 6/10/15 4:23 PM, 姜超才 wrote:
Hi Lian,

Thanks for your quick response.






I forgot mention that I have tuned driver memory from 2G to 4G, 
seems got minor improvement, The dead way when fetching 1,400,000 rows changed 
from "OOM::GC overhead limit exceeded" to " lost worker heartbeat after 120s".


  I will try  to set 
"spark.sql.thriftServer.incrementalCollect" and continue increase driver memory 
to 7G, and will send the result to you.

Thanks,

SuperJ


- Original Message -
From: "Cheng Lian"
To: "Hester wang" ,
Subject: Re:

Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng Lian
My guess is that the reason local mode is OK while the standalone cluster 
isn't is that in cluster mode, task results are serialized and 
sent to the driver side. The driver needs to deserialize the results, and thus 
occupies much more memory than in local mode (where task result 
de/serialization is not performed).


Cheng

On 6/12/15 4:17 PM, Cheng, Hao wrote:


Not sure if Spark Core will provide API to fetch the record one by one 
from the block manager, instead of the pulling them all into the 
driver memory.


*From:*Cheng Lian [mailto:l...@databricks.com]
*Sent:* Friday, June 12, 2015 3:51 PM
*To:* 姜超才; Hester wang; user@spark.apache.org
*Subject:* Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when 
fetching more than 1,000,000 rows.


Thanks for the extra details and explanations Chaocai, will try to 
reproduce this when I got chance.


Cheng

On 6/12/15 3:44 PM, 姜超才 wrote:

I said "OOM occurred on slave node", because I monitored memory
utilization during the query task, on driver, very few memory was
ocupied. And i remember i have ever seen the OOM stderr log on
slave node.

But recently there seems no OOM log on slave node.

Follow the cmd 、data 、env and the code I gave you, the OOM can
100% repro on cluster mode.

Thanks,

SuperJ


- Original Message -
*From:* "Cheng Lian"
*To:* "姜超才" , "Hester wang" ,
*Subject:* Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your
last comment saying "The OOM or lose heartbeat was occurred on
slave node". Because from the log files you attached at first,
those OOM actually happens on driver side (Thrift server log only
contains log lines from driver side). Did you see OOM from
executor stderr output? I ask this because there are still a large
portion of users are using 1.3, and we may want deliver a fix if
there does exist bugs that causes unexpected OOM.

Cheng

On 6/12/15 3:14 PM, 姜超才 wrote:

Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
*From:* "姜超才"
*To:* "Cheng Lian" , "Hester wang" ,
*Subject:* 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
*From:* "Cheng Lian"
*To:* "姜超才" , "Hester wang" ,
*Subject:* Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop
(don't have access to a cluster for now) but couldn't
reproduce this issue. Your program just executed smoothly... :-/

Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local
mode works for you?  Will investigate this with a cluster when
I get chance.

Cheng

On 6/10/15 5:19 PM, 姜超才 wrote:

When set "spark.sql.thriftServer.incrementalCollect" and
set driver memory to 7G, Things seems stable and simple:

It can quickly run through the query line, but when
traversal the result set ( while rs.hasNext ), it can
quickly get the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh
--master spark://cx-spark-001:7077 --conf
spark.executor.memory=4g --conf spark.driver.memory=7g
--conf spark.shuffle.consolidateFiles=true --conf
spark.shuffle.manager=sort --conf
"spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit"
--conf spark.file.transferTo=false --conf
spark.akka.timeout=2000 --conf
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8
--conf spark.kryoserializer.buffer.mb=256 --conf
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf
spark.akka.frameSize=512 --driver-class-path
/usr/local/hive/lib/class

If not stop StreamingContext gracefully, will checkpoint data be consistent?

2015-06-12 Thread Haopu Wang
This is a quick question about checkpointing: if the
StreamingContext is not stopped gracefully, will the checkpoint be
consistent?
Or should I always shut down the application gracefully in order to
use the checkpoint?
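
For the graceful case, the stop call itself looks like this (Scala; `ssc` is
your StreamingContext):

// Waits for received/queued batches to finish processing before shutting down.
ssc.stop(stopSparkContext = true, stopGracefully = true)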

Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: BigDecimal problem in parquet file

2015-06-12 Thread Cheng Lian

On 6/10/15 8:53 PM, Bipin Nag wrote:


Hi Cheng,

I am using Spark 1.3.1 binary available for Hadoop 2.6. I am loading 
an existing parquet file, then repartitioning and saving it. Doing 
this gives the error. The code for this doesn't look like it's causing the 
problem. I have a feeling the source - the existing parquet file - is the 
culprit.


I created that parquet using a jdbcrdd (pulled from microsoft sql 
server). First I saved jdbcrdd as an objectfile on disk. Then loaded 
it again, made a dataframe from it using a schema then saved it as a 
parquet.


Following is the code :
For saving jdbcrdd:
 name - fullqualifiedtablename
 pk - string for primarykey
 pklast - last id to pull
val myRDD = new JdbcRDD( sc, () =>
DriverManager.getConnection(url,username,password) ,
"SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= "+pk+" 
and "+pk+" <= ?",

1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
myRDD.saveAsObjectFile("rawdata/"+name);

For applying schema and saving the parquet:
val myschema = schemamap(name)
val myrdd = 
sc.objectFile[Array[Object]]("/home/bipin/rawdata/"+name).map(x => 
org.apache.spark.sql.Row(x:_*))


Have you tried to print out x here to check its contents? My guess is 
that x actually contains unit values. For example, the following Spark 
shell code can reproduce a similar exception:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
val df = sqlContext.createDataFrame(rdd, schema)

df.saveAsParquetFile("file:///tmp/foo")


val actualdata = sqlContext.createDataFrame(myrdd, myschema)
actualdata.saveAsParquetFile("/home/bipin/stageddata/"+name)

Schema structtype can be made manually, though I pull table's metadata 
and make one. It is a simple string translation (see sql docs 
 and/or 
spark datatypes 
)


That is how I created the parquet file. Any help to solve the issue is 
appreciated.

Thanks
Bipin


On 9 June 2015 at 20:44, Cheng Lian > wrote:


Would you please provide a snippet that reproduce this issue? What
version of Spark were you using?

Cheng

On 6/9/15 8:18 PM, bipin wrote:

Hi,
When I try to save my data frame as a parquet file I get the
following
error:

java.lang.ClassCastException: scala.runtime.BoxedUnit cannot
be cast to
org.apache.spark.sql.types.Decimal
at

org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
at

org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at

org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at

org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at

parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at
org.apache.spark.sql.parquet.ParquetRelation2.org

$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
at

org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
at

org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

How to fix this problem ?



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org
 

Spark 1.4 release date

2015-06-12 Thread ayan guha
Hi

When is official spark 1.4 release date?
Best
Ayan


Re: Spark 1.4 release date

2015-06-12 Thread Todd Nist
It was released yesterday.

On Friday, June 12, 2015, ayan guha  wrote:

> Hi
>
> When is official spark 1.4 release date?
> Best
> Ayan
>


Re: How to use Window Operations with kafka Direct-API?

2015-06-12 Thread zigen
Hi Shao,

Thank you for your prompt reply.
I was disappointed.
I will try window operations with the receiver-based 
approach (KafkaUtils.createStream).

Thank you again,
ZIGEN


On 2015/06/12 17:18, Saisai Shao  wrote:

> I think you could not use offsetRange in such way, when you transform a 
> DirectKafkaInputDStream into WindowedDStream, internally the KafkaRDD is 
> changed into normal RDD, but offsetRange is a specific attribute for 
> KafkaRDD, so when you convert a normal RDD to HasOffsetRanges, you will meet 
> such exception.
> 
> you could only do something like:
> 
> directKafkaInputDStream.foreachRDD { rdd =>
>rdd.asInstanceOf[HasOffsetRanges]
>   ...
> }
> 
> Apply foreachRDD directly on DirectKafkaInputDStream.
> 
> 
> 
> 
> 
> 
> 
> 2015-06-12 16:10 GMT+08:00 ZIGEN :
>> Hi, I'm using Spark Streaming(1.3.1).
>> I want to get exactly-once messaging from Kafka and use Window operations of
>> DStraem,
>> 
>> When Window operations(eg DStream#reduceByKeyAndWindow) with kafka
>> Direct-API
>> java.lang.ClassCastException occurs as follows.
>> 
>> --- stacktrace --
>> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
>> be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>> at
>> org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
>> at
>> org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
>> at
>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>> at
>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>> at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>> at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>> at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>> at scala.util.Try$.apply(Try.scala:161)
>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>> at java.lang.Thread.run(Thread.java:722)
>> 
>> 
>> --- my source ---
>> 
>> JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
>> jssc.checkpoint("checkpoint");
>> 
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream
>>  (jssc, String.class, String.class, StringDecoder.class,
>> StringDecoder.class, kafkaParams, topicsSet);
>> 
>> JavaPairDStream> pairDS = messages.mapToPair(...);
>> 
>> JavaPairDStream> windowDs =
>> pairDS.reduceByKeyAndWindow(new Function2, List,
>> List>() {
>> @Override
>> public List call(List list1, List list2) 
>> throws
>> Exception {
>> ...
>> }
>> }, windowDuration, slideDuration);
>> 
>> windowDs.foreachRDD(new Function>,
>> Void>() {
>> 
>> @Override
>> public Void call(JavaPairRDD> rdd) throws 
>> Exception
>> {
>> 
>> 
>> OffsetRange[] offsetsList = ((HasOffsetRanges) 
>> rdd.rdd()).offsetRanges();
>> // ClassCastException occurred
>> 
>> KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
>> for (OffsetRange offsets : offsetsList) {
>> 
>> TopicAndPartition topicAndPartition = new
>> TopicAndPartition(offsets.topic(), offsets.partition());
>> 
>> HashMap map = new 
>> HashMap> Object>();
>> map.put(topicAndPartition, offsets.untilOffset());
>> kc.setConsumerOffsets("group1", toScalaMap(map));
>> }
>> 
>> return null;
>> }
>> });
>> 
>> Thanks!
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-

Upgrade to parquet 1.6.0

2015-06-12 Thread Eric Eijkelenboom
Hi 

What is the reason that Spark still comes with Parquet 1.6.0rc3? It seems like 
newer Parquet versions are available (e.g. 1.6.0). This would fix problems with 
‘spark.sql.parquet.filterPushdown’, which currently is disabled by default, 
because of a bug in Parquet 1.6.0rc3.

Thanks! 

Eric

Re: Upgrade to parquet 1.6.0

2015-06-12 Thread Cheng Lian
At the time 1.3.x was released, 1.6.0 hadn't been released yet, and we 
didn't have enough time to upgrade to and test Parquet 1.6.0 for Spark 
1.4.0. But we've recently upgraded Parquet to 1.7.0 (which is exactly the 
same as 1.6.0, with the package name renamed from com.twitter to 
org.apache.parquet) on the master branch.
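
For anyone who wants to experiment with it anyway, the flag can be toggled per
session (a sketch, assuming a `sqlContext` is in scope):

sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

or passed at submit time with --conf spark.sql.parquet.filterPushdown=true.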


Cheng

On 6/12/15 6:16 PM, Eric Eijkelenboom wrote:

Hi

What is the reason that Spark still comes with Parquet 1.6.0rc3? It 
seems like newer Parquet versions are available (e.g. 1.6.0). This 
would fix problems with ‘spark.sql.parquet.filterPushdown’, which 
currently is disabled by default, because of a bug in Parquet 1.6.0rc3.


Thanks!

Eric




Scheduling and node affinity

2015-06-12 Thread Brian Candler
I would like to know if Spark has any facility by which particular tasks 
can be scheduled to run on chosen nodes.


The use case: we have a large custom-format database. It is partitioned 
and the segments are stored on local SSD on multiple nodes. Incoming 
queries are matched against the database; this involves either sending 
each key to the correct node, or sending a batch to all nodes 
simultaneously, where the queries are filtered and processed against one 
segment each, and the results are merged at the end.


Currently we are doing this with htcondor using a DAG to define the 
workflow and requirements expressions to match particular jobs to 
particular databases, but it's coarse-grained and more suited for batch 
processing than real-time, as well as being cumbersome to define and manage.


I wonder whether Spark would suit this workflow, and if so how?

It seems that either we would need to schedule parts of our jobs on the 
appropriate nodes, which I can't see how to do:

http://spark.apache.org/docs/latest/job-scheduling.html

Or possibly we could define our partitioned database as a custom type of 
RDD - however then we would need to define operations which work on two 
RDDs simultaneously (i.e. the static database and the incoming set of 
queries) which doesn't seem to fit Spark well AFAICS.
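
For the placement question specifically: a custom RDD can expose placement hints
by overriding getPreferredLocations. A minimal sketch (all names and the segment
lookup are hypothetical, not a full solution):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per database segment, remembering which host holds it on local SSD.
case class SegmentPartition(index: Int, host: String, path: String) extends Partition

class SegmentRDD(sc: SparkContext, segments: Seq[(String, String)]) // (host, path) pairs
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    segments.zipWithIndex.map { case ((host, path), i) =>
      SegmentPartition(i, host, path): Partition
    }.toArray

  // Scheduler hint: prefer running each partition's task on the node holding that segment.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[SegmentPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[SegmentPartition]
    // Placeholder: open the local segment at p.path and evaluate queries against it.
    Iterator(s"would query segment ${p.path} on ${p.host}")
  }
}

Preferred locations are a hint rather than a guarantee, so whether this is tight
enough depends on how strict the affinity requirement is.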


Any other ideas how we could approach this, either with Spark or 
suggestions for other frameworks to look at? (We would actually prefer 
non-Java frameworks but are happy to look at all options)


Thanks,

Brian Candler.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Upgrade to parquet 1.6.0

2015-06-12 Thread Eric Eijkelenboom
Great, thanks for the extra info! 

> On 12 Jun 2015, at 12:41, Cheng Lian  wrote:
> 
> At the time 1.3.x was released, 1.6.0 hasn't been released yet. And we didn't 
> have enough time to upgrade and test Parquet 1.6.0 for Spark 1.4.0. But we've 
> already upgraded Parquet to 1.7.0 (which is exactly the same as 1.6.0 with 
> package name renamed from com.twitter to org.apache.parquet) on master branch 
> recently.
> 
> Cheng
> 
> On 6/12/15 6:16 PM, Eric Eijkelenboom wrote:
>> Hi 
>> 
>> What is the reason that Spark still comes with Parquet 1.6.0rc3? It seems 
>> like newer Parquet versions are available (e.g. 1.6.0). This would fix 
>> problems with ‘spark.sql.parquet.filterPushdown’, which currently is 
>> disabled by default, because of a bug in Parquet 1.6.0rc3.
>> 
>> Thanks! 
>> 
>> Eric
> 



Re: Spark 1.4 release date

2015-06-12 Thread ayan guha
Thanks a lot.
On 12 Jun 2015 19:46, "Todd Nist"  wrote:

> It was released yesterday.
>
> On Friday, June 12, 2015, ayan guha  wrote:
>
>> Hi
>>
>> When is official spark 1.4 release date?
>> Best
>> Ayan
>>
>


Re: Spark 1.4 release date

2015-06-12 Thread Guru Medasani
Here is a spark 1.4 release blog by data bricks.

https://databricks.com/blog/2015/06/11/announcing-apache-spark-1-4.html 



Guru Medasani
gdm...@gmail.com



> On Jun 12, 2015, at 7:08 AM, ayan guha  wrote:
> 
> Thanks a lot.
> 
> On 12 Jun 2015 19:46, "Todd Nist"  > wrote:
> It was released yesterday.
> 
> On Friday, June 12, 2015, ayan guha  > wrote:
> Hi
> 
> When is official spark 1.4 release date?
> Best
> Ayan
> 



Re: BigDecimal problem in parquet file

2015-06-12 Thread Bipin Nag
Hi Cheng,

Yes, some rows contain unit instead of decimal values. I believe some rows
from the original source I had don't have any value, i.e. it is null, and that
shows up as unit. How do Spark SQL and Parquet handle null in place of
decimal values, assuming that field is nullable? I will have to change it
properly.
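
A minimal sketch of the cleanup, assuming the raw rows are Array[Object] as in
the earlier snippet and that the schema marks the decimal field as nullable:

import org.apache.spark.sql.Row

val rows = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name).map { arr =>
  Row(arr.map {
    case _: scala.runtime.BoxedUnit => null // missing value -> SQL NULL
    case v => v
  }: _*)
}
val actualdata = sqlContext.createDataFrame(rows, myschema)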

Thanks for helping out.
Bipin

On 12 June 2015 at 14:57, Cheng Lian  wrote:

>  On 6/10/15 8:53 PM, Bipin Nag wrote:
>
>   Hi Cheng,
>
>  I am using Spark 1.3.1 binary available for Hadoop 2.6. I am loading an
> existing parquet file, then repartitioning and saving it. Doing this gives
> the error. The code for this doesn't look like causing  problem. I have a
> feeling the source - the existing parquet is the culprit.
>
> I created that parquet using a jdbcrdd (pulled from microsoft sql server).
> First I saved jdbcrdd as an objectfile on disk. Then loaded it again, made
> a dataframe from it using a schema then saved it as a parquet.
>
>  Following is the code :
>  For saving jdbcrdd:
>   name - fullqualifiedtablename
>   pk - string for primarykey
>   pklast - last id to pull
>  val myRDD = new JdbcRDD( sc, () =>
> DriverManager.getConnection(url,username,password) ,
> "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= "+pk+" and
> "+pk+" <= ?",
> 1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
> myRDD.saveAsObjectFile("rawdata/"+name);
>
>  For applying schema and saving the parquet:
> val myschema = schemamap(name)
> val myrdd =
> sc.objectFile[Array[Object]]("/home/bipin/rawdata/"+name).map(x =>
> org.apache.spark.sql.Row(x:_*))
>
>   Have you tried to print out x here to check its contents? My guess is
> that x actually contains unit values. For example, the follow Spark shell
> code can reproduce a similar exception:
>
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.Row
> val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
> val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
> val df = sqlContext.createDataFrame(rdd, schema)
>
> df.saveAsParquetFile("file:///tmp/foo")
>
>val actualdata = sqlContext.createDataFrame(myrdd, myschema)
> actualdata.saveAsParquetFile("/home/bipin/stageddata/"+name)
>
>  Schema structtype can be made manually, though I pull table's metadata
> and make one. It is a simple string translation (see sql docs
> 
> and/or spark datatypes
> 
> )
>
>  That is how I created the parquet file. Any help to solve the issue is
> appreciated.
>  Thanks
>  Bipin
>
>
> On 9 June 2015 at 20:44, Cheng Lian  wrote:
>
>> Would you please provide a snippet that reproduce this issue? What
>> version of Spark were you using?
>>
>> Cheng
>>
>> On 6/9/15 8:18 PM, bipin wrote:
>>
>>> Hi,
>>> When I try to save my data frame as a parquet file I get the following
>>> error:
>>>
>>> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
>>> org.apache.spark.sql.types.Decimal
>>> at
>>>
>>> org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
>>> at
>>>
>>> org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
>>> at
>>>
>>> org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
>>> at
>>>
>>> org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
>>> at
>>>
>>> parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>>> at
>>> parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>>> at
>>> parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>>> at
>>> org.apache.spark.sql.parquet.ParquetRelation2.org
>>> $apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
>>> at
>>>
>>> org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>>> at
>>>
>>> org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> How to fix this problem ?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>

Re: Is it possible to see Spark jobs on MapReduce job history ? (running Spark on YARN cluster)

2015-06-12 Thread Steve Loughran

For that you need SPARK-1537 and the patch to go with it

It is still the spark web UI, it just hands off storage and retrieval of the 
history to the underlying Yarn timeline server, rather than through the 
filesystem. You'll get to see things as they go along too.

If you do want to try it, please have a go, and provide any feedback on the 
JIRA/pull request. I should warn: it needs Hadoop 2.6 (Apache,  HDP 2.2, 
CDH5.4), due to some API changes. While the patch is for 1.4+,  I already have 
a local branch with it applied to spark 1.3.1

> On 12 Jun 2015, at 03:01, Elkhan Dadashov  wrote:
> 
> Hi all,
> 
> I wonder if anyone has used the MapReduce Job History to show Spark jobs.
> 
> I can see my Spark jobs (Spark running on Yarn cluster) on Resource manager 
> (RM).
> 
> I start Spark History server, and then through Spark's web-based user 
> interface I can monitor the cluster (and track cluster and job statistics). 
> Basically Yarn RM gets linked to Spark History server, which enables 
> monitoring.
> 
> But instead of using Spark History Server , is it possible to see Spark jobs 
> on MapReduce job history ? (in addition to seeing them on RM)
> 
> (I know through yarn logs -applicationId  we can get all logs after 
> Spark job has completed, but my concern is to see the logs and completed jobs 
>  through common web ui - MapReduce Job History )
> 
> Thanks in advance.
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Broadcast value

2015-06-12 Thread Yasemin Kaya
Hi,

I am reading a broadcast value from a file. I want to use it when creating
Rating objects (ALS), but I am getting null. Here is my code:

Lines 17 & 18 are OK, but line 19 returns null, so line 21 gives me an error.
I don't understand why. Do you have any idea?
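For reference, a minimal Scala sketch of the usual broadcast pattern (purely
illustrative, since the linked code is not visible here; the file format, names
and the pairsRdd input are assumptions):

import org.apache.spark.mllib.recommendation.Rating

// Build the lookup map on the driver, then broadcast it once
val scores: Map[(Int, Int), Double] = sc.textFile("scores.txt")
  .map(_.split(","))
  .map(a => ((a(0).toInt, a(1).toInt), a(2).toDouble))
  .collect()
  .toMap
val scoresBc = sc.broadcast(scores)

// Always go through scoresBc.value inside closures; referencing driver-side
// state directly is a common source of null/empty values on executors
val ratings = pairsRdd.map { case (user, product) =>
  Rating(user, product, scoresBc.value.getOrElse((user, product), 0.0))
}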


Best,
yasemin



-- 
hiç ender hiç


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-12 Thread Steve Loughran
These are both really good posts: you should try and get them in to the 
documentation.

with anything implementing dynamicness, there are some fun problems

(a) detecting the delays in the workflow. There are some good ideas here
(b) deciding where to address it. That means you need to monitor the entire 
pipeline, which you should be doing in production anyway.
(c) choosing the action. More nodes, more memory & CPU (not that useful for 
Java code, even when YARN adds support for dynamic container resize)
(d) choosing the size of the action. In a shared cluster, extra resources for 
one app come at the expense of others. If you have pre-emption turned on in 
YARN, the scheduler can take containers off lower-priority work, which 
automates a lot of this decision making. That will lose other work though, so 
to justify it you'd better hang on to those containers
(e) deciding if/when to hand things back. Scaling things down can be very 
expensive if lots of state has to get rebuilt elsewhere.

I think Apache Helix from LinkedIn has done some good work here - worth looking 
at to see what lessons & code to lift. And as you'd expect, it sits right 
behind Kafka in production. I think it gets away with low delays to scale 
up/down by relying on low rebuild costs. [In the work I've been doing with 
colleagues on dynamic HBase and Accumulo clusters, we've not attempted any 
autoscaling, because scale-down is an expensive decision... we're focusing on 
liveness detection and reaction, then publishing the metrics needed to allow 
people or cross-application tools to make the decision.]

On 12 Jun 2015, at 04:38, Dmitry Goldenberg 
mailto:dgoldenberg...@gmail.com>> wrote:

Yes, Tathagata, thank you.

For #1, the 'need detection', one idea we're entertaining is timestamping the 
messages coming into the Kafka topics. The consumers would check the interval 
between the time they get the message and that message origination timestamp. 
As Kafka topics start to fill up more, we would presumably see longer and 
longer wait times (delays) for messages to be getting processed by the 
consumers.  The consumers would then start firing off critical events into an 
Event Analyzer/Aggregator which would decide that more resources are needed, 
then ask the Provisioning Component to allocate N new machines.

We do want to set maxRatePerPartition in order to not overwhelm the consumers 
and run out of memory.  Machine provisioning may take a while, and if left with 
no maxRate guards, our consumers could run out of memory.
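For reference, that rate cap is just a Spark configuration setting; a minimal
sketch (the app name, batch interval and rate value are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Limit each Kafka partition to at most 1000 records/second per batch
val conf = new SparkConf()
  .setAppName("kafka-consumer")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
val ssc = new StreamingContext(conf, Seconds(30))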

"Since there are no receivers, if the cluster gets a new executor, it will 
automatically start getting used to run tasks... no need to do anything 
further."  This is great, actually. We were wondering whether we'd need to 
restart the consumers once the new machines have been added. Tathagata's point 
implies, as I read it, that no further orchestration is needed, the load will 
start getting redistributed automatically. This makes implementation of 
autoscaling a lot simpler, as far as #3.

One issue that's not yet been covered much is the scenario when *fewer* cluster 
resources become required (a system load valley rather than a peak). To detect 
a low volume, we'd need to measure the throughput in messages per second over 
time.  Real low volumes would cause firing off of critical events signaling to 
the Analyzer that machines could be decommissioned.

If machines are being decommissioned, it would seem that the consumers would 
need to be quiesced (allowed to process any current batch, then shut down), 
then they would restart themselves or be restarted. Thoughts on this?

There is also a hefty #4 here which is the "hysteresis" of this, where the 
system operates adaptively and learns over time, remembering the history of 
cluster expansions and contractions and allowing a certain slack for letting 
things cool down or heat up more gradually; also not contracting or expanding 
too frequently.  PID controllers  and thermostat types of design patterns have 
been mentioned before in this discussion.


If you look at the big cloud apps, they dynamically reallocate VM images based 
on load history, with Netflix being the poster user: Hadoop work in the quiet 
hours, user interaction evenings and weekends. Excluding special events 
(including holidays), there's a lot of regularity over time, which lets you 
predict workload in advance.  It's like your thermostat knowing Fridays are 
cold and it should crank up the heating in advance.






On Thu, Jun 11, 2015 at 11:08 PM, Tathagata Das 
mailto:t...@databricks.com>> wrote:
Let me try to add some clarity to the different thought directions going on in 
this thread.

1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES?

If there are no rate limits set up, the most reliable way to detect whether 
the current Spark cluster is insufficient to handle the data load is to 
use the StreamingListener interface, which gives all the information about when
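For reference, a minimal Scala sketch of watching batch delays through a
StreamingListener (the threshold and the reaction are illustrative):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Fires after every batch; a steadily growing scheduling delay suggests the
// cluster can no longer keep up with the incoming data
class DelayWatcher extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val delayMs = batch.batchInfo.schedulingDelay.getOrElse(0L)
    if (delayMs > 60000L) {
      // e.g. fire a critical event to an external provisioning component
      println(s"Scheduling delay is $delayMs ms - consider adding resources")
    }
  }
}

// Register it on the StreamingContext: ssc.addStreamingListener(new DelayWatcher())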

Re: Spark 1.4 release date

2015-06-12 Thread ayan guha
Thanks guys, my question must look like a stupid one today :) Looking
forward to testing out 1.4.0; just downloaded it.

Congrats to the team on this much anticipated release.

On Fri, Jun 12, 2015 at 10:12 PM, Guru Medasani  wrote:

> Here is a Spark 1.4 release blog by Databricks.
>
> https://databricks.com/blog/2015/06/11/announcing-apache-spark-1-4.html
>
>
> Guru Medasani
> gdm...@gmail.com
>
>
>
> On Jun 12, 2015, at 7:08 AM, ayan guha  wrote:
>
> Thanks a lot.
> On 12 Jun 2015 19:46, "Todd Nist"  wrote:
>
>> It was released yesterday.
>>
>> On Friday, June 12, 2015, ayan guha  wrote:
>>
>>> Hi
>>>
>>> When is official spark 1.4 release date?
>>> Best
>>> Ayan
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Writing data to hbase using Sparkstreaming

2015-06-12 Thread Vamshi Krishna
Hi, I am trying to write data produced by the Kafka command-line producer for
some topic. I am facing a problem and am unable to proceed. Below is my code,
which I package as a jar and run through spark-submit. Am I doing something
wrong inside foreachRDD()? What is wrong with the
SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) line in the error message below?



SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaDemo").setMaster("local")
        .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");
// Create the context with a 5 second batch interval
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(5000));

int numThreads = 2;
Map<String, Integer> topicMap = new HashMap<String, Integer>();
// topicMap.put("viewTopic", numThreads);
topicMap.put("nonview", numThreads);

JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {

        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
                new PairFunction<String, ImmutableBytesWritable, Put>() {
                    @Override
                    public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
                        Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
                        put.addColumn(Bytes.toBytes("firstFamily"), Bytes.toBytes("firstColumn"),
                                Bytes.toBytes(line + "fc"));
                        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                    }
                });

        // save to HBase - Spark built-in API method
        hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
        return null;
    }
});
jsc.start();
jsc.awaitTermination();





I see the error below when running spark-submit:


./bin/spark-submit --class "SparkKafkaDemo" --master local
/Users/kvk/IntelliJWorkspace/HbaseDemo/HbaseDemo.jar

Exception in thread "main" org.apache.spark.SparkException: Task not
serializable

at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)

at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)

at org.apache.spark.rdd.RDD.map(RDD.scala:286)

at
org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)

at
org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)

at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)

at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)

at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)

at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)

at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)

at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)

at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)

at scala.util.Try$.apply(Try.scala:161)

at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)

at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)

at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)

at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of
RUNNING

at org.apache.hadoop.mapreduce.Job.en

Fwd: Spark/PySpark errors on mysterious missing /tmp file

2015-06-12 Thread John Berryman
(This question is also present on StackOverflow
http://stackoverflow.com/questions/30656083/spark-pyspark-errors-on-mysterious-missing-tmp-file
)

I'm having issues with pyspark and a missing /tmp file. I've narrowed down
the behavior to a short snippet.

>>> a=sc.parallelize([(16646160,1)]) # yes, just a single element
>>> b=stuff # this is read in from a text file above - the contents are
shown below
>>> # b=sc.parallelize(b.collect())
>>> a.join(b).take(10)

This fails, but if I include the commented line (which should be the same
thing), then it succeeds. Here is the error:


---
Py4JJavaError Traceback (most recent call
last)
 in ()
  3 b=stuff.map(lambda x:(16646160,1))
  4 #b=sc.parallelize(b.collect())
> 5 a.join(b).take(10)
  6 b.take(10)

/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1109
   1110 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
->  res = self.context.runJob(self, takeUpToNumLeft, p,
True)
   1112
   1113 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd,
partitionFunc, partitions, allowLocal)
816 # SparkContext#runJob.
817 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 818 it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
mappedRDD._jrdd, javaPartitions, allowLocal)
819 return
list(mappedRDD._collect_iterator_through_file(it))
820

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
__call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer,
self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling
{0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 1 in stage 210.0 failed 1 times, most recent failure: Lost task 1.0 in
stage 210.0 (TID 884, localhost):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in main
command = pickleSer.loads(command.value)
  File "/usr/lib/spark/python/pyspark/broadcast.py", line 106, in value
self._value = self.load(self._path)
  File "/usr/lib/spark/python/pyspark/broadcast.py", line 87, in load
with open(path, 'rb', 1 << 20) as f:
IOError: [Errno 2] No such file or directory:
'/tmp/spark-4a8c591e-9192-4198-a608-c7daa3a5d494/tmpuzsAVM'

at
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at
org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1468)
at
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSet

Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Cody Koeninger
The scala api has 2 ways of calling createDirectStream.  One of them allows
you to pass a message handler that gets full access to the kafka
MessageAndMetadata, including offset.

I don't know why the python api was developed with only one way to call
createDirectStream, but the first thing I'd look at would be adding that
functionality back in.  If someone wants help creating a patch for that,
just let me know.

Dealing with offsets on a per-message basis may not be as efficient as
dealing with them on a batch basis using the HasOffsetRanges interface...
but if efficiency was a primary concern, you probably wouldn't be using
Python anyway.
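For reference, a rough Scala sketch of the messageHandler variant (ssc,
kafkaParams, the topic name and the starting offsets are assumptions):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Each record becomes (offset, payload), so offsets travel with the data
val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (Long, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message()))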

On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao  wrote:

> The Scala KafkaRDD uses a trait to handle this problem, but it is not so easy
> and straightforward in Python, where we would need a specific API to handle
> this. I'm not sure whether there is any simple workaround; maybe we should
> think carefully about it.
>
> 2015-06-12 13:59 GMT+08:00 Amit Ramesh :
>
>>
>> Thanks, Jerry. That's what I suspected based on the code I looked at. Any
>> pointers on what is needed to build in this support would be great. This is
>> critical to the project we are currently working on.
>>
>> Thanks!
>>
>>
>> On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao 
>> wrote:
>>
>>> OK, I get it. I think the current Python-based Kafka direct API does not
>>> provide such an equivalent of the Scala one; maybe we should figure out how
>>> to add this to the Python API as well.
>>>
>>> 2015-06-12 13:48 GMT+08:00 Amit Ramesh :
>>>

 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because as RDDs get generated within Spark the
 offsets move further along. With direct Kafka mode the current offsets are
 no longer persisted in ZooKeeper but rather within Spark itself. If you want
 to be able to use ZooKeeper-based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so that
 we can continue from where we left off after a code deployment. In other
 words, we need exactly-once processing guarantees across code deployments.
 Spark does not support any state persistence across deployments so this is
 something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao 
 wrote:

> Hi,
>
> What is your meaning of getting the offsets from the RDD, from my
> understanding, the offsetRange is a parameter you offered to KafkaRDD, why
> do you still want to get the one previous you set into?
>
> Thanks
> Jerry
>
> 2015-06-12 12:36 GMT+08:00 Amit Ramesh :
>
>>
>> Congratulations on the release of 1.4!
>>
>> I have been trying out the direct Kafka support in python but haven't
>> been able to figure out how to get the offsets from the RDD. Looks like 
>> the
>> documentation is yet to be updated to include Python examples (
>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>> I am specifically looking for the equivalent of
>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
>> I tried digging through the python code but could not find anything
>> related. Any pointers would be greatly appreciated.
>>
>> Thanks!
>> Amit
>>
>>
>

>>>
>>
>


Re: spark-sql from CLI --->EXCEPTION: java.lang.OutOfMemoryError: Java heap space

2015-06-12 Thread Josh Rosen
It sounds like this might be caused by a memory configuration problem.  In 
addition to looking at the executor memory, I'd also bump up the driver memory, 
since it appears that your shell is running out of memory when collecting a 
large query result.

Sent from my phone

> On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian 
>  wrote:
> 
> hey guys
> 
> Using Hive and Impala daily intensively.
> Want to transition to spark-sql in CLI mode
> 
> Currently in my sandbox I am using the Spark (standalone mode) in the CDH 
> distribution (starving developer version 5.3.3)
> 3 datanode hadoop cluster
> 32GB RAM per node
> 8 cores per node
> 
> spark 
> 1.2.0+cdh5.3.3+371
> 
> 
> I am testing some stuff on one view and getting memory errors
> Possibly reason is default memory per executor showing on 18080 is 
> 512M
> 
> These options, when used to start the spark-sql CLI, do not seem to have any 
> effect:
> --total-executor-cores 12 --executor-memory 4G
> 
> 
> 
> /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e  "select distinct 
> isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"
> 
> aers.aers_demo_view (7 million+ records)
> ===
> isr bigint  case id
> event_dtbigint  Event date
> age double  age of patient
> age_cod string  days,months years
> sex string  M or F
> yearint
> quarter int
> 
> 
> VIEW DEFINITION
> 
> CREATE VIEW `aers.aers_demo_view` AS SELECT `isr` AS `isr`, `event_dt` AS 
> `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, 
> `year` AS `year`, `quarter` AS `quarter` FROM (SELECT
>`aers_demo_v1`.`isr`,
>`aers_demo_v1`.`event_dt`,
>`aers_demo_v1`.`age`,
>`aers_demo_v1`.`age_cod`,
>`aers_demo_v1`.`gndr_cod`,
>`aers_demo_v1`.`year`,
>`aers_demo_v1`.`quarter`
> FROM
>   `aers`.`aers_demo_v1`
> UNION ALL
> SELECT
>`aers_demo_v2`.`isr`,
>`aers_demo_v2`.`event_dt`,
>`aers_demo_v2`.`age`,
>`aers_demo_v2`.`age_cod`,
>`aers_demo_v2`.`gndr_cod`,
>`aers_demo_v2`.`year`,
>`aers_demo_v2`.`quarter`
> FROM
>   `aers`.`aers_demo_v2`
> UNION ALL
> SELECT
>`aers_demo_v3`.`isr`,
>`aers_demo_v3`.`event_dt`,
>`aers_demo_v3`.`age`,
>`aers_demo_v3`.`age_cod`,
>`aers_demo_v3`.`gndr_cod`,
>`aers_demo_v3`.`year`,
>`aers_demo_v3`.`quarter`
> FROM
>   `aers`.`aers_demo_v3`
> UNION ALL
> SELECT
>`aers_demo_v4`.`isr`,
>`aers_demo_v4`.`event_dt`,
>`aers_demo_v4`.`age`,
>`aers_demo_v4`.`age_cod`,
>`aers_demo_v4`.`gndr_cod`,
>`aers_demo_v4`.`year`,
>`aers_demo_v4`.`quarter`
> FROM
>   `aers`.`aers_demo_v4`
> UNION ALL
> SELECT
>`aers_demo_v5`.`primaryid` AS `ISR`,
>`aers_demo_v5`.`event_dt`,
>`aers_demo_v5`.`age`,
>`aers_demo_v5`.`age_cod`,
>`aers_demo_v5`.`gndr_cod`,
>`aers_demo_v5`.`year`,
>`aers_demo_v5`.`quarter`
> FROM
>   `aers`.`aers_demo_v5`
> UNION ALL
> SELECT
>`aers_demo_v6`.`primaryid` AS `ISR`,
>`aers_demo_v6`.`event_dt`,
>`aers_demo_v6`.`age`,
>`aers_demo_v6`.`age_cod`,
>`aers_demo_v6`.`sex` AS `GNDR_COD`,
>`aers_demo_v6`.`year`,
>`aers_demo_v6`.`quarter`
> FROM
>   `aers`.`aers_demo_v6`) `aers_demo_view`
> 
> 
> 
> 
> 
> 
> 
> 15/06/11 08:36:36 WARN DefaultChannelPipeline: An exception was thrown by a 
> user handler while handling an exception event ([id: 0x01b99855, 
> /10.0.0.19:58117 => /10.0.0.19:52016] EXCEPTION: java.lang.OutOfMemoryError: 
> Java heap space)
> java.lang.OutOfMemoryError: Java heap space
> at 
> org.jboss.netty.buffer.HeapChannelBuffer.(HeapChannelBuffer.java:42)
> at 
> org.jboss.netty.buffer.BigEndianHeapChannelBuffer.(BigEndianHeapChannelBuffer.java:34)
> at 
> org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)
> at 
> org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68)
> at 
> org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
> at 
> org.jboss.netty.handler.codec.frame.FrameDecoder.newCumulationBuffer(FrameDecoder.java:507)
> at 
> org.jboss.netty.handler.codec.frame.FrameDecoder.updateCumulation(FrameDecoder.java:345)
> at 
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:312)
> at 
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
> at 
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
> at 
> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
> at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
> at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
> at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
> at 
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java

Re: How to use Window Operations with kafka Direct-API?

2015-06-12 Thread Cody Koeninger
The other thing to keep in mind about spark window operations against Kafka
is that spark streaming is based on current system clock, not the time
embedded in your messages.

So you're going to get a fundamentally wrong answer from a window operation
after a failure / restart, regardless of whether you're using the
createStream or createDirectStream api.
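For reference, a minimal Scala sketch of the "transform first, cast to
HasOffsetRanges" approach described below (directKafkaStream is the stream
returned by createDirectStream; where you store the offsets is up to you):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Capture offsets on the original direct stream, before any window/repartition
// step turns the KafkaRDD into a plain RDD
var latestOffsets = Array.empty[OffsetRange]
val tagged = directKafkaStream.transform { rdd =>
  latestOffsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}
// ...apply window operations to `tagged`, and persist `latestOffsets` once the
// batch's output has been written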

On Fri, Jun 12, 2015 at 9:14 AM, Cody Koeninger  wrote:

> Casting to HasOffsetRanges would be meaningless anyway if done after an
> operation that changes partitioning.
>
> You can still use the messageHandler argument to createDirectStream to get
> access to offsets on a per-message basis.
>
> Also, it doesn't look like what you're doing is particularly concerned
> about transactional correctness (since you're saving offsets to a kafka api
> backed by zookeeper), so you can try doing a transform as the first step in
> your stream, and casting to HasOffsetRanges there.
>
>
>
> On Fri, Jun 12, 2015 at 5:03 AM, zigen  wrote:
>
>> Hi Shao,
>>
>> Thank you for your quick response.
>> I was disappointed.
>> I will try window operations with Receiver-based
>> Approach(KafkaUtils.createStream).
>>
>> Thank you again,
>> ZIGEN
>>
>>
>> 2015/06/12 17:18、Saisai Shao  のメッセージ:
>>
>> I think you cannot use offsetRanges in such a way. When you transform a
>> DirectKafkaInputDStream into a WindowedDStream, the KafkaRDD is internally
>> changed into a normal RDD, but offsetRanges is an attribute specific to
>> KafkaRDD, so when you cast a normal RDD to HasOffsetRanges you will hit
>> such an exception.
>>
>> you could only do something like:
>>
>> directKafkaInputDStream.foreachRDD { rdd =>
>>rdd.asInstanceOf[HasOffsetRanges]
>>   ...
>> }
>>
>> Apply foreachRDD directly on DirectKafkaInputDStream.
>>
>>
>>
>>
>>
>>
>>
>> 2015-06-12 16:10 GMT+08:00 ZIGEN :
>>
>>> Hi, I'm using Spark Streaming (1.3.1).
>>> I want to get exactly-once messaging from Kafka and use window operations
>>> of DStream.
>>>
>>> When I use window operations (e.g. DStream#reduceByKeyAndWindow) with the
>>> Kafka Direct API, a java.lang.ClassCastException occurs as follows.
>>>
>>> --- stacktrace --
>>> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD
>>> cannot
>>> be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>> at
>>>
>>> org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
>>> at
>>>
>>> org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
>>> at
>>>
>>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>>> at
>>>
>>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>>> at
>>>
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>>> at
>>>
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>>> at
>>>
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>>> at
>>>
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>> at
>>>
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>> at scala.util.Try$.apply(Try.scala:161)
>>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>> at
>>>
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
>>> at
>>>
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>>> at
>>>
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at
>>>
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>> at java.lang.Thread.run(Thread.java:722)
>>>
>>>
>>> --- my source ---
>>>
>>> JavaStreamingContext jssc = new JavaStreamingContext(_ctx,
>>> batchInterval);
>>> jssc.checkpoint("checkpoint");
>>>
>>> JavaPairInputDStream messages =
>>> KafkaUtils.createDirectStream
>>>  (jssc, String.class, String.class, StringDecoder.class,
>>> StringDecoder.class, kafkaParams, topicsSet);
>>>
>>> JavaPairDStream> pairDS =
>>> messages.mapToPair(...);
>>>
>>> JavaPairDStream> windowDs =
>>> pairDS.reduceByKeyAndWindow(new Function2, List,
>>> List>() {
>>> @Override
>>> public List call(List li

Re: Optimisation advice for Avro->Parquet merge job

2015-06-12 Thread James Aley
Hey Kiran,

Thanks very much for the response. I left for vacation before I could try
this out, but I'll experiment once I get back and let you know how it goes.

Thanks!

James.

On 8 June 2015 at 12:34, kiran lonikar  wrote:

> It turns out my assumption on load and unionAll being blocking is not
> correct. They are transformations. So instead of just running only the load
> and unionAll in the run() methods, I think you will have to save the
> intermediate dfInput[i] to temp (parquet) files (possibly to in memory DFS
> like http://tachyon-project.org/) in the run() methods. The second for
> loop will also have to load from the intermediate parquet files. Then
> finally save the final dfInput[0] to HDFS.
>
> I think this way of parallelizing will force the cluster to utilize all the
> resources.
>
> -Kiran
>
> On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar  wrote:
>
>> James,
>>
>> As I can see, there are three distinct parts to your program:
>>
>>- for loop
>>- synchronized block
>>- final outputFrame.save statement
>>
>> Can you do a separate timing measurement by putting a simple
>> System.currentTimeMillis() around these blocks to know how much they are
>> taking and then try to optimize where it takes longest? In the second
>> block, you may want to measure the time for the two statements. Improving
>> this boils down to playing with spark settings.
>>
>> Now consider the first block: I think this is a classic case of merge
>> sort or a reduce tree. You already tried to improve this by submitting jobs
>> in parallel using executor pool/Callable etc.
>>
>> To further improve the parallelization, I suggest you use a reduce tree
>> like approach. For example, lets say you want to compute sum of all
>> elements of an array in parallel. The way its solved for a GPU like
>> platform is you divide your input array initially in chunks of 2, compute
>> those n/2 sums parallely on separate threads and save the result in the
>> first of the two elements. In the next iteration, you compute n/4 sums
>> parallely of the earlier sums and so on till you are left with only two
>> elements whose sum gives you final sum.
>>
>> You are performing many sequential unionAll operations for inputs.size()
>> avro files. Assuming the unionAll() on DataFrame is blocking (and not a
>> simple transformation like on RDDs) and actually performs the union
>> operation, you will certainly benefit by parallelizing this loop. You may
>> change the loop to something like below:
>>
>> // pseudo code only
>> int n = inputs.size()
>> // initialize executor
>> executor = new FixedThreadPoolExecutor(n/2)
>> dfInput = new DataFrame[n/2]
>> for(int i =0;i < n/2;i++) {
>> executor.submit(new Runnable() {
>> public void run() {
>> // union of i and i+n/2
>> // showing [] only to bring out array access. Replace with
>> dfInput(i) in your code
>> dfInput[i] = sqlContext.load(inputPaths.get(i),
>> "com.databricks.spark.avro").unionAll(sqlContext.load(inputsPath.get(i +
>> n/2), "com.databricks.spark.avro"))
>> }
>> });
>> }
>>
>> executor.awaitTermination(0, TimeUnit.SECONDS)
>>
>> int steps = log(n)/log(2.0)
>> for(s = 2; s < steps;s++) {
>> int stride = n/(1 << s); // n/(2^s)
>> for(int i = 0;i < stride;i++) {
>> executor.submit(new Runnable() {
>> public void run() {
>> // union of i and i+n/2
>> // showing [] only to bring out array access. Replace
>> with dfInput(i) and dfInput(i+stride) in your code
>> dfInput[i] = dfInput[i].unionAll(dfInput[i + stride])
>> }
>> });
>> }
>> executor.awaitTermination(0, TimeUnit.SECONDS)
>> }
>>
>> Let me know if it helped.
>>
>> -Kiran
>>
>>
>> On Thu, Jun 4, 2015 at 8:57 PM, James Aley 
>> wrote:
>>
>>> Thanks for the confirmation! We're quite new to Spark, so a little
>>> reassurance is a good thing to have sometimes :-)
>>>
>>> The thing that's concerning me at the moment is that my job doesn't seem
>>> to run any faster with more compute resources added to the cluster, and
>>> this is proving a little tricky to debug. There are a lot of variables, so
>>> here's what we've tried already and the apparent impact. If anyone has any
>>> further suggestions, we'd love to hear!
>>>
>>> * Increase the "minimum" number of output files (targetPartitions
>>> above), so that input groups smaller than our minimum chunk size can still
>>> be worked on by more than one executor. This does measurably speed things
>>> up, but obviously it's a trade-off, as the original goal for this job is to
>>> merge our data into fewer, larger files.
>>>
>>> * Submit many jobs in parallel, by running the above code in a Callable,
>>> on an executor pool. This seems to help, to some extent, but I'm not sure
>>> what else needs to be configured alongside it -- driver threads, scheduling
>>> policy, etc. We set scheduling to "FAIR" when doing this, as that

Re: How to pass arguments dynamically, that needs to be used in executors

2015-06-12 Thread gaurav sharma
Thanks Todd, that solved my problem.

Regards,
Gaurav
(please excuse spelling mistakes)
Sent from phone
On Jun 11, 2015 6:42 PM, "Todd Nist"  wrote:

> Hi Gaurav,
>
> Seems like you could use a broadcast variable for this if I understand
> your use case.  Create it in the driver based on the CommandLineArguments
> and then use it in the workers.
>
>
> https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>
> So something like:
>
> Broadcast<Integer> cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));
>
> Then just reference the broadcast variable in your workers.  It will get
> shipped once to all nodes in the cluster and can be referenced by them.
>
> HTH.
>
> -Todd
>
> On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma 
> wrote:
>
>> Hi,
>>
>> I am using a Kafka + Spark cluster for a real-time aggregation analytics use
>> case in production.
>>
>> Cluster details
>> 6 nodes, each node running 1 Spark process and 1 Kafka process.
>> Node 1 -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
>> Node 2,3,4,5,6 -> 1 Worker process each, 1 Kafka process each
>>
>> Spark version 1.3.0
>> Kafka Veriosn 0.8.1
>>
>> I am using Kafka Directstream for Kafka Spark Integration.
>> Analytics code is written in using Spark Java API.
>>
>> Problem Statement :
>>
>>   I want to accept a parameter as a command line argument and pass it on
>> to the executors.
>>   (I want to use the parameter in the rdd.foreach method which is executed
>> on the executor.)
>>
>>   I understand that when the driver is started, only the jar is
>> transported to all the Workers.
>>   But I need to use the dynamically passed command line argument in
>> the reduce operation executed on the executors.
>>
>>
>> Code Snippets for better understanding my problem :
>>
>> public class KafkaReconcilationJob {
>>
>> private static Logger logger =
>> Logger.getLogger(KafkaReconcilationJob.class);
>>  public static void main(String[] args) throws Exception {
>>   CommandLineArguments.CLICK_THRESHOLD =
>> Integer.parseInt(args[12]);
>> ---> I want to use this
>> command line argument
>> }
>>
>> }
>>
>>
>>
>> JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
>> adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
>> @Override
>> public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
>> if (adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD ||
>> adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD) {
>> return true;
>> } else {
>> return false;
>> }
>> }
>> });
>>
>>
>>
>> The above mentioned Filter operation gets executed on executor which has
>> 0 as the value of the static field CommandLineArguments.CLICK_THRESHOLD
>>
>>
>> Regards,
>> Gaurav
>>
>
>


RE: How to set spark master URL to contain domain name?

2015-06-12 Thread Wang, Ningjun (LNG-NPV)
I think the problem is that in my local etc/hosts file, I have

10.196.116.95 WIN02

I will remove it and try. Thanks for the help.

Ningjun

From: prajod.vettiyat...@wipro.com [mailto:prajod.vettiyat...@wipro.com]
Sent: Friday, June 12, 2015 1:44 AM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: RE: How to set spark master URL to contain domain name?

Hi Ningjun,

This is probably a configuration difference between WIN01 and WIN02.

Execute: ipconfig/all on the windows command line on both machines and compare 
them.

Also if you have a localhost entry in the hosts file, it should not have the  
wrong sequence: See the first answer in this link: 
http://stackoverflow.com/questions/6049260/fully-qualified-machine-name-java-with-etc-hosts

Regards,
Prajod

From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com]
Sent: 12 June 2015 01:32
To: user@spark.apache.org
Subject: How to set spark master URL to contain domain name?

I start spark master on windows using

bin\spark-class.cmd org.apache.spark.deploy.master.Master

Then I goto http://localhost:8080/ to find the master URL, it is

spark://WIN02:7077

Here WIN02 is my machine name. Why is it missing the domain name? If I start 
the spark master on other machines, the master URL will contain the domain 
name, e.g.
spark://WIN01.mycompany.com:7077

Only on machine WIN02 does the master URL not contain the domain name. How 
can I configure WIN02 so that the spark master URL will also contain the 
domain name, e.g.
Spark://WIN02.mycompany.com

Thanks


Ningjun



Spark Streaming - Can i BIND Spark Executor to Kafka Partition Leader

2015-06-12 Thread gaurav sharma
Hi,

I am using a Kafka + Spark cluster for a real-time aggregation analytics use
case in production.

*Cluster details*

*6 nodes*, each node running 1 Spark process and 1 Kafka process.
Node 1 -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
Node 2,3,4,5,6 -> 1 Worker process each, 1 Kafka process each

Spark version 1.3.0
Kafka Veriosn 0.8.1

I am using *Kafka* *Directstream* for Kafka Spark Integration.
Analytics code is written in using Spark Java API.

*Problem Statement : *

  We are dealing with about *10 M records per hour*.
  My Spark Streaming batch runs at a *1 hour interval* (at 11:30, 12:30,
1:30 and so on).

  Since I am using the Direct Stream, it reads all the data for the past
hour at 11:30, 12:30, 1:30 and so on.
  As of now it takes *about 3 minutes* to read the data, with network
bandwidth utilization of *100-200 MBPS per node* (out of the 6 node Spark
cluster).

  Since I am running both Spark and Kafka on the same machines,
*I WANT TO BIND MY SPARK EXECUTOR TO THE KAFKA PARTITION LEADER*, so as to
eliminate the network bandwidth consumption of Spark.

  I understand that the number of partitions created in Spark for a Direct
Stream is equal to the number of partitions in Kafka, which is why I got
curious whether there might be such a provision in Spark.



Regards,
Gaurav


Issues with `when` in Column class

2015-06-12 Thread Chris Freeman
I’m trying to iterate through a list of Columns and create new Columns based on 
a condition. However, the when method keeps giving me errors that don’t quite 
make sense.

If I do `when(col === “abc”, 1).otherwise(0)` I get the following error at 
compile time:

[error] not found: value when

However, this works in the REPL just fine after I import 
org.apache.spark.sql.Column.

On the other hand, if I do `col.when(col === “abc”, 1).otherwise(0)`, it will 
compile successfully, but then at runtime, I get this error:

java.lang.IllegalArgumentException: when() can only be applied on a Column 
previously generated by when() function

This appears to be pretty circular logic. How can `when` only be applied to a 
Column previously generated by `when`? How would I call `when` in the first 
place?


Spark Streaming, updateStateByKey and mapPartitions() - and lazy "DatabaseConnection"

2015-06-12 Thread algermissen1971
Hi,

I have a scenario with spark streaming, where I need to write to a database 
from within updateStateByKey[1].

That means that inside my update function I need a connection.

I have so far understood that I should create a new (lazy) connection for every 
partition. But since I am not working in foreachRDD I wonder where I can 
iterate over the partitions.

Should I use mapPartitions() somewhere up the chain? 

Jan



[1] The use case being saving ‘done' sessions during web tracking.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark-1.4.0]jackson-databind conflict?

2015-06-12 Thread Earthson
I'm using Play-2.4 with play-json-2.4, It works fine with spark-1.3.1, but it
failed after I upgrade Spark to spark-1.4.0:(

sc.parallelize(1 to 1).count


[info]   com.fasterxml.jackson.databind.JsonMappingException: Could not find
creator property with name 'id' (in class
org.apache.spark.rdd.RDDOperationScope)
[info]  at [Source: {"id":"0","name":"parallelize"}; line: 1, column: 1]
[info]   at
com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
[info]   at
com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
[info]   at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
[info]   at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
[info]   at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-jackson-databind-conflict-tp23295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark-1.4.0]jackson-databind conflict?

2015-06-12 Thread Sean Owen
I see the same thing in an app that uses Jackson 2.5. Downgrading to
2.4 made it work. I meant to go back and figure out if there's
something that can be done to work around this in Spark or elsewhere,
but for now, harmonize your Jackson version at 2.4.x if you can.
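For reference, one way to pin the version in an sbt build (a sketch only; adjust
the artifact list and versions to whatever your build actually pulls in):

// build.sbt - force every Jackson artifact onto the 2.4.x line
dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core"   %  "jackson-databind"     % "2.4.4",
  "com.fasterxml.jackson.core"   %  "jackson-core"         % "2.4.4",
  "com.fasterxml.jackson.core"   %  "jackson-annotations"  % "2.4.4",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.4.4"
)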

On Fri, Jun 12, 2015 at 4:20 PM, Earthson  wrote:
> I'm using Play-2.4 with play-json-2.4, It works fine with spark-1.3.1, but it
> failed after I upgrade Spark to spark-1.4.0:(
>
> sc.parallelize(1 to 1).count
>
> 
> [info]   com.fasterxml.jackson.databind.JsonMappingException: Could not find
> creator property with name 'id' (in class
> org.apache.spark.rdd.RDDOperationScope)
> [info]  at [Source: {"id":"0","name":"parallelize"}; line: 1, column: 1]
> [info]   at
> com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
> [info]   at
> com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
> [info]   at
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
> [info]   at
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
> [info]   at
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
> [info]   at
> com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
> [info]   at
> com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
> [info]   at
> com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
> [info]   at
> com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
> [info]   at
> com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
> 
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-jackson-databind-conflict-tp23295.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy "DatabaseConnection"

2015-06-12 Thread Cody Koeninger
There are several database apis that use a thread local or singleton
reference to a connection pool (we use ScalikeJDBC currently, but there are
others).

You can use mapPartitions earlier in the chain to make sure the connection
pool is set up on that executor, then use it inside updateStateByKey
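For reference, a rough Scala sketch of that shape (ConnectionPoolHolder,
keyedStream and updateFunc are hypothetical names):

// Hypothetical JVM-wide holder; the lazy val makes initialization run once per executor JVM
object ConnectionPoolHolder {
  lazy val initialized: Boolean = {
    // e.g. scalikejdbc.ConnectionPool.singleton(url, user, password)
    true
  }
  def init(): Unit = { initialized; () }
}

// Touch each partition once so the executor-local pool exists before stateful processing
val prepared = keyedStream.mapPartitions { iter =>
  ConnectionPoolHolder.init()
  iter
}

// updateFunc can now obtain connections from the same singleton pool
val sessions = prepared.updateStateByKey(updateFunc)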

On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 <
algermissen1...@icloud.com> wrote:

> Hi,
>
> I have a scenario with spark streaming, where I need to write to a
> database from within updateStateByKey[1].
>
> That means that inside my update function I need a connection.
>
> I have so far understood that I should create a new (lazy) connection for
> every partition. But since I am not working in foreachRDD I wonder where I
> can iterate over the partitions.
>
> Should I use mapPartitions() somewhere up the chain?
>
> Jan
>
>
>
> [1] The use case being saving ‘done' sessions during web tracking.
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Issues with `when` in Column class

2015-06-12 Thread Yin Huai
Hi Chris,

Have you imported "org.apache.spark.sql.functions._"?

Thanks,

Yin

On Fri, Jun 12, 2015 at 8:05 AM, Chris Freeman  wrote:

>  I’m trying to iterate through a list of Columns and create new Columns
> based on a condition. However, the when method keeps giving me errors that
> don’t quite make sense.
>
>  If I do `when(col === “abc”, 1).otherwise(0)` I get the following error
> at compile time:
>
>  [error] not found: value when
>
>  However, this works in the REPL just fine after I import
> org.apache.spark.sql.Column.
>
>  On the other hand, if I do `col.when(col === “abc”, 1).otherwise(0)`, it
> will compile successfully, but then at runtime, I get this error:
>
>  java.lang.IllegalArgumentException: when() can only be applied on a
> Column previously generated by when() function
>
>  This appears to be pretty circular logic. How can `when` only be applied
> to a Column previously generated by `when`? How would I call `when` in the
> first place?
>


Re: Issues with `when` in Column class

2015-06-12 Thread Chris Freeman
That did it! Thanks!
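For reference, a minimal sketch of the working pattern once functions._ is in
scope (the DataFrame and column names are illustrative):

import org.apache.spark.sql.functions._

// `when` here is the standalone function from functions._, not a method on Column
val flagged = df.withColumn("abc_flag", when(col("name") === "abc", 1).otherwise(0))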

From: Yin Huai
Date: Friday, June 12, 2015 at 10:31 AM
To: Chris Freeman
Cc: "user@spark.apache.org"
Subject: Re: Issues with `when` in Column class

Hi Chris,

Have you imported "org.apache.spark.sql.functions._"?

Thanks,

Yin

On Fri, Jun 12, 2015 at 8:05 AM, Chris Freeman 
mailto:cfree...@alteryx.com>> wrote:
I’m trying to iterate through a list of Columns and create new Columns based on 
a condition. However, the when method keeps giving me errors that don’t quite 
make sense.

If I do `when(col === “abc”, 1).otherwise(0)` I get the following error at 
compile time:

[error] not found: value when

However, this works in the REPL just fine after I import 
org.apache.spark.sql.Column.

On the other hand, if I do `col.when(col === “abc”, 1).otherwise(0)`, it will 
compile successfully, but then at runtime, I get this error:

java.lang.IllegalArgumentException: when() can only be applied on a Column 
previously generated by when() function

This appears to be pretty circular logic. How can `when` only be applied to a 
Column previously generated by `when`? How would I call `when` in the first 
place?



[Spark-1.4.0] NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer

2015-06-12 Thread Tao Li
Hi all:

I compiled the new Spark 1.4.0 version today. But when I run the WordCount
demo, it throws "*java.lang.NoSuchMethodError:
com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer*".

I found that the default "*fasterxml.jackson.version*" is *2.4.4*. Is there
anything wrong with the jackson version?


Apache Spark architecture

2015-06-12 Thread Vitalii Duk
I'm trying to find complete documentation about the internal architecture of
Apache Spark, but have had no luck so far.

For example, I'm trying to understand the following: assume we have a 1TB
text file on HDFS (3 nodes in a cluster, replication factor is 1). This
file will be split into 128MB chunks and each chunk will be stored on only
one node. We run Spark Workers on these nodes. I know that Spark tries to
work with data stored in HDFS on the same node (to avoid network I/O). For
example, I'm trying to do a word count on this 1TB text file.

Here are my questions:

   1. Will Spark load a chunk (128MB) into RAM, count the words, then
   delete it from memory, and do this sequentially? What if there is no
   available RAM?
   2. When will Spark use non-local data on HDFS?
   3. What if I need to do a more complex task, where the results of each
   iteration on each Worker need to be transferred to all other Workers
   (shuffling?) - do I need to write them to HDFS myself and then read
   them back? For example, I can't understand how K-means clustering or
   gradient descent works on Spark.

I would appreciate any link to an Apache Spark architecture guide.

-- 
Best regards,
Vitalii Duk


Re: how to use a properties file from a url in spark-submit

2015-06-12 Thread Gary Ogden
That's a great idea. I did what you suggested and added the URL of the
props file to the uris of the task JSON. The properties file now shows up in
the sandbox. But when it goes to run spark-submit with "--properties-file
props.properties", it fails to find it:

Exception in thread "main" java.lang.IllegalArgumentException:
requirement failed: Properties file props.properties does not exist


On 11 June 2015 at 22:17, Matthew Jones  wrote:

> If you are using chronos you can just put the url in the task json and
> chronos will download it into your sandbox. Then just use spark-submit
> --properties-file app.properties.
>
> On Thu, 11 Jun 2015 15:52 Marcelo Vanzin  wrote:
>
>> That's not supported. You could use wget / curl to download the file to a
>> temp location before running spark-submit, though.
>>
>> On Thu, Jun 11, 2015 at 12:48 PM, Gary Ogden  wrote:
>>
>>> I have a properties file that is hosted at a url. I would like to be
>>> able to use the url in the --properties-file parameter when submitting a
>>> job to mesos using spark-submit via chronos
>>>
>>> I would rather do this than use a file on the local server.
>>>
>>> This doesn't seem to work though when submitting from chronos:
>>>
>>> bin/spark-submit --properties-file http://server01/props/app.properties
>>>
>>>
>>> Inside the properties file:
>>> spark.executor.memory=256M
>>> spark.cores.max=1
>>> spark.shuffle.consolidateFiles=true
>>> spark.task.cpus=1
>>> spark.deploy.defaultCores=1
>>> spark.driver.cores=1
>>> spark.scheduler.mode=FAIR
>>>
>>> So how do I specify a properties file in a url?
>>>
>>
>>
>>
>> --
>> Marcelo
>>
>


Re: ClassCastException: BlockManagerId cannot be cast to [B

2015-06-12 Thread davidkl
Hello,

Just in case someone finds the same issue, it was caused by running the
streaming app with different version of the cluster jars (the uber jar
contained both yarn and spark).

Regards

J



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ClassCastException-BlockManagerId-cannot-be-cast-to-B-tp23276p23296.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to use a properties file from a url in spark-submit

2015-06-12 Thread Matthew Jones
Hmm either spark-submit isn't picking up the relative path or Chronos is
not setting your working directory to your sandbox. Try using "cd
$MESOS_SANDBOX && spark-submit --properties-file props.properties"

On Fri, Jun 12, 2015 at 12:32 PM Gary Ogden  wrote:

> That's a great idea. I did what you suggested and added the url to the
> props file in the uri of the json. The properties file now shows up in the
> sandbox.  But when it goes to run spark-submit  with "--properties-file
> props.properties"   it fails to find it:
>
> Exception in thread "main" java.lang.IllegalArgumentException: requirement 
> failed: Properties file props.properties does not exist
>
>
> On 11 June 2015 at 22:17, Matthew Jones  wrote:
>
>> If you are using chronos you can just put the url in the task json and
>> chronos will download it into your sandbox. Then just use spark-submit
>> --properties-file app.properties.
>>
>> On Thu, 11 Jun 2015 15:52 Marcelo Vanzin  wrote:
>>
>>> That's not supported. You could use wget / curl to download the file to
>>> a temp location before running spark-submit, though.
>>>
>>> On Thu, Jun 11, 2015 at 12:48 PM, Gary Ogden  wrote:
>>>
 I have a properties file that is hosted at a url. I would like to be
 able to use the url in the --properties-file parameter when submitting a
 job to mesos using spark-submit via chronos

 I would rather do this than use a file on the local server.

 This doesn't seem to work though when submitting from chronos:

 bin/spark-submit --properties-file http://server01/props/app.properties


 Inside the properties file:
 spark.executor.memory=256M
 spark.cores.max=1
 spark.shuffle.consolidateFiles=true
 spark.task.cpus=1
 spark.deploy.defaultCores=1
 spark.driver.cores=1
 spark.scheduler.mode=FAIR

 So how do I specify a properties file in a url?

>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>


Spark Java API and minimum set of 3rd party dependencies

2015-06-12 Thread Elkhan Dadashov
Hi all,

We want to integrate Spark into our Java application using the Spark Java API
and then run it on YARN clusters.

If I want to run Spark on YARN, which dependencies must be included?

I looked at the Spark POM, which lists 50+ 3rd party dependencies.

Is there a minimal set of Spark dependencies that is necessary for the Spark
Java API (for a Spark client run on a YARN cluster)?

Thanks in advance.


[Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
In Spark up to 1.3.x, the driver's system properties could be set via the --conf
option, which was shared between setting Spark properties and system properties.

In Spark 1.4.0 this feature has been removed; the driver instead logs the following
warning:

Warning: Ignoring non-spark config property: xxx.xxx=v

How do I set the driver's system properties in 1.4.0? Is there a reason this was
removed without a deprecation warning?

Thanks a lot for your advices.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Java API and minimum set of 3rd party dependencies

2015-06-12 Thread Sean Owen
You don't add dependencies to your app -- you mark Spark as 'provided'
in the build and you rely on the deployed Spark environment to provide
it.

On Fri, Jun 12, 2015 at 7:14 PM, Elkhan Dadashov  wrote:
> Hi all,
>
> We want to integrate Spark in our Java application using the Spark Java Api
> and run then on the Yarn clusters.
>
> If i want to run Spark on Yarn, which dependencies are must for including ?
>
> I looked at Spark POM which lists that Spark requires 50+ 3rd party
> dependencies.
>
> Is there minimum set of Spark dependencies which are necessary for Spark
> Java API  (for Spark client run on Yarn cluster) ?
>
> Thanks in advance.
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Andrew Or
Hi Peng,

Setting properties through --conf should still work in Spark 1.4. From the
warning it looks like the config you are trying to set does not start with
the prefix "spark.". What is the config that you are trying to set?

-Andrew

2015-06-12 11:17 GMT-07:00 Peng Cheng :

> In Spark <1.3.x, the system property of the driver can be set by --conf
> option, shared between setting spark properties and system properties.
>
> In Spark 1.4.0 this feature is removed, the driver instead log the
> following
> warning:
>
> Warning: Ignoring non-spark config property: xxx.xxx=v
>
> How do set driver's system property in 1.4.0? Is there a reason it is
> removed without a deprecation warning?
>
> Thanks a lot for your advices.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Exception when using CLUSTER BY or ORDER BY

2015-06-12 Thread Reynold Xin
Tom,

Can you file a JIRA and attach a small reproducible test case if possible?


On Tue, May 19, 2015 at 1:50 PM, Thomas Dudziak  wrote:

> Under certain circumstances that I haven't yet been able to isolate, I get
> the following error when doing a HQL query using HiveContext (Spark 1.3.1
> on Mesos, fine-grained mode). Is this a known problem or should I file a
> JIRA for it ?
>
>
> org.apache.spark.SparkException: Can only zip RDDs with same number of 
> elements in each partition
>   at 
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56)
>   at 
> org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259)
>   at 
> org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257)
>   at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>   at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>


log4j configuration ignored for some classes only

2015-06-12 Thread lomax0...@gmail.com
Hi all,

I am running spark standalone (local[*]), and have tried to cut back on
some of the logging noise from the framework by editing log4j.properties in
spark/conf.
The following lines are working as expected:

log4j.logger.org.apache.spark=WARN
log4j.logger.org.apache.spark.storage.BlockManager=ERROR

(I've even guaranteed that it's definitely using my configuration by adding
a prefix to the conversion pattern).

However, I am still getting log messages at INFO from classes like:
org.apache.spark.Logging$class (should be covered by the
org.apache.spark setting)
kafka.utils.Logging$class (when I add a similar setting for
kafka.utils)

I suspect it's because these are inner classes. It still happens even when
I go up a level and add configurations like "log4j.logger.org=WARN".

Is this a known bug in log4j? Is there any known way to suppress these,
ideally through configuration rather than programmatically?

Many thanks


Re: Spark Java API and minimum set of 3rd party dependencies

2015-06-12 Thread Elkhan Dadashov
Thanks for prompt response, Sean.

The issue is that we are restricted on dependencies we can include in our
project.

There are 2 issues while including dependencies:

1) there are several dependencies which both we and Spark have, but the versions
are conflicting.
2) there are dependencies which Spark has and our project does not.

How do we handle these 2 cases differently for including Spark dependencies
(direct and transitive ones)?

We need to declare all dependencies (so there should be no unlisted 3rd-party
transitive dependencies) in our POM file, more like the Apache Ivy style of
dependency management (which lists all transitive dependencies in the
POM file) rather than the Maven style.

Our main goal is: we want to integrate Spark into our Java application using
the Spark Java API and then run it on YARN clusters.

Thanks a lot.


On Fri, Jun 12, 2015 at 11:17 AM, Sean Owen  wrote:

> You don't add dependencies to your app -- you mark Spark as 'provided'
> in the build and you rely on the deployed Spark environment to provide
> it.
>
> On Fri, Jun 12, 2015 at 7:14 PM, Elkhan Dadashov 
> wrote:
> > Hi all,
> >
> > We want to integrate Spark in our Java application using the Spark Java
> Api
> > and run then on the Yarn clusters.
> >
> > If i want to run Spark on Yarn, which dependencies are must for
> including ?
> >
> > I looked at Spark POM which lists that Spark requires 50+ 3rd party
> > dependencies.
> >
> > Is there minimum set of Spark dependencies which are necessary for Spark
> > Java API  (for Spark client run on Yarn cluster) ?
> >
> > Thanks in advance.
> >
>



-- 

Best regards,
Elkhan Dadashov
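
For reference, "provided" as Sean describes it is expressed in the build definition rather than by enumerating Spark's transitive dependencies by hand. A minimal sbt sketch is below (the version number is just an example; in Maven the equivalent is declaring the same artifacts with scope "provided"):

  // build.sbt -- only the Spark modules the application actually uses
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"      % "1.4.0" % "provided",
    "org.apache.spark" %% "spark-sql"       % "1.4.0" % "provided",   // only if Spark SQL is used
    "org.apache.spark" %% "spark-streaming" % "1.4.0" % "provided"    // only if streaming is used
  )

With this, the application jar contains none of Spark's own dependencies; the spark-submit / YARN environment supplies them at run time, which also sidesteps most of the version-conflict concerns above.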


How to use spark for map-reduce flow to filter N columns, top M rows of all csv files under a folder?

2015-06-12 Thread Rex X
To be concrete, say we have a folder with thousands of tab-delimited csv
files with the following format (each csv file is about 10GB):

id    name    address    city    ...
1     Matt    add1       LA      ...
2     Will    add2       LA      ...
3     Lucy    add3       SF      ...
...

And we have a lookup table based on "name" above

name    gender
Matt    M
Lucy    F
...

Now we want to take the top 1000 rows of each csv file and output them in the
following format:

id    name    gender
1     Matt    M
...

Can we use pyspark to efficiently handle this?


Re: [Spark-1.4.0] NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer

2015-06-12 Thread Tao Li
Has anyone met the same problem as me?

2015-06-12 23:40 GMT+08:00 Tao Li :

> Hi all:
>
> I complied new spark 1.4.0 version today. But when I run WordCount demo,
> it throws NoSuchMethodError "*java.lang.NoSuchMethodError:
> com.fasterxml.jackson.module.scala.deser.BigDecimalDeserialize*r".
>
> I found the default "*fasterxml.jackson.version*" is *2.4.4*. It's there
> any wrong with the jackson version?
>


Re: how to use a properties file from a url in spark-submit

2015-06-12 Thread Gary Ogden
Turns out one of the other developers wrapped the jobs in a script and did a
cd to another folder in the script before executing spark-submit.

On 12 June 2015 at 14:20, Matthew Jones  wrote:

> Hmm either spark-submit isn't picking up the relative path or Chronos is
> not setting your working directory to your sandbox. Try using "cd
> $MESOS_SANDBOX && spark-submit --properties-file props.properties"
>
> On Fri, Jun 12, 2015 at 12:32 PM Gary Ogden  wrote:
>
>> That's a great idea. I did what you suggested and added the url to the
>> props file in the uri of the json. The properties file now shows up in the
>> sandbox.  But when it goes to run spark-submit  with "--properties-file
>> props.properties"   it fails to find it:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: requirement 
>> failed: Properties file props.properties does not exist
>>
>>
>> On 11 June 2015 at 22:17, Matthew Jones  wrote:
>>
>>> If you are using chronos you can just put the url in the task json and
>>> chronos will download it into your sandbox. Then just use spark-submit
>>> --properties-file app.properties.
>>>
>>> On Thu, 11 Jun 2015 15:52 Marcelo Vanzin  wrote:
>>>
 That's not supported. You could use wget / curl to download the file to
 a temp location before running spark-submit, though.

 On Thu, Jun 11, 2015 at 12:48 PM, Gary Ogden  wrote:

> I have a properties file that is hosted at a url. I would like to be
> able to use the url in the --properties-file parameter when submitting a
> job to mesos using spark-submit via chronos
>
> I would rather do this than use a file on the local server.
>
> This doesn't seem to work though when submitting from chronos:
>
> bin/spark-submit --properties-file
> http://server01/props/app.properties
>
> Inside the properties file:
> spark.executor.memory=256M
> spark.cores.max=1
> spark.shuffle.consolidateFiles=true
> spark.task.cpus=1
> spark.deploy.defaultCores=1
> spark.driver.cores=1
> spark.scheduler.mode=FAIR
>
> So how do I specify a properties file in a url?
>



 --
 Marcelo

>>>
>>


Re: Reading file from S3, facing java.lang.NoClassDefFoundError: org/jets3t/service/ServiceException

2015-06-12 Thread Akhil Das
Looks like your Spark is not able to pick up the HADOOP_CONF. To fix this,
you can add jets3t-0.9.0.jar to the classpath
(sc.addJar("/path/to/jets3t-0.9.0.jar")).

Thanks
Best Regards

On Thu, Jun 11, 2015 at 6:44 PM, shahab  wrote:

> Hi,
>
> I tried to read a csv file from amazon s3, but I get the following
> exception which I have no clue how to solve this. I tried both spark 1.3.1
> and 1.2.1, but no success.  Any idea how to solve this is appreciated.
>
>
> best,
> /Shahab
>
> the code:
>
> val hadoopConf=sc.hadoopConfiguration;
>
>  hadoopConf.set("fs.s3.impl",
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>
>  hadoopConf.set("fs.s3.awsAccessKeyId", aws_access_key_id)
>
>  hadoopConf.set("fs.s3.awsSecretAccessKey", aws_secret_access_key)
>
>  val csv = sc.textFile("s3n://mybucket/info.csv")  // original file
>
>  val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines
> in rows
>
>
> Here is the exception I faced:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/jets3t/service/ServiceException
>
> at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(
> NativeS3FileSystem.java:280)
>
> at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(
> NativeS3FileSystem.java:270)
>
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
>
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>
> at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(
> FileInputFormat.java:256)
>
> at org.apache.hadoop.mapred.FileInputFormat.listStatus(
> FileInputFormat.java:228)
>
> at org.apache.hadoop.mapred.FileInputFormat.getSplits(
> FileInputFormat.java:304)
>
> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
> MapPartitionsRDD.scala:32)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
> MapPartitionsRDD.scala:32)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
>
> at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
>


Re: spark stream and spark sql with data warehouse

2015-06-12 Thread Akhil Das
This is a good start, if you haven't read it already
http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

Thanks
Best Regards

On Thu, Jun 11, 2015 at 8:17 PM, 唐思成  wrote:

> Hi all:
>We are trying to use Spark to do some real-time data processing.
> I need to do some SQL-like queries and analytical tasks with the real-time data
> against historical normalized data stored in databases. Has anyone
> done this kind of work or design? Any suggestion or material would be
> truly welcome.
>
>
>


[Spark 1.4.0] java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation

2015-06-12 Thread Peter Haumer


Hello.
I used to be able to run and debug my Spark apps in Eclipse for Spark 1.3.1 by
creating a launch and setting the vm var "-Dspark.master=local[4]".
I am now playing with the new 1.4 and trying out some of my simple samples,
which all fail with the same exception as shown below. Running them with
spark-submit works fine.

Does anybody have any hints for getting it to work in the IDE again? It seems to
be related to loading input files, whose paths I provide via the main args
and then load via sc.textFile() in Java 8. Are there any new options that I
missed to tell the app to use the local file system?

Exception in thread "main" java.lang.UnsupportedOperationException: Not
implemented by the TFS FileSystem implementation
at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:213)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(
FileSystem.java:2401)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(
FileSystem.java:2411)
at org.apache.hadoop.fs.FileSystem.createFileSystem(
FileSystem.java:2428)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(
FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:166)
at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(
JobConf.java:653)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(
FileInputFormat.java:389)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(
FileInputFormat.java:362)
at org.apache.spark.SparkContext$$anonfun$28.apply(
SparkContext.scala:762)
at org.apache.spark.SparkContext$$anonfun$28.apply(
SparkContext.scala:762)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(
HadoopRDD.scala:172)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(
HadoopRDD.scala:172)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:172)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:196)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219
)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217
)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219
)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217
)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219
)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217
)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219
)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217
)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1535)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:900)
at org.apache.spark.api.java.JavaRDDLike$class.reduce(
JavaRDDLike.scala:357)
at org.apache.spark.api.java.AbstractJavaRDDLike.reduce(
JavaRDDLike.scala:46)
at com.databricks.apps.logs.LogAnalyzer.main(LogAnalyzer.java:60)



Thanks and best regards,
Peter Haumer.

Re: Limit Spark Shuffle Disk Usage

2015-06-12 Thread Akhil Das
You can disable shuffle spill (spark.shuffle.spill)
if you have enough memory to hold that much data. Otherwise, I believe adding
more resources would be your only choice.

Thanks
Best Regards

On Thu, Jun 11, 2015 at 9:46 PM, Al M  wrote:

> I am using Spark on a machine with limited disk space.  I am using it to
> analyze very large (100GB to 1TB per file) data sets stored in HDFS.  When
> I
> analyze these datasets, I will run groups, joins and cogroups.  All of
> these
> operations mean lots of shuffle files written to disk.
>
> Unfortunately what happens is my disk fills up very quickly (I only have
> 40GB free).  Then my process dies because I don't have enough space on
> disk.
> I don't want to write my shuffles to HDFS because it's already pretty full.
> The shuffle files are cleared up between runs, but this doesnt help when a
> single run requires 300GB+ shuffle disk space.
>
> Is there any way that I can limit the amount of disk space used by my
> shuffles?  I could set up a cron job to delete old shuffle files whilst the
> job is still running, but I'm concerned that they are left there for a good
> reason.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-tp23279.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
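
For reference, the setting Akhil mentions is an ordinary Spark property. A hedged sketch of passing it at submit time is below; disabling spill is only safe if the aggregation state really fits in memory. spark.local.dir (not mentioned in the thread) controls where shuffle files land, so pointing it at a larger volume is another option; the path is a placeholder:

  spark-submit \
    --conf spark.shuffle.spill=false \
    --conf spark.local.dir=/mnt/bigdisk/spark-tmp \
    ...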


--jars not working?

2015-06-12 Thread Jonathan Coveney
Spark version is 1.3.0 (will upgrade as soon as we upgrade past mesos
0.19.0)...

Regardless, I'm running into a really weird situation where when I pass
--jars to bin/spark-shell I can't reference those classes on the repl. Is
this expected? The logs even tell me that my jars have been added, and yet
the classes inside of them are not available.

Am I missing something obvious?


Re: takeSample() results in two stages

2015-06-12 Thread Imran Rashid
It launches two jobs because it doesn't know ahead of time how big your RDD
is, so it doesn't know what the sampling rate should be.  After counting
all the records, it can determine what the sampling rate should be -- then
it does another pass through the data, sampling at the rate it has just
determined.

Note that this suggests: (a) if you know the size of your RDD ahead of
time, you could eliminate that first pass and (b) since you end up
computing the input RDD twice, it may make sense to cache it.

On Thu, Jun 11, 2015 at 11:43 AM, barmaley  wrote:

> I've observed interesting behavior in Spark 1.3.1, the reason for which is
> not clear.
>
> Doing something as simple as sc.textFile("...").takeSample(...) always
> results in two stages:Spark's takeSample() results in two stages
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23280/Capture.jpg
> >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/takeSample-results-in-two-stages-tp23280.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
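
A minimal sketch of Imran's two suggestions, assuming the total count happens to be known ahead of time (the numbers are made up):

  // (b) cache, since takeSample passes over the data twice
  val data = sc.textFile("...").cache()

  // (a) with a known size, a single sample() pass avoids the extra counting job
  val knownCount = 1000000L                 // hypothetical, known ahead of time
  val desired = 500
  // sample() is approximate, so in practice the fraction is usually padded a bit
  val fraction = math.min(1.0, desired.toDouble / knownCount)
  val sampled = data.sample(withReplacement = false, fraction).take(desired)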


Re: Optimizing Streaming from Websphere MQ

2015-06-12 Thread Akhil Das
How many cores are you allocating for your job? And how many receivers do
you have? It would be good if you could post your custom receiver code; it
will help people understand it better and shed some light.

Thanks
Best Regards

On Fri, Jun 12, 2015 at 12:58 PM, Chaudhary, Umesh <
umesh.chaudh...@searshc.com> wrote:

>  Hi,
>
> I have created a Custom Receiver in Java which receives data from
> Websphere MQ and I am only writing the received records on HDFS.
>
>
>
> I have referred many forums for optimizing speed of spark streaming
> application. Here I am listing a few:
>
>
>
> · Spark Official
> 
>
> · VIrdata 
>
> ·  TD’s Slide (A bit Old but Useful)
> 
>
>
>
> I got mainly two point for my applicability :
>
>
>
> · giving batch interval as 1 sec
>
> · Controlling “spark.streaming.blockInterval” =200ms
>
> · inputStream.repartition(3)
>
>
>
> But that did not improve my actual speed (records/sec) of receiver which
> is MAX 5-10 records /sec. This is way less from my expectation.
>
> Am I missing something?
>
>
>
> Regards,
>
> Umesh Chaudhary
>  This message, including any attachments, is the property of Sears
> Holdings Corporation and/or one of its subsidiaries. It is confidential and
> may contain proprietary or legally privileged information. If you are not
> the intended recipient, please delete it without reading the contents.
> Thank you.
>


Re: --jars not working?

2015-06-12 Thread Akhil Das
You can verify whether the jars are shipped properly by looking at the driver UI
(running on port 4040), under the Environment tab.

Thanks
Best Regards

On Sat, Jun 13, 2015 at 12:43 AM, Jonathan Coveney 
wrote:

> Spark version is 1.3.0 (will upgrade as soon as we upgrade past mesos
> 0.19.0)...
>
> Regardless, I'm running into a really weird situation where when I pass
> --jars to bin/spark-shell I can't reference those classes on the repl. Is
> this expected? The logs even tell me that my jars have been added, and yet
> the classes inside of them are not available.
>
> Am I missing something obvious?
>


Re: Re: How to keep a SQLContext instance alive in a spark streaming application's life cycle?

2015-06-12 Thread Tathagata Das
BTW, in Spark 1.4, announced today, I added SQLContext.getOrCreate, so you
don't need to create the singleton yourself.
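
A minimal sketch of what that looks like inside a stream, assuming Spark 1.4 and reusing the dddstream / ttable names from the code quoted below:

  import org.apache.spark.sql.SQLContext

  dddstream.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      sqlContext.read.json(rdd).registerTempTable("ttable")
      sqlContext.sql("SELECT COUNT(*) FROM ttable").show()
    }
  }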

On Wed, Jun 10, 2015 at 3:21 AM, Sergio Jiménez Barrio <
drarse.a...@gmail.com> wrote:

> Note: CCing user@spark.apache.org
>
>
> First, you must check if the RDD is empty:
>
>  messages.foreachRDD { rdd =>
>  if (!rdd.isEmpty) { }}
>
> Now, you can obtain the instance of a SQLContext:
>
> val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
>
>
>
>
> *Optional*
> In this moment, I like work with DataFrame. I convert RDD to DataFrame. I
> see that you recive a JSON:
>
> val df :DataFrame = sqlContext.jsonRDD(message,
> getSchema(getSchemaStr)).toDF()
>
>
> My getSchema function create a Schema of my JSON:
>
> def getSchemaStr() :String = "feature1 feature2 ..."
>
> def getSchema(schema: String) :StructType = StructType (schema.split("
> ").map(fieldName => StructField(fieldName, StringType, true)))
>
> I hope this helps.
>
> Regards.
>
>
>
> 2015-06-09 17:36 GMT+02:00 codingforfun [via Apache Spark User List] <
> ml-node+s1001560n23226...@n3.nabble.com>:
>
>> I don't know why, you said “Why? I tried this solution and works fine.”
>> Does that mean your SQLContext instance is alive for all of the streaming application’s life
>> time, rather than one batch duration? My code is as below:
>>
>>
>> object SQLContextSingleton extends java.io.Serializable{
>>   @transient private var instance: SQLContext = null
>>
>>   // Instantiate SQLContext on demand
>>   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
>> if (instance == null) {
>>   instance = new SQLContext(sparkContext)
>> }
>> instance
>>   }
>> }
>>
>> // type_->typex, id_->id, url_->url
>> case class  (time: Timestamp, id: Int, openfrom: Int, tab: Int) extends 
>> Serializable
>> case class Count(x: Int)
>>
>> @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
>> ssc.checkpoint(".")
>>
>> val kafkaParams = Map("metadata.broker.list" -> "10.20.30.40:9092,")
>> @transient val dstream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
>> @transient val dddstream= newsIdDStream.map(x => x._2).flatMap(x => 
>> x.split("\n"))
>>
>> dddstream.foreachRDD { rdd =>
>> 
>> SQLContextSingleton.getInstance(rdd.sparkContext).jsonRDD(rdd).registerTempTable("ttable")
>> val ret = SQLContextSingleton.getInstance(rdd.sparkContext).sql("SELECT 
>> COUNT(*) FROM ttable")
>> ret.foreach{ x => println(x(0)) }
>> }
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>>
>>
>>
>>
>>
>> 在 2015-06-09 17:41:44,"drarse [via Apache Spark User List]" <[hidden
>> email] > 写道:
>>
>> Why? I  tried  this solution and works fine.
>>
>> El martes, 9 de junio de 2015, codingforfun [via Apache Spark User List] 
>> <[hidden
>> email] > escribió:
>>
>>> Hi drarse, thanks for replying, the way you said use a singleton object
>>> does not work
>>>
>>>
>>>
>>>
>>> 在 2015-06-09 16:24:25,"drarse [via Apache Spark User List]" <[hidden
>>> email] > 写道:
>>>
>>> The best way is create a singleton object like:
>>>
>>> object SQLContextSingleton {
   @transient private var instance: SQLContext = null

   // Instantiate SQLContext on demand
   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
 if (instance == null) {
   instance = new SQLContext(sparkContext)
 }
 instance
   }}

  You have more information in the programming guide:
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
>>>
>>>
>>>
>>> 2015-06-09 9:27 GMT+02:00 codingforfun [via Apache Spark User List] <[hidden
>>> email] >:
>>>
 I used SQLContext in a spark streaming application as blew:

 

 case class topic_name (f1: Int, f2: Int)

 val sqlContext = new SQLContext(sc)
 @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
 ssc.checkpoint(".")
 val theDStream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))

 theDStream.map(x => x._2).foreach { rdd =>
   sqlContext.jsonRDD(newsIdRDD).registerTempTable("topic_name")
   sqlContext.sql("select count(*) from topic_name").foreach { x =>
 WriteToFile("file_path", x(0).toString)
   }
 }

 ssc.start()
 ssc.awaitTermination()
 


 I found i could only get every 5 se

Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy "DatabaseConnection"

2015-06-12 Thread algermissen1971
Cody,

On 12 Jun 2015, at 17:26, Cody Koeninger  wrote:

> There are several database apis that use a thread local or singleton 
> reference to a connection pool (we use ScalikeJDBC currently, but there are 
> others).
>  
> You can use mapPartitions earlier in the chain to make sure the connection 
> pool is set up on that executor, then use it inside updateStateByKey
> 

Thanks. You are saying I should just make an arbitrary use of the ‘connection’ 
to invoke the ‘lazy’. E.g. like this:

object SomeDB {

  lazy val conn = new SomeDB( “some serializable config")

}


Then somewhere else:

theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
  SomeDb.conn.init
  pair
   }
)).updateStateByKey[Session](myUpdateFunction _)


And in myUpdateFunction:

def myUpdateFunction( …) {

SomeDb.conn.store(  … )

}


Correct?

Jan




> On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
>  wrote:
> Hi,
> 
> I have a scenario with spark streaming, where I need to write to a database 
> from within updateStateByKey[1].
> 
> That means that inside my update function I need a connection.
> 
> I have so far understood that I should create a new (lazy) connection for 
> every partition. But since I am not working in foreachRDD I wonder where I 
> can iterate over the partitions.
> 
> Should I use mapPartitions() somewhere up the chain?
> 
> Jan
> 
> 
> 
> [1] The use case being saving ‘done' sessions during web tracking.
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy "DatabaseConnection"

2015-06-12 Thread Cody Koeninger
Close.  The mapPartitions call doesn't need to do anything at all to the
iter.

mapPartitions { iter =>
  SomeDb.conn.init
  iter
}

On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971  wrote:

> Cody,
>
> On 12 Jun 2015, at 17:26, Cody Koeninger  wrote:
>
> > There are several database apis that use a thread local or singleton
> reference to a connection pool (we use ScalikeJDBC currently, but there are
> others).
> >
> > You can use mapPartitions earlier in the chain to make sure the
> connection pool is set up on that executor, then use it inside
> updateStateByKey
> >
>
> Thanks. You are saying I should just make an arbitrary use of the
> ‘connection’ to invoke the ‘lazy’. E.g. like this:
>
> object SomeDB {
>
>   lazy val conn = new SomeDB( “some serializable config")
>
> }
>
>
> Then somewhere else:
>
> theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
>   SomeDb.conn.init
>   pair
>}
> )).updateStateByKey[Session](myUpdateFunction _)
>
>
> An in myUpdateFunction
>
> def myUpdateFunction( …) {
>
> SomeDb.conn.store(  … )
>
> }
>
>
> Correct?
>
> Jan
>
>
>
>
> > On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 <
> algermissen1...@icloud.com> wrote:
> > Hi,
> >
> > I have a scenario with spark streaming, where I need to write to a
> database from within updateStateByKey[1].
> >
> > That means that inside my update function I need a connection.
> >
> > I have so far understood that I should create a new (lazy) connection
> for every partition. But since I am not working in foreachRDD I wonder
> where I can iterate over the partitions.
> >
> > Should I use mapPartitions() somewhere up the chain?
> >
> > Jan
> >
> >
> >
> > [1] The use case being saving ‘done' sessions during web tracking.
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
>
>


Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy "DatabaseConnection"

2015-06-12 Thread algermissen1971

On 12 Jun 2015, at 22:59, Cody Koeninger  wrote:

> Close.  the mapPartitions call doesn't need to do anything at all to the iter.
> 
> mapPartitions { iter =>
>   SomeDb.conn.init
>   iter
> }

Yes, thanks!

Maybe you can confirm two more things, and then you will have helped me make a giant leap 
today:

a) When using spark streaming, will this happen exactly once per executor? I 
mean: is mapPartitions called once per executor for the lifetime of the stream?

Or should I rather think once per stage?


b) I actually need an ActorSystem and FlowMaterializer (for making an Akka-HTTP 
request to store the data), not a DB connection - I presume this does not 
change the concept?


Jan



> 
> On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971  
> wrote:
> Cody,
> 
> On 12 Jun 2015, at 17:26, Cody Koeninger  wrote:
> 
> > There are several database apis that use a thread local or singleton 
> > reference to a connection pool (we use ScalikeJDBC currently, but there are 
> > others).
> >
> > You can use mapPartitions earlier in the chain to make sure the connection 
> > pool is set up on that executor, then use it inside updateStateByKey
> >
> 
> Thanks. You are saying I should just make an arbitrary use of the 
> ‘connection’ to invoke the ‘lazy’. E.g. like this:
> 
> object SomeDB {
> 
>   lazy val conn = new SomeDB( “some serializable config")
> 
> }
> 
> 
> Then somewhere else:
> 
> theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
>   SomeDb.conn.init
>   pair
>}
> )).updateStateByKey[Session](myUpdateFunction _)
> 
> 
> An in myUpdateFunction
> 
> def myUpdateFunction( …) {
> 
> SomeDb.conn.store(  … )
> 
> }
> 
> 
> Correct?
> 
> Jan
> 
> 
> 
> 
> > On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
> >  wrote:
> > Hi,
> >
> > I have a scenario with spark streaming, where I need to write to a database 
> > from within updateStateByKey[1].
> >
> > That means that inside my update function I need a connection.
> >
> > I have so far understood that I should create a new (lazy) connection for 
> > every partition. But since I am not working in foreachRDD I wonder where I 
> > can iterate over the partitions.
> >
> > Should I use mapPartitions() somewhere up the chain?
> >
> > Jan
> >
> >
> >
> > [1] The use case being saving ‘done' sessions during web tracking.
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy "DatabaseConnection"

2015-06-12 Thread Cody Koeninger
A. No, it's called once per partition.  Usually you have more partitions
than executors, so it will end up getting called multiple times per
executor.  But you can use a lazy val, singleton, etc to make sure the
setup only takes place once per JVM.

B.  I can't speak to the specifics there ... but as long as you're making
sure the setup gets called at most once per executor, before the work that
needs it ... should be ok.

On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971  wrote:

>
> On 12 Jun 2015, at 22:59, Cody Koeninger  wrote:
>
> > Close.  the mapPartitions call doesn't need to do anything at all to the
> iter.
> >
> > mapPartitions { iter =>
> >   SomeDb.conn.init
> >   iter
> > }
>
> Yes, thanks!
>
> Maybe you can confirm two more things and then you helped me make a giant
> leap today:
>
> a) When using spark streaming, will this happen exactly once per executor?
> I mean: is mapPartitions called once per executor for the lifetime of the
> stream?
>
> Or should I rather think once per stage?
>
>
> b) I actually need an ActorSystem and FlowMaterializer (for making an
> Akka-HTTP request to store the data), not a DB connection - I presume this
> does not changethe concept?
>
>
> Jan
>
>
>
> >
> > On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 <
> algermissen1...@icloud.com> wrote:
> > Cody,
> >
> > On 12 Jun 2015, at 17:26, Cody Koeninger  wrote:
> >
> > > There are several database apis that use a thread local or singleton
> reference to a connection pool (we use ScalikeJDBC currently, but there are
> others).
> > >
> > > You can use mapPartitions earlier in the chain to make sure the
> connection pool is set up on that executor, then use it inside
> updateStateByKey
> > >
> >
> > Thanks. You are saying I should just make an arbitrary use of the
> ‘connection’ to invoke the ‘lazy’. E.g. like this:
> >
> > object SomeDB {
> >
> >   lazy val conn = new SomeDB( “some serializable config")
> >
> > }
> >
> >
> > Then somewhere else:
> >
> > theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
> >   SomeDb.conn.init
> >   pair
> >}
> > )).updateStateByKey[Session](myUpdateFunction _)
> >
> >
> > An in myUpdateFunction
> >
> > def myUpdateFunction( …) {
> >
> > SomeDb.conn.store(  … )
> >
> > }
> >
> >
> > Correct?
> >
> > Jan
> >
> >
> >
> >
> > > On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 <
> algermissen1...@icloud.com> wrote:
> > > Hi,
> > >
> > > I have a scenario with spark streaming, where I need to write to a
> database from within updateStateByKey[1].
> > >
> > > That means that inside my update function I need a connection.
> > >
> > > I have so far understood that I should create a new (lazy) connection
> for every partition. But since I am not working in foreachRDD I wonder
> where I can iterate over the partitions.
> > >
> > > Should I use mapPartitions() somewhere up the chain?
> > >
> > > Jan
> > >
> > >
> > >
> > > [1] The use case being saving ‘done' sessions during web tracking.
> > >
> > >
> > > -
> > > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > > For additional commands, e-mail: user-h...@spark.apache.org
> > >
> > >
> >
> >
>
>
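
Pulling the fragments from this thread together, a minimal sketch; SomeDB in the earlier messages is a stand-in for whatever client or pool is actually used, so DbClient below is equally hypothetical, and theTrackingEvents, toPairs, Session and myUpdateFunction are the names from Jan's code:

  // Hypothetical client; replace with a real pool / HTTP client / actor system.
  class DbClient(config: String) {
    def store(key: String, value: String): Unit = { /* write somewhere */ }
  }

  object DbClient {
    // one instance per JVM (i.e. per executor), created lazily on first access
    lazy val conn = new DbClient("some serializable config")
  }

  // Touch the lazy val once per partition so the client exists on each executor,
  // passing the iterator through untouched, then use it from the state update function.
  val prepared = theTrackingEvents.map(toPairs).mapPartitions { iter =>
    DbClient.conn
    iter
  }
  val sessions = prepared.updateStateByKey[Session](myUpdateFunction _)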


Parquet Multiple Output

2015-06-12 Thread Xin Liu
Hi,

I have a scenario where I'd like to store an RDD using parquet format in
many files, which correspond to days, such as 2015/01/01, 2015/02/02, etc.

So far I used this method

http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

to store text files (then I have to read the text files, convert them to parquet
and store them again). Has anyone tried to store many parquet files from one RDD?

Thanks,
Xin
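
For what it's worth, Spark 1.4's DataFrame writer can do this directly via partitionBy, without the MultipleTextOutputFormat workaround. A minimal sketch, assuming the records can be given year/month/day columns (names and data are made up):

  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  case class Event(year: Int, month: Int, day: Int, payload: String)
  val events = sc.parallelize(Seq(Event(2015, 1, 1, "a"), Event(2015, 2, 2, "b")))

  // writes .../year=2015/month=1/day=1/part-*.parquet and so on
  events.toDF().write.partitionBy("year", "month", "day").parquet("hdfs:///tmp/events")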


Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy "DatabaseConnection"

2015-06-12 Thread algermissen1971


On 12 Jun 2015, at 23:19, Cody Koeninger  wrote:

> A. No, it's called once per partition.  Usually you have more partitions than 
> executors, so it will end up getting called multiple times per executor.  But 
> you can use a lazy val, singleton, etc to make sure the setup only takes 
> place once per JVM.
> 
> B.  I cant speak to the specifics there ... but as long as you're making sure 
> the setup gets called at most once per executor, before the work that needs 
> it ... should be ok.
> 

Great thanks so much - 

(I guess I am not yet clear about the relationship of partition / executor / 
stage, but I get the idea.)

Jan


> On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971  
> wrote:
> 
> On 12 Jun 2015, at 22:59, Cody Koeninger  wrote:
> 
> > Close.  the mapPartitions call doesn't need to do anything at all to the 
> > iter.
> >
> > mapPartitions { iter =>
> >   SomeDb.conn.init
> >   iter
> > }
> 
> Yes, thanks!
> 
> Maybe you can confirm two more things and then you helped me make a giant 
> leap today:
> 
> a) When using spark streaming, will this happen exactly once per executor? I 
> mean: is mapPartitions called once per executor for the lifetime of the 
> stream?
> 
> Or should I rather think once per stage?
> 
> 
> b) I actually need an ActorSystem and FlowMaterializer (for making an 
> Akka-HTTP request to store the data), not a DB connection - I presume this 
> does not changethe concept?
> 
> 
> Jan
> 
> 
> 
> >
> > On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 
> >  wrote:
> > Cody,
> >
> > On 12 Jun 2015, at 17:26, Cody Koeninger  wrote:
> >
> > > There are several database apis that use a thread local or singleton 
> > > reference to a connection pool (we use ScalikeJDBC currently, but there 
> > > are others).
> > >
> > > You can use mapPartitions earlier in the chain to make sure the 
> > > connection pool is set up on that executor, then use it inside 
> > > updateStateByKey
> > >
> >
> > Thanks. You are saying I should just make an arbitrary use of the 
> > ‘connection’ to invoke the ‘lazy’. E.g. like this:
> >
> > object SomeDB {
> >
> >   lazy val conn = new SomeDB( “some serializable config")
> >
> > }
> >
> >
> > Then somewhere else:
> >
> > theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
> >   SomeDb.conn.init
> >   pair
> >}
> > )).updateStateByKey[Session](myUpdateFunction _)
> >
> >
> > An in myUpdateFunction
> >
> > def myUpdateFunction( …) {
> >
> > SomeDb.conn.store(  … )
> >
> > }
> >
> >
> > Correct?
> >
> > Jan
> >
> >
> >
> >
> > > On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
> > >  wrote:
> > > Hi,
> > >
> > > I have a scenario with spark streaming, where I need to write to a 
> > > database from within updateStateByKey[1].
> > >
> > > That means that inside my update function I need a connection.
> > >
> > > I have so far understood that I should create a new (lazy) connection for 
> > > every partition. But since I am not working in foreachRDD I wonder where 
> > > I can iterate over the partitions.
> > >
> > > Should I use mapPartitions() somewhere up the chain?
> > >
> > > Jan
> > >
> > >
> > >
> > > [1] The use case being saving ‘done' sessions during web tracking.
> > >
> > >
> > > -
> > > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > > For additional commands, e-mail: user-h...@spark.apache.org
> > >
> > >
> >
> >
> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Juan Rodríguez Hortalá
Hi,

If you want, I would be happy to work on this. I have worked with
KafkaUtils.createDirectStream before, in a pull request that wasn't
accepted: https://github.com/apache/spark/pull/5367. I'm fluent in Python
and I'm starting to feel comfortable with Scala, so if someone opens a JIRA
I can take it.

Greetings,

Juan Rodriguez


2015-06-12 15:59 GMT+02:00 Cody Koeninger :

> The scala api has 2 ways of calling createDirectStream.  One of them
> allows you to pass a message handler that gets full access to the kafka
> MessageAndMetadata, including offset.
>
> I don't know why the python api was developed with only one way to call
> createDirectStream, but the first thing I'd look at would be adding that
> functionality back in.  If someone wants help creating a patch for that,
> just let me know.
>
> Dealing with offsets on a per-message basis may not be as efficient as
> dealing with them on a batch basis using the HasOffsetRanges interface...
> but if efficiency was a primary concern, you probably wouldn't be using
> Python anyway.
>
> On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao 
> wrote:
>
>> Scala KafkaRDD uses a trait to handle this problem, but it is not so easy
>> and straightforward in Python, where we need to have a specific API to
>> handle this, I'm not sure is there any simple workaround to fix this, maybe
>> we should think carefully about it.
>>
>> 2015-06-12 13:59 GMT+08:00 Amit Ramesh :
>>
>>>
>>> Thanks, Jerry. That's what I suspected based on the code I looked at.
>>> Any pointers on what is needed to build in this support would be great.
>>> This is critical to the project we are currently working on.
>>>
>>> Thanks!
>>>
>>>
>>> On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao 
>>> wrote:
>>>
 OK, I get it, I think currently Python based Kafka direct API do not
 provide such equivalence like Scala, maybe we should figure out to add this
 into Python API also.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh :

>
> Hi Jerry,
>
> Take a look at this example:
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2
>
> The offsets are needed because as RDDs get generated within spark the
> offsets move further along. With direct Kafka mode the current offsets are
> no more persisted in Zookeeper but rather within Spark itself. If you want
> to be able to use zookeeper based monitoring tools to keep track of
> progress, then this is needed.
>
> In my specific case we need to persist Kafka offsets externally so
> that we can continue from where we left off after a code deployment. In
> other words, we need exactly-once processing guarantees across code
> deployments. Spark does not support any state persistence across
> deployments so this is something we need to handle on our own.
>
> Hope that helps. Let me know if not.
>
> Thanks!
> Amit
>
>
> On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao 
> wrote:
>
>> Hi,
>>
>> What is your meaning of getting the offsets from the RDD, from my
>> understanding, the offsetRange is a parameter you offered to KafkaRDD, 
>> why
>> do you still want to get the one previous you set into?
>>
>> Thanks
>> Jerry
>>
>> 2015-06-12 12:36 GMT+08:00 Amit Ramesh :
>>
>>>
>>> Congratulations on the release of 1.4!
>>>
>>> I have been trying out the direct Kafka support in python but
>>> haven't been able to figure out how to get the offsets from the RDD. 
>>> Looks
>>> like the documentation is yet to be updated to include Python examples (
>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>>> I am specifically looking for the equivalent of
>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
>>> I tried digging through the python code but could not find anything
>>> related. Any pointers would be greatly appreciated.
>>>
>>> Thanks!
>>> Amit
>>>
>>>
>>
>

>>>
>>
>
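
For reference, the Scala-side pattern being discussed (the HasOffsetRanges cast from the linked documentation) looks roughly like this; directKafkaStream is whatever KafkaUtils.createDirectStream returned, and how the offsets are persisted is left as a comment:

  import org.apache.spark.streaming.kafka.HasOffsetRanges

  directKafkaStream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { o =>
      // store o.topic, o.partition, o.fromOffset, o.untilOffset wherever progress is tracked
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
  }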


Extracting k-means cluster values along with centers?

2015-06-12 Thread Minnow Noir
Greetings.

I have been following some of the tutorials online for Spark k-means
clustering.  I would like to be able to just "dump" all the cluster values
and their centroids to a text file so I can explore the data.  I have the
clusters as such:

val clusters = KMeans.train(parsedData, numClusters, numIterations)

clusters
res2: org.apache.spark.mllib.clustering.KMeansModel =
org.apache.spark.mllib.clustering.KMeansModel@59de440b

Is there a way to build something akin to a key value RDD that has the
center as the key and the array of values associated with that center as
the value? I don't see anything in the tutorials, API docs, or the
"Learning" book for how to do this.

Thank you
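
One way to get at this with the existing KMeansModel API is to key each point by its predicted cluster and look the centroid up separately; a sketch, reusing parsedData and clusters from above (the output path is a placeholder):

  import org.apache.spark.mllib.linalg.Vector

  val centers: Array[Vector] = clusters.clusterCenters

  // (clusterIndex, point) pairs
  val assigned = parsedData.map(point => (clusters.predict(point), point))

  // or keyed by the centroid vector itself, as asked
  val byCenter = assigned.map { case (idx, point) => (centers(idx), point) }

  // dump to text for exploration
  assigned.map { case (idx, p) => s"$idx\t${p.toArray.mkString(",")}" }
    .saveAsTextFile("kmeans-assignments")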


Re: NullPointerException with functions.rand()

2015-06-12 Thread Ted Yu
Created PR and verified the example given by Justin works with the change:
https://github.com/apache/spark/pull/6793

Cheers

On Wed, Jun 10, 2015 at 7:15 PM, Ted Yu  wrote:

> Looks like the NPE came from this line:
>   @transient protected lazy val rng = new XORShiftRandom(seed +
> TaskContext.get().partitionId())
>
> Could TaskContext.get() be null ?
>
> On Wed, Jun 10, 2015 at 6:15 PM, Justin Yip 
> wrote:
>
>> Hello,
>>
>> I am using 1.4.0 and found the following weird behavior.
>>
>> This case works fine:
>>
>> scala> sc.parallelize(Seq((1,2), (3, 100))).toDF.withColumn("index",
>> rand(30)).show()
>> +--+---+-------------------+
>> |_1| _2|              index|
>> +--+---+-------------------+
>> | 1|  2| 0.6662967911724369|
>> | 3|100|0.35734504984676396|
>> +--+---+-------------------+
>>
>> However, when I use sqlContext.createDataFrame instead, I get a NPE:
>>
>> scala> sqlContext.createDataFrame(Seq((1,2), (3,
>> 100))).withColumn("index", rand(30)).show()
>> java.lang.NullPointerException
>> at
>> org.apache.spark.sql.catalyst.expressions.RDG.rng$lzycompute(random.scala:39)
>> at org.apache.spark.sql.catalyst.expressions.RDG.rng(random.scala:39)
>> ..
>>
>>
>> Does any one know why?
>>
>> Thanks.
>>
>> Justin
>>
>> --
>> View this message in context: NullPointerException with functions.rand()
>> 
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com.
>>
>
>


Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Hi Andrew,

Thanks a lot! Indeed, they don't start with "spark."; the following properties
are read by the implementation of the driver rather than by Spark conf:

--conf spooky.root=s3n://spooky- \
--conf spooky.checkpoint=s3://spooky-checkpoint \

This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to set
the same properties?

Yours Peng

On 12 June 2015 at 14:20, Andrew Or  wrote:

> Hi Peng,
>
> Setting properties through --conf should still work in Spark 1.4. From the
> warning it looks like the config you are trying to set does not start with
> the prefix "spark.". What is the config that you are trying to set?
>
> -Andrew
>
> 2015-06-12 11:17 GMT-07:00 Peng Cheng :
>
>> In Spark <1.3.x, the system property of the driver can be set by --conf
>> option, shared between setting spark properties and system properties.
>>
>> In Spark 1.4.0 this feature is removed, the driver instead log the
>> following
>> warning:
>>
>> Warning: Ignoring non-spark config property: xxx.xxx=v
>>
>> How do set driver's system property in 1.4.0? Is there a reason it is
>> removed without a deprecation warning?
>>
>> Thanks a lot for your advices.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Ted Yu
This is the SPARK JIRA which introduced the warning:

[SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties
in spark-shell and spark-submit

On Fri, Jun 12, 2015 at 4:34 PM, Peng Cheng  wrote:

> Hi Andrew,
>
> Thanks a lot! Indeed, it doesn't start with spark, the following
> properties are read by implementation of the driver rather than spark conf:
>
> --conf spooky.root=s3n://spooky- \
> --conf spooky.checkpoint=s3://spooky-checkpoint \
>
> This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to
> set the same properties?
>
> Yours Peng
>
> On 12 June 2015 at 14:20, Andrew Or  wrote:
>
>> Hi Peng,
>>
>> Setting properties through --conf should still work in Spark 1.4. From
>> the warning it looks like the config you are trying to set does not start
>> with the prefix "spark.". What is the config that you are trying to set?
>>
>> -Andrew
>>
>> 2015-06-12 11:17 GMT-07:00 Peng Cheng :
>>
>>> In Spark <1.3.x, the system property of the driver can be set by --conf
>>> option, shared between setting spark properties and system properties.
>>>
>>> In Spark 1.4.0 this feature is removed, the driver instead log the
>>> following
>>> warning:
>>>
>>> Warning: Ignoring non-spark config property: xxx.xxx=v
>>>
>>> How do set driver's system property in 1.4.0? Is there a reason it is
>>> removed without a deprecation warning?
>>>
>>> Thanks a lot for your advices.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Resource allocation configurations for Spark on Yarn

2015-06-12 Thread Jim Green
Hi Team,

Sharing one article which summarizes the resource allocation configurations
for Spark on Yarn:
Resource allocation configurations for Spark on Yarn


-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Dynamic allocator requests -1 executors

2015-06-12 Thread Patrick Woody
Hey all,

I've recently run into an issue where spark dynamicAllocation has asked for
-1 executors from YARN. Unfortunately, this raises an exception that kills
the executor-allocation thread and the application can't request more
resources.

Has anyone seen this before? It happens only sporadically and the application
usually works, but when it is hit the application becomes unusable, stuck at
the minimum YARN resources.

Stacktrace below.

Thanks!
-Pat

470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught
exception in thread spark-dynamic-executor-allocation-0
471 ! java.lang.IllegalArgumentException: Attempted to request a negative
number of executor(s) -1 from the cluster manager. Please specify a
positive number!
472 ! at
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338)
~[spark-core_2.10-1.3.1.jar:1.
473 ! at
org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137)
~[spark-core_2.10-1.3.1.jar:1.3.1]
474 ! at
org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294)
~[spark-core_2.10-1.3.1.jar:1.3.1]
475 ! at
org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263)
~[spark-core_2.10-1.3.1.jar:1.3.1]
476 ! at 
org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230)
~[spark-core_2.10-1.3.1.j
477 ! at
org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189)
~[spark-core_2.10-1.3.1.jar:1.3.1]
478 ! at
org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
~[spark-core_2.10-1.3.1.jar:1.3.1]
479 ! at
org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
~[spark-core_2.10-1.3.1.jar:1.3.1]
480 ! at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
~[spark-core_2.10-1.3.1.jar:1.3.1]
481 ! at
org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189)
[spark-core_2.10-1.3.1.jar:1.3.1]
482 ! at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
[na:1.7.0_71]
483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
[na:1.7.0_71]
484 ! at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
[na:1.7.0_71]
485 ! at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[na:1.7.0_71]
486 ! at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_71]
487 ! at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_71]


Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Thanks all for the information. Andrew, I dug out one of your old posts,
which is relevant:

http://apache-spark-user-list.1001560.n3.nabble.com/little-confused-about-SPARK-JAVA-OPTS-alternatives-td5798.html

But it doesn't mention how to supply properties that don't start with "spark.".
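
One option that should achieve the same effect is to pass these as JVM system
properties for the driver, for example via --driver-java-options (or the
spark.driver.extraJavaOptions config). A rough sketch, reusing the property
names from this thread; the remaining submit arguments are elided:

  spark-submit \
    --driver-java-options "-Dspooky.root=s3n://spooky- -Dspooky.checkpoint=s3://spooky-checkpoint" \
    ...

The driver can then read them with System.getProperty("spooky.root"), assuming
that is how the application consumed them before.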

On 12 June 2015 at 19:39, Ted Yu  wrote:

> This is the SPARK JIRA which introduced the warning:
>
> [SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties
> in spark-shell and spark-submit
>
> On Fri, Jun 12, 2015 at 4:34 PM, Peng Cheng  wrote:
>
>> Hi Andrew,
>>
>> Thanks a lot! Indeed, they don't start with "spark."; the following
>> properties are read by the driver's implementation rather than by the Spark conf:
>>
>> --conf spooky.root=s3n://spooky- \
>> --conf spooky.checkpoint=s3://spooky-checkpoint \
>>
>> This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to
>> set the same properties?
>>
>> Yours Peng
>>
>> On 12 June 2015 at 14:20, Andrew Or  wrote:
>>
>>> Hi Peng,
>>>
>>> Setting properties through --conf should still work in Spark 1.4. From
>>> the warning it looks like the config you are trying to set does not start
>>> with the prefix "spark.". What is the config that you are trying to set?
>>>
>>> -Andrew
>>>
>>> 2015-06-12 11:17 GMT-07:00 Peng Cheng :
>>>
 In Spark 1.3.x and earlier, the driver's system properties could be set
 through the --conf option, which was shared between Spark properties and
 system properties.

 In Spark 1.4.0 this feature is removed; the driver instead logs the
 following warning:

 Warning: Ignoring non-spark config property: xxx.xxx=v

 How do I set the driver's system properties in 1.4.0? Is there a reason this
 was removed without a deprecation warning?

 Thanks a lot for your advice.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: Parquet Multiple Output

2015-06-12 Thread Cheng Lian
Spark 1.4 supports dynamic partitioning: you can first convert your RDD
to a DataFrame and then save the contents partitioned by the date column.
Say you have a DataFrame df containing three columns a, b, and c; you
might write something like this:


df.write.partitionBy("a", "b").mode("overwrite").parquet("path/to/file")


Cheng
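
For the date-partitioned case in the original question below, a minimal sketch
(assuming an RDD of case classes with a string "date" field; names and paths
are made up):

  import sqlContext.implicits._

  case class Record(id: Long, value: String, date: String)

  // Hypothetical input RDD; "date" holds values such as "2015-01-01".
  val records = sc.parallelize(Seq(
    Record(1L, "a", "2015-01-01"),
    Record(2L, "b", "2015-01-02")))

  // Writes one sub-directory per distinct date, e.g. path/to/output/date=2015-01-01/
  records.toDF().write.partitionBy("date").mode("overwrite").parquet("path/to/output")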

On 6/13/15 5:31 AM, Xin Liu wrote:

Hi,

I have a scenario where I'd like to store an RDD in Parquet format
in many files corresponding to days, such as 2015/01/01,
2015/02/02, etc.


So far I have used this method

http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

to store text files (then I have to read the text files back, convert them to
Parquet, and store them again). Has anyone tried to store many Parquet files
from one RDD?


Thanks,
Xin




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL and Skewed Joins

2015-06-12 Thread Jon Walton
Greetings,

I am trying to implement a classic star schema ETL pipeline using Spark
SQL 1.2.1.  I am running into problems with shuffle joins for those
dimension tables which have skewed keys and are too large for Spark to
broadcast.

I have a few questions:

1. Can I split my queries so a unique, skewed key gets processed by
multiple reducer steps?   I have tried this (using a UNION), but I am always
left with 199 of 200 executors complete; the last one times out and even starts
throwing memory errors.   That single executor processes 95% of the 80G
fact table for the single skewed key.

2. Does 1.3.2 or 1.4 have any enhancements that can help?   I tried to use
1.3.1 but SPARK-6967 prohibits me from doing so.Now that 1.4 is
available, would any of the JOIN enhancements help this situation?

3. Do you have suggestions for memory config if I wanted to broadcast 2G
dimension tables?   Is this even feasible?   Do table broadcasts wind up in
the heap or in dedicated storage space?

Thanks for your help,

Jon
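
One common workaround when only a handful of keys are heavily skewed is to salt
the join key on the fact side and replicate the matching dimension rows. A rough
Scala sketch of the idea, with made-up keys and payloads (not code from this
thread):

  import scala.util.Random

  // Toy pair RDDs standing in for the fact and dimension tables: (joinKey, payload).
  val fact = sc.parallelize(Seq(("skewed", 1), ("skewed", 2), ("rare", 3)))
  val dim  = sc.parallelize(Seq(("skewed", "A"), ("rare", "B")))

  val numSalts = 20

  // Spread each fact row across numSalts buckets of its key.
  val saltedFact = fact.map { case (k, v) => (s"$k#${Random.nextInt(numSalts)}", v) }

  // Replicate each dimension row once per bucket so every salted fact key finds a match.
  val saltedDim = dim.flatMap { case (k, v) => (0 until numSalts).map(i => (s"$k#$i", v)) }

  // Join on the salted key, then strip the salt to recover the original key.
  val joined = saltedFact.join(saltedDim)
    .map { case (saltedKey, pair) => (saltedKey.split("#")(0), pair) }

This trades extra shuffle volume on the dimension side for a more even task
distribution across the skewed fact keys.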


Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Amit Ramesh
Hi Juan,

I have created a ticket for this:
https://issues.apache.org/jira/browse/SPARK-8337

Thanks!
Amit
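
For reference, the Scala-side pattern discussed further down in this thread
(per-batch access to Kafka offsets via HasOffsetRanges) looks roughly like the
sketch below; the broker list, topic name, and ssc (an existing
StreamingContext) are placeholders:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka._

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))

  stream.foreachRDD { rdd =>
    // Each RDD produced by the direct stream carries its Kafka offset ranges.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { o =>
      // Persist these externally (e.g. in ZooKeeper) if offset tracking is needed.
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
  }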


On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> If you want I would be happy to work in this. I have worked with
> KafkaUtils.createDirectStream before, in a pull request that wasn't
> accepted https://github.com/apache/spark/pull/5367. I'm fluent with
> Python and I'm starting to feel comfortable with Scala, so if someone opens
> a JIRA I can take it.
>
> Greetings,
>
> Juan Rodriguez
>
>
> 2015-06-12 15:59 GMT+02:00 Cody Koeninger :
>
>> The scala api has 2 ways of calling createDirectStream.  One of them
>> allows you to pass a message handler that gets full access to the kafka
>> MessageAndMetadata, including offset.
>>
>> I don't know why the python api was developed with only one way to call
>> createDirectStream, but the first thing I'd look at would be adding that
>> functionality back in.  If someone wants help creating a patch for that,
>> just let me know.
>>
>> Dealing with offsets on a per-message basis may not be as efficient as
>> dealing with them on a batch basis using the HasOffsetRanges interface...
>> but if efficiency was a primary concern, you probably wouldn't be using
>> Python anyway.
>>
>> On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao 
>> wrote:
>>
>>> Scala KafkaRDD uses a trait to handle this problem, but it is not so
>>> easy and straightforward in Python, where we need to have a specific API to
>>> handle this. I'm not sure whether there is any simple workaround; maybe
>>> we should think carefully about it.
>>>
>>> 2015-06-12 13:59 GMT+08:00 Amit Ramesh :
>>>

 Thanks, Jerry. That's what I suspected based on the code I looked at.
 Any pointers on what is needed to build in this support would be great.
 This is critical to the project we are currently working on.

 Thanks!


 On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao 
 wrote:

> OK, I get it. I think the Python-based Kafka direct API currently does not
> provide such an equivalent to the Scala one; maybe we should figure out how to
> add this to the Python API as well.
>
> 2015-06-12 13:48 GMT+08:00 Amit Ramesh :
>
>>
>> Hi Jerry,
>>
>> Take a look at this example:
>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2
>>
>> The offsets are needed because, as RDDs get generated within Spark, the
>> offsets move further along. With direct Kafka mode the current offsets are
>> no longer persisted in ZooKeeper but rather within Spark itself. If you want
>> to be able to use ZooKeeper-based monitoring tools to keep track of
>> progress, then this is needed.
>>
>> In my specific case we need to persist Kafka offsets externally so
>> that we can continue from where we left off after a code deployment. In
>> other words, we need exactly-once processing guarantees across code
>> deployments. Spark does not support any state persistence across
>> deployments so this is something we need to handle on our own.
>>
>> Hope that helps. Let me know if not.
>>
>> Thanks!
>> Amit
>>
>>
>> On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao  wrote:
>>
>>> Hi,
>>>
>>> What do you mean by getting the offsets from the RDD? From my
>>> understanding, the offsetRange is a parameter you supplied to KafkaRDD, so why
>>> do you still want to get back the value you previously set?
>>>
>>> Thanks
>>> Jerry
>>>
>>> 2015-06-12 12:36 GMT+08:00 Amit Ramesh :
>>>

 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but
 haven't been able to figure out how to get the offsets from the RDD. 
 Looks
 like the documentation is yet to be updated to include Python examples 
 (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit


>>>
>>
>

>>>
>>
>


Reliable SQS Receiver for Spark Streaming

2015-06-12 Thread Michal Čizmazia
I would like to have a Spark Streaming SQS Receiver which deletes SQS
messages only after they were successfully stored on S3.

For this a Custom Receiver can be implemented with the semantics of
the Reliable Receiver.

The store(multiple-records) call blocks until the given records have
been stored and replicated inside Spark.

If the write-ahead logs are enabled, all the data received from a
receiver gets written into a write ahead log in the configuration
checkpoint directory. The checkpoint directory can be pointed to S3.

After the store(multiple-records) blocking call finishes, are the
records already stored in the checkpoint directory (and thus can be
safely deleted from SQS)?
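
For reference, a rough skeleton of such a receiver is sketched below. The SQS
calls (pollSqs/deleteFromSqs) are hypothetical placeholders, not real AWS SDK
usage, and the write-ahead log has to be enabled separately through the
spark.streaming.receiver.writeAheadLog.enable configuration:

  import scala.collection.mutable.ArrayBuffer
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  class SqsReceiver(queueUrl: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

    def onStart(): Unit = {
      new Thread("sqs-receiver") {
        override def run(): Unit = receive()
      }.start()
    }

    def onStop(): Unit = { /* signal the polling thread to stop */ }

    private def receive(): Unit = {
      while (!isStopped()) {
        val (messages, receiptHandles) = pollSqs(queueUrl)
        if (messages.nonEmpty) {
          // store() blocks until the records have been stored and replicated
          // inside Spark; only after that are the SQS messages deleted.
          store(ArrayBuffer(messages: _*))
          receiptHandles.foreach(deleteFromSqs(queueUrl, _))
        }
      }
    }

    // Placeholders standing in for real SQS client calls.
    private def pollSqs(url: String): (Seq[String], Seq[String]) = (Nil, Nil)
    private def deleteFromSqs(url: String, receiptHandle: String): Unit = ()
  }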

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: BigDecimal problem in parquet file

2015-06-12 Thread Davies Liu
Maybe it's related to a bug that was recently fixed by
https://github.com/apache/spark/pull/6558.
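
If the root cause is indeed unit placeholders standing in for missing values
(as discussed below), here is a minimal sketch of normalizing them to null
before building the Row objects; the path is hypothetical, and sc/sqlContext
are assumed to be the spark-shell instances:

  import org.apache.spark.sql.Row

  val rawRdd = sc.objectFile[Array[Object]]("/path/to/rawdata/sometable")

  // Map BoxedUnit placeholders (missing source values) to null so that nullable
  // DecimalType columns can be written to Parquet without the ClassCastException.
  val rowRdd = rawRdd.map { arr =>
    Row(arr.map {
      case _: scala.runtime.BoxedUnit => null
      case other                      => other
    }: _*)
  }

  // rowRdd can then be combined with the StructType schema via sqlContext.createDataFrame.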

On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag  wrote:
> Hi Cheng,
>
> Yes, some rows contain unit instead of decimal values. I believe some rows
> from the original source I had don't have any value, i.e. they are null, and that
> shows up as unit. How do Spark SQL or Parquet handle null in place of
> decimal values, assuming the field is nullable? I will have to change it
> properly.
>
> Thanks for helping out.
> Bipin
>
> On 12 June 2015 at 14:57, Cheng Lian  wrote:
>>
>> On 6/10/15 8:53 PM, Bipin Nag wrote:
>>
>> Hi Cheng,
>>
>> I am using the Spark 1.3.1 binary available for Hadoop 2.6. I am loading an
>> existing Parquet file, then repartitioning and saving it. Doing this gives
>> the error. The code for this doesn't look like it is causing the problem. I have a
>> feeling the source (the existing Parquet file) is the culprit.
>>
>> I created that Parquet file using a JdbcRDD (pulled from Microsoft SQL Server).
>> First I saved the JdbcRDD as an object file on disk, then loaded it again, made a
>> DataFrame from it using a schema, and saved it as Parquet.
>>
>> Following is the code :
>> For saving jdbcrdd:
>>  name - fullqualifiedtablename
>>  pk - string for primarykey
>>  pklast - last id to pull
>> val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url, username, password),
>>   "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
>>   1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
>> myRDD.saveAsObjectFile("rawdata/" + name)
>>
>> For applying schema and saving the parquet:
>> val myschema = schemamap(name)
>> val myrdd =
>> sc.objectFile[Array[Object]]("/home/bipin/rawdata/"+name).map(x =>
>> org.apache.spark.sql.Row(x:_*))
>>
>> Have you tried to print out x here to check its contents? My guess is that
>> x actually contains unit values. For example, the following Spark shell code
>> can reproduce a similar exception:
>>
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.Row
>>
>> val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
>> val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
>> val df = sqlContext.createDataFrame(rdd, schema)
>>
>> df.saveAsParquetFile("file:///tmp/foo")
>>
>> val actualdata = sqlContext.createDataFrame(myrdd, myschema)
>> actualdata.saveAsParquetFile("/home/bipin/stageddata/"+name)
>>
>> The schema StructType can be made manually, though I pull the table's metadata and
>> make one. It is a simple string translation (see the SQL docs and/or Spark
>> data types).
>>
>> That is how I created the parquet file. Any help to solve the issue is
>> appreciated.
>> Thanks
>> Bipin
>>
>>
>> On 9 June 2015 at 20:44, Cheng Lian  wrote:
>>>
>>> Would you please provide a snippet that reproduces this issue? What
>>> version of Spark were you using?
>>>
>>> Cheng
>>>
>>> On 6/9/15 8:18 PM, bipin wrote:

 Hi,
 When I try to save my data frame as a parquet file I get the following
 error:

 java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
 org.apache.spark.sql.types.Decimal
 at

 org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
 at

 parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
 at
 parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
 at
 parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
 at

 org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 How to fix this problem ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Big

[Spark] What is the most efficient way to do such a join and column manipulation?

2015-06-12 Thread Rex X
Hi,

I want to use Spark to select N columns and the top M rows of all CSV files under
a folder.

To be concrete, say we have a folder with thousands of tab-delimited CSV
files with the following attribute format (each CSV file is about 10GB):

id    name    address    city    ...
1     Matt    add1       LA      ...
2     Will    add2       LA      ...
3     Lucy    add3       SF      ...
...

And we have a lookup table based on the "name" column above:

name    gender
Matt    M
Lucy    F
...

Now we want to output the top 100K rows of each CSV file in the
following format:

id    name    gender
1     Matt    M
...

Can we use pyspark to efficiently handle this?
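
One rough Scala sketch of the approach (the same structure applies in PySpark);
the paths, column positions, and a tab-delimited lookup file are assumptions:

  import org.apache.hadoop.fs.{FileSystem, Path}

  // Broadcast the small lookup table as a map: name -> gender.
  val lookup = sc.broadcast(
    sc.textFile("hdfs:///data/lookup.tsv")
      .map(_.split("\t"))
      .map(a => a(0) -> a(1))
      .collectAsMap())

  // List the CSV files, take the first 100K rows of each (streamed to the
  // driver one file at a time), and re-distribute them as RDDs.
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val files = fs.listStatus(new Path("hdfs:///data/csv-folder")).map(_.getPath.toString)
  val perFileHeads = files.map(f => sc.parallelize(sc.textFile(f).take(100000)))

  // Keep only id and name (columns 0 and 1) and join against the broadcast lookup.
  val result = sc.union(perFileHeads)
    .map(_.split("\t"))
    .flatMap(cols => lookup.value.get(cols(1)).map(g => (cols(0), cols(1), g)))

Because the lookup table is broadcast, no shuffle join is needed, and roughly
only the head of each file is read to collect its first 100K rows.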


Re: Spark SQL and Skewed Joins

2015-06-12 Thread Michael Armbrust
>
> 2. Does 1.3.2 or 1.4 have any enhancements that can help?   I tried to use
> 1.3.1 but SPARK-6967 prohibits me from doing so.Now that 1.4 is
> available, would any of the JOIN enhancements help this situation?
>

I would try Spark 1.4 after running "SET
spark.sql.planner.sortMergeJoin=true".  Please report back if this works
for you.
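
For reference, the equivalent can also be set programmatically on the
SQLContext (a small sketch, assuming Spark 1.4):

  // Prefer the sort-merge join planner introduced in Spark 1.4.
  sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
  // or: sqlContext.sql("SET spark.sql.planner.sortMergeJoin=true")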


Are there ways to restrict what parameters users can set for a Spark job?

2015-06-12 Thread YaoPau
For example, Hive lets you set a whole bunch of parameters (# of reducers, #
of mappers, size of reducers, cache size, max memory to use for a join),
while Impala gives users a much smaller subset of parameters to work with,
which makes it nice to give to a BI team.

Is there a way to restrict which parameters a user can set for a Spark job? 
Maybe to cap the # of executors, or cap the memory for each executor, or to
enforce a default setting no matter what parameters are used.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Are-there-ways-to-restrict-what-parameters-users-can-set-for-a-Spark-job-tp23301.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Building scaladoc using "build/sbt unidoc" failure

2015-06-12 Thread Reynold Xin
Try build/sbt clean first.


On Tue, May 26, 2015 at 4:45 PM, Justin Yip  wrote:

> Hello,
>
> I am trying to build the Scaladoc from the 1.4 branch, but it failed due to:
> [error] (sql/compile:compile) java.lang.AssertionError: assertion failed:
> List(object package$DebugNode, object package$DebugNode)
>
> I followed the instructions on GitHub and used the
> following command:
>
> $ build/sbt unidoc
>
> Please see attachment for detailed error. Did I miss anything?
>
> Thanks.
>
> Justin
>
>
> *unidoc_error.txt* (30K) Download Attachment
> 
>
> --
> View this message in context: Building scaladoc using "build/sbt unidoc"
> failure
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: --jars not working?

2015-06-12 Thread Alex Nakos
Are you using a build for Scala 2.11? I've encountered the same behaviour
trying to run on YARN with Scala 2.11 and Spark 1.3.0, 1.3.1, and 1.4.0.RC3,
and raised a JIRA issue here: https://issues.apache.org/jira/browse/SPARK-7944.
It would be good to know if this is identical to what you're seeing on Mesos.

Thanks
Alex

On Fri, Jun 12, 2015 at 8:45 PM, Akhil Das 
wrote:

> You can verify if the jars are shipped properly by looking at the driver
> UI (running on 4040) Environment tab.
>
> Thanks
> Best Regards
>
> On Sat, Jun 13, 2015 at 12:43 AM, Jonathan Coveney 
> wrote:
>
>> Spark version is 1.3.0 (will upgrade as soon as we upgrade past mesos
>> 0.19.0)...
>>
>> Regardless, I'm running into a really weird situation where, when I pass
>> --jars to bin/spark-shell, I can't reference those classes in the REPL. Is
>> this expected? The logs even tell me that my jars have been added, and yet
>> the classes inside them are not available.
>>
>> Am I missing something obvious?
>>
>
>